Alink漫谈(十三):在线学习算法FTRL之具体实现罗西的思考

Alink是阿里巴巴基于实时计算引擎Flink研发的新一代机器学习算法平台,是业界首个同时支持批式算法、流式算法的机器学习平台。本文和上文一起介绍了在线学习算法FTRL在Alink中是如何实现的,希望对大家有所帮助。

为了让大家更好理解,我们再次贴出整体流程图:

在线训练主要逻辑是:

前面说到,FTRL先要训练出一个逻辑回归模型作为FTRL算法的初始模型,这是为了系统冷启动的需要。

具体逻辑回归模型设定/训练是:

//traininitialbatchmodelLogisticRegressionTrainBatchOplr=newLogisticRegressionTrainBatchOp().setVectorCol(vecColName).setLabelCol(labelColName).setWithIntercept(true).setMaxIter(10);BatchOperator<>initModel=featurePipelineModel.transform(trainBatchData).link(lr);训练好之后,模型信息是DataSet类型,位于变量BatchOperator<>initModel之中,这是一个批处理算子。

FtrlTrainStreamOp将initModel作为初始化参数。

FtrlTrainStreamOpmodel=newFtrlTrainStreamOp(initModel)在FtrlTrainStreamOp构造函数中会加载这个模型;

dataBridge=DirectReader.collect(initModel);具体加载时通过MemoryDataBridge直接获取初始化模型DataSet中的数据。

publicMemoryDataBridgegenerate(BatchOperatorbatchOperator,ParamsglobalParams){returnnewMemoryDataBridge(batchOperator.collect());}2.2分割高维向量从前文可知,Alink的FTRL算法设置的特征向量维度是30000。所以算法第一步就是切分高维度向量,以便分布式计算。

StringvecColName="vec";intnumHashFeatures=30000;首先要获取切分信息,代码如下,就是将特征数目featureSize除以并行度parallelism,然后得到了每个task对应系数的初始位置。

privatestaticint[]getSplitInfo(intfeatureSize,booleanhasInterceptItem,intparallelism){intcoefSize=(hasInterceptItem)featureSize+1:featureSize;intsubSize=coefSize/parallelism;int[]poses=newint[parallelism+1];intoffset=coefSize%parallelism;for(inti=0;i

//Tuple5DataStream>input=initData.flatMap(newSplitVector(splitInfo,hasInterceptItem,vectorSize,vectorTrainIdx,featureIdx,labelIdx)).partitionCustom(newCustomBlockPartitioner(),1);具体切分在SplitVector.flatMap函数完成,结果就是把一个高维度向量分割给各个CalcTask。

代码摘要如下:

publicvoidflatMap(Rowrow,Collector>collector)throwsException{ longsampleId=counter;counter+=parallelism;Vectorvec;if(vectorTrainIdx==-1){.....}else{//输入row的第vectorTrainIdx个field就是那个30000大小的系数向量vec=VectorUtil.getVector(row.getField(vectorTrainIdx));}if(vecinstanceofSparseVector){MaptmpVec=newHashMap<>();for(inti=0;i

此处理论上有以下几个重点:

伪代码思路大致如下

doublep=learner.predict(x);//预测learner.updateModel(x,p,y);//更新模型doubleloss=LogLossEvalutor.calLogLoss(p,y);//计算损失evalutor.addLogLoss(loss);//更新损失totalLoss+=loss;trainedNum+=1;具体实施上Alink有自己的特点和调整。

机器学习都需要迭代训练,Alink这里利用了FlinkStream的迭代功能。

IterativeStream的实例是通过DataStream的iterate方法创建的˙。iterate方法存在两个重载形式:

Alink选择了第二种。

在创建ConnectedIterativeStreams时候,用迭代流的初始输入作为第一个输入流,用反馈流作为第二个输入。

每一种数据流(DataStream)都会有与之对应的流转换(StreamTransformation)。IterativeStream对应的转换是FeedbackTransformation。

迭代流(IterativeStream)对应的转换是反馈转换(FeedbackTransformation),它表示拓扑中的一个反馈点(也即迭代头)。一个反馈点包含一个输入边以及若干个反馈边,且Flink要求每个反馈边的并行度必须跟输入边的并行度一致,这一点在往该转换中加入反馈边时会进行校验。

当IterativeStream对象被构造时,FeedbackTransformation的实例会被创建并传递给DataStream的构造方法。

迭代的关闭是通过调用IterativeStream的实例方法closeWith来实现的。这个函数指定了某个流将成为迭代程序的结束,并且这个流将作为输入的第二部分(secondinput)被反馈回迭代。

对于Alink来说,迭代构建代码是:

//traindataformat=//feedbackformat=Tuple7IterativeStream.ConnectedIterativeStreams,Tuple7>iteration=input.iterate(Long.MAX_VALUE).withFeedbackType(TypeInformation.of(newTypeHint>(){}));//即iteration是一个IterativeStream.ConnectedIterativeStreams<...>2.3.2.1迭代的输入从代码和注释可以看出,迭代的两种输入是:

反馈流的设置是通过调用IterativeStream的实例方法closeWith来实现的。Alink这里是

反馈流的格式是:

迭代体由两部分构成:CalcTask/ReduceTask。

CalcTask每一个实例都拥有初始化模型dataBridge。

DataStreamiterativeBody=iteration.flatMap(newCalcTask(dataBridge,splitInfo,getParams()))2.3.3.1迭代初始化迭代是由CalcTask.open函数开始,主要做如下几件事

CalcTask.flatMap1主要实现的是FTRL算法中的predict部分(注意,不是FTRL预测)。

解释:pt=σ(Xtw)是LR的预测函数,求出pt的唯一目的是为了求出目标函数(在LR中采用交叉熵损失函数作为目标函数)对参数w的一阶导数g,gi=(ptyt)xi。此步骤同样适用于FTRL优化其他目标函数,唯一的不同就是求次梯度g(次梯度是左导和右导之间的集合,函数可导--左导等于右导时,次梯度就等于一阶梯度)的方法不同。

函数的输入是"训练输入数据",即SplitVector.flatMap的输出---->CalcCalcTask的输入。输入数据是一个五元组,其格式为traindataformat=

有三点需要注意:

大家会说,不对!predict函数应该是sigmoid=1.0/(1.0+np.exp(-w.dot(x)))。是的,这里还没有做sigmoid操作。当ReduceTask做了聚合之后,会把聚合好的p反馈回迭代体,然后在CalcTask.flatMap2中才会做sigmoid操作。

publicvoidflatMap1(Tuple5value,Collector>out)throwsException{if(!savedFristModel){//第一次进入需要存模型out.collect(Tuple7.of(-1L,0,getRuntimeContext().getIndexOfThisSubtask(),newDenseVector(coef),labelValues,-1.0,modelId++));savedFristModel=true;}LongtimeStamps=System.currentTimeMillis();doublewx=0.0;LongsampleId=value.f0;Vectorvec=value.f3;if(vecinstanceofSparseVector){int[]indices=((SparseVector)vec).getIndices();//这里就是具体的Predictfor(inti=0;i

publicstaticclassReduceTaskextendsRichFlatMapFunction,Tuple7>{privateintparallelism;privateint[]poses;privateMap>buffer;privateMap>>models=newHashMap<>();}flatMap函数大致完成如下功能,即两种归并:

当具体用作输出模型使用时,其变量如下:

models={HashMap@13258}size=1{Long@13456}1->{ArrayList@13678}size=1key={Long@13456}1value={ArrayList@13678}size=10={Tuple2@13698}"(1,0.0-8.244533295515879E-50.0-1.103997743166529E-40.0-3.336931546279811E-5....."2.3.3.4判断是否反馈这个filterresult是用来判断是否反馈的。这里t3.f0是sampleId,t3.f2是subNum。

DataStream>result=iterativeBody.filter(newFilterFunction>(){@Overridepublicbooleanfilter(Tuple7t3)throwsException{//ift3.f0>0&&t3.f2>0thenfeedbackreturn(t3.f0>0&&t3.f2>0);}});对于t3.f0,有两处代码会设置为负值。

对于t3.f2,如果subNum大于零,说明在高维向量切分时候,是得到了有意义的数值。

这里是filteroutput。

DataStreamoutput=iterativeBody.filter(newFilterFunction>(){@Overridepublicbooleanfilter(Tuple7value){/*ifvalue.f0smallthan0,thenoutput*/returnvalue.f0<0;}}).flatMap(newWriteModel(labelType,getVectorCol(),featureCols,hasInterceptItem));2.3.3.6处理反馈数据/更新参数CalcTask.flatMap2实际完成的是FTRL算法的其余部分,即更新参数部分。主要逻辑如下:

在LogisticRegression中,sigmoid函数是σ(a)=1/(1+exp(-a)),预估pt=σ(xt.wt),则LogLoss函数是

直接计算可以得到

具体LR+FTRL算法实现如下:

publicvoidflatMap(Tuple7value,Collectorout){//输入value变量打印如下:value={Tuple7@13296}f0={Long@13306}-1f1={Integer@13307}0f2={Integer@13308}2f3={DenseVector@13309}"-0.73834267321375650.00.00.01.5885293675862715E-4-4.834608575902742E-50.00.0-6.754208708318647E-5......"data={double[30001]@13314}f4={Object[2]@13310}f5={Double@13311}-1.0f6={Long@13312}0//生成模型LinearModelDatamodelData=newLinearModelData();......modelData.coefVector=(DenseVector)value.f3;modelData.labelValues=(Object[])value.f4;//把模型数据转换成ListrowsRowCollectorlistCollector=newRowCollector();newLinearModelDataConverter().save(modelData,listCollector);Listrows=listCollector.getRows();for(Rowr:rows){introwSize=r.getArity();for(intj=0;j

//ftrlpredictFtrlPredictStreamOppredictResult=newFtrlPredictStreamOp(initModel).setVectorCol(vecColName).setPredictionCol("pred").setReservedCols(newString[]{labelColName}).setPredictionDetailCol("details").linkFrom(model,featurePipelineModel.transform(splitter.getSideOutput(0)));从上面代码我们可以看到

linkFrom函数完成了业务逻辑,大致功能如下:

即FTRL的预测功能有三个输入:

构造函数中完成了初始化,即获取事先训练好的逻辑回归模型。

publicFtrlPredictStreamOp(BatchOperatormodel){super(newParams());if(model!=null){dataBridge=DirectReader.collect(model);}else{thrownewIllegalArgumentException("Ftrlalgo:initialmodelisnull.Pleasesetavalidinitialmodel.");}}3.2获取在线训练模型CollectModel完成了获取在线训练模型功能。

其逻辑主要是:模型被分成若干块,其中(long)inRow.getField(1)这里记录了具体有多少块。所以flatMap函数会把这些块累积起来,最后组装成模型,统一发送给下游算子。

具体是通过一个HashMap<>buffers来完成临时拼装/最后组装的。

publicstaticclassPredictProcessextendsRichCoFlatMapFunction{privateLinearModelMapperpredictor=null;privateStringmodelSchemaJson;privateStringdataSchemaJson;privateParamsparams;privateintiter=0;privateDataBridgedataBridge;}3.3.1加载预设置模型其构造函数获得了FtrlPredictStreamOp类的dataBridge,即事先训练好的逻辑回归模型。每一个Task都拥有完整的模型。

open函数会加载逻辑回归模型。

publicvoidopen(Configurationparameters)throwsException{this.predictor=newLinearModelMapper(TableUtil.fromSchemaJson(modelSchemaJson),TableUtil.fromSchemaJson(dataSchemaJson),this.params);if(dataBridge!=null){//readinitmodelListmodelRows=DirectReader.directRead(dataBridge);LinearModelDatamodel=newLinearModelDataConverter().load(modelRows);this.predictor.loadModel(model);}}3.3.2在线预测FtrlPredictStreamOp.flatMap1函数完成了在线预测。

publicvoidflatMap1(Rowrow,Collectorcollector)throwsException{collector.collect(this.predictor.map(row));}调用栈如下:

predictWithProb:157,LinearModelMapper(com.alibaba.alink.operator.common.linear)predictResultDetail:114,LinearModelMapper(com.alibaba.alink.operator.common.linear)map:90,RichModelMapper(com.alibaba.alink.common.mapper)flatMap1:174,FtrlPredictStreamOp$PredictProcess(com.alibaba.alink.operator.stream.onlinelearning)flatMap1:143,FtrlPredictStreamOp$PredictProcess(com.alibaba.alink.operator.stream.onlinelearning)processElement1:53,CoStreamFlatMap(org.apache.flink.streaming.api.operators.co)processRecord1:135,StreamTwoInputProcessor(org.apache.flink.streaming.runtime.io)具体是通过LinearModelMapper完成。

publicabstractclassRichModelMapperextendsModelMapper{publicRowmap(Rowrow)throwsException{if(isPredDetail){//我们的示例代码在这里Tuple2t2=predictResultDetail(row);returnthis.outputColsHelper.getResultRow(row,Row.of(t2.f0,t2.f1));}else{returnthis.outputColsHelper.getResultRow(row,Row.of(predictResult(row)));}}}预测代码如下,可以看出来使用了sigmoid。

/***Predictthelabelinformationwiththeprobabilityofeachlabel.*/publicTuple2predictWithProb(Vectorvector){doubledotValue=MatVecOp.dot(vector,model.coefVector);switch(model.linearModelType){caseLR:caseSVM:doubleprob=sigmoid(dotValue);returnnewTuple2<>(dotValue>=0model.labelValues[0]:model.labelValues[1],newDouble[]{prob,1-prob});}}3.3.3在线更新模型FtrlPredictStreamOp.flatMap2函数完成了处理在线训练输出的模型数据流,在线更新模型。

LinearModelData参数是由CollectModel完成加载并且传输出来的。

publicvoidflatMap2(LinearModelDatalinearModel,Collectorcollector)throwsException{this.predictor.loadModel(linearModel);}

THE END
1.在线学习算法(OnlineLearning)理论与实践在线学习算法(Online Learning)理论与实践 本文介绍在线学习(Online Learning)的基本原理,包括Bayesian Online Learning和FTRL算法,并以美团移动端推荐重排序为例,展示在线学习在提高推荐效果方面的实践。通过实时调整模型,以反映线上变化,提升预测准确率。 摘要由CSDN通过智能技术生成https://blog.csdn.net/hzwaxx/article/details/83867630
2.在线学习方法概述腾讯云开发者社区在线学习方法概述 算法常常用到逻辑回归算法,而传统的批量学习算法如 SGD 无法应对大规模、高维的数据集和实时数据流。为了解决这个问题,在线最优化算法如 TG [1]、FOBOS [2]、RDA [3]、FTRL [4,5,6] 应运而生,下面将介绍、对比这些算法。 TODOhttps://cloud.tencent.com/developer/article/1561719
3.重要学习网址收藏1ZhemgLee的技术博客2.数学推导+纯Python实现机器学习算法30:系列总结与感悟 总:https://mp.weixin.qq.com/s/jJd8Gg61eaE0JKZqQSeE8g 分1:https://mp.weixin.qq.com/s/E9lMqNM8uNc57KNvnsBZGQ 分2-n: 3.深度学习第60讲:深度学习笔记系列总结与感悟 总:https://mp.weixin.qq.com/s/qXfu-UzZmlv-aQt3IIUnAQ https://blog.51cto.com/u_15240054/2873341
4.基于深度学习框架的医疗图像分类算法实现开题报告(8页)基于深度学习框架的医疗图像分类算法实现开题报告.docx,PAGE 5 - 本科生毕业设计(论文) 开题报告 题目:基于深度学习框架的医疗图像分类算法实现 姓名: 张震宇 学号: 201306080330 指导教师: 齐勇 班级: 计 133 所在院系: 电气与信息工程学院计算机系 学生https://m.book118.com/html/2020/0517/8003122062002112.shtm
5.《机器学习算法的数学解析与Python实现》(莫凡)简介书评当当网图书频道在线销售正版《机器学习算法的数学解析与Python实现》,作者:莫凡,出版社:机械工业出版社。最新《机器学习算法的数学解析与Python实现》简介、书评、试读、价格、图片等相关信息,尽在DangDang.com,网购《机器学习算法的数学解析与Python实现》,就上当当http://product.dangdang.com/28503697.html
6.79强化学习基础算法及实践策略梯度强化学习方法实现上一个实验中,我们了解并实现了策略梯度算法。同时,使用 OpenAI 提供的 Gym 强化学习环境测试了算法。不过,受限于 Notebook 实验环境,我们无法可视化强化学习的过程,只能打印出学习参数和每个 Episode 学习的时间步长。 本次挑战,我们将利用实验楼提供的桌面在线环境,完成 CartPole 小游戏可视化学习过程。 https://www.jianshu.com/p/71eacadc41bf
7.人工智能基础:五大机器学习经典算法入门课程哔哩哔哩11项必备技能,7种机器学习实现方式,34种深度学习常用算法 26.6万播放/共159课时 【限时特惠】小白玩转Python数据分析 全流程Python数据分析课,配合多个项目实战,点满数据技能 134.3万播放/共106课时 小白也能听懂的人工智能原理 仅用中学数学知识就能看懂的人工智能入门课 480.3万播放/共14课时 机器学习必修课:经典AIhttps://www.bilibili.com/cheese/play/ss10026?search_query=%E8%BF%9B%E5%8C%96%E7%AE%97%E6%B3%95%E5%AD%A6%E4%B9%A0
8.科学网—[转载]强化学习在资源优化领域的应用基于这种行业趋势,本文针对强化学习算法在资源优化领域的应用展开调研,帮助读者了解该领域最新的进展,学习如何利用数据驱动的方式解决资源优化问题。鉴于资源优化问题场景众多、设定繁杂,划分出3类应用广泛的资源优化问题,即资源平衡问题、资源分配问题、装箱问题,集中进行调研。在每个领域阐述问题的特性,并根据具体的问题特性https://blog.sciencenet.cn/blog-3472670-1312677.html
9.2021届计算机科学方向毕业设计(论文)阶段性汇报本次阶段性汇报主要介绍多智能体强化学习框架MALib的设计与完成情况,以及针对目前的研究内容实现的功能。其次介绍在当前框架上部分强化学习算法的复现情况以及遇到的问题。最后介绍后续的计划安排。 杨宁 电阻式随机存取存储器的侧信道安全研究 在本阶段,研究完成了对ReRAM业界现有公开数学仿真模型的收集、测试以及修改;https://zhiyuan.sjtu.edu.cn/html/zhiyuan/announcement_view.php?id=3943
10.力扣(LeetCode)全球极客挚爱的技术成长平台算法作为面试中非常核心的一环,攻克其高效的方法为先熟练掌握数据结构,再系统学习算法。本文会详细介绍面试中经常用到的数据结构数组,字符串,链表,哈希表,栈,队列,堆,优先队列,树,以及图的使用、底层原理以及各个操作的性 927 101 3251 OneQ?4 小时前https://leetcode-cn.com/
11.《深度强化学习算法与实践:基于PyTorch的实现》小说在线阅读张校捷编著创作的计算机网络小说《深度强化学习算法与实践:基于PyTorch的实现》,已更新0章,最新章节:。本书从强化学习的基础知识出发,结合PyTorch深度学习框架,介绍深度强化学习算法各种模型的相关算法原理和基于PyTorch的代码实现。作为一本介绍深度强化学习知识的相关https://m.qidian.com/book/1033738513/
12.Python机器学习10大经典算法的讲解和示例python10个经典的机器学习算法包括:线性回归、逻辑回归、K-最近邻(KNN)、支持向量机(SVM)、决策树、随机森林、朴素贝叶斯、K-均值聚类、主成分分析(PCA)、和梯度提升(Gradient Boosting),我将使用常见的机器学习库,如scikit-learn,numpy和pandas 来实现这些算法https://www.jb51.net/python/322045198.htm
13.超详细算法岗的学习路线大总结!机器学习 or 深度学习基础 论文or 项目介绍 其他问题 & 向面试官提问 本文将从以上四点进行展开。 一、数据结构&算法题 随着算法岗越来越卷,Coding几乎成了面试必考的一项,而且在面评中的权重也越来越高,根据个人面试经验,如果这一环节没有很顺利得完成的话,几乎必挂,尤其对于非科班转行的同学,需要特别重视。 https://leetcode.cn/circle/discuss/SX3aa6/
14.七月在线七月在线创始人,七月大模型与机器人技术总负责人 北理工校外导师,微软AI MVP,Github上2万余star,CSDN 2000万PV博客『结构之法 算法之道』博主,去过近百所985/211高校分享算法,亦是华为云等数十个大会的演讲嘉宾。2015年创办七月在线,并于2018年获得好未来千万投资,到2022年平台上聚集了350+的大厂专家讲师团队https://www.julyedu.com/
15.范慧杰5) 机器人在线学习和场景感知研究 经费来源:国家自然科学基金; 2014-2017 6) 基于稀疏表达和字典选择的一致异常行为算法研究 经费来源:国家自然科学基金; 2012-2014 7) 基于水平集理论PLIF火焰前锋与特征提取算法研究 经费来源:国家自然科学基金; 2009-2011 http://www.sia.cas.cn/vision/kytd/yjry/202307/t20230726_6834850.html
16.推荐几个算法可视化网站,从此轻松学算法!大家好,我是大彬~ 很多初学者在学习数据结构与算法的时候,都会觉得很难,很大一部分是因为数据结构与算法本身比较抽象,不好理解。对于这一点,可以通过一些可视化动画来帮助理解。 下面大彬推荐几个学习数据结构和算法的可视化工具。 Data Structure Visualizations 这是https://m.nowcoder.com/feed/main/detail/6ecdab56f00b44bfacf3cb854929059e