Alink漫谈(十三):在线学习算法FTRL之具体实现罗西的思考

Alink是阿里巴巴基于实时计算引擎Flink研发的新一代机器学习算法平台,是业界首个同时支持批式算法、流式算法的机器学习平台。本文和上文一起介绍了在线学习算法FTRL在Alink中是如何实现的,希望对大家有所帮助。

为了让大家更好理解,我们再次贴出整体流程图:

在线训练主要逻辑是:

前面说到,FTRL先要训练出一个逻辑回归模型作为FTRL算法的初始模型,这是为了系统冷启动的需要。

具体逻辑回归模型设定/训练是:

//traininitialbatchmodelLogisticRegressionTrainBatchOplr=newLogisticRegressionTrainBatchOp().setVectorCol(vecColName).setLabelCol(labelColName).setWithIntercept(true).setMaxIter(10);BatchOperator<>initModel=featurePipelineModel.transform(trainBatchData).link(lr);训练好之后,模型信息是DataSet类型,位于变量BatchOperator<>initModel之中,这是一个批处理算子。

FtrlTrainStreamOp将initModel作为初始化参数。

FtrlTrainStreamOpmodel=newFtrlTrainStreamOp(initModel)在FtrlTrainStreamOp构造函数中会加载这个模型;

dataBridge=DirectReader.collect(initModel);具体加载时通过MemoryDataBridge直接获取初始化模型DataSet中的数据。

publicMemoryDataBridgegenerate(BatchOperatorbatchOperator,ParamsglobalParams){returnnewMemoryDataBridge(batchOperator.collect());}2.2分割高维向量从前文可知,Alink的FTRL算法设置的特征向量维度是30000。所以算法第一步就是切分高维度向量,以便分布式计算。

StringvecColName="vec";intnumHashFeatures=30000;首先要获取切分信息,代码如下,就是将特征数目featureSize除以并行度parallelism,然后得到了每个task对应系数的初始位置。

privatestaticint[]getSplitInfo(intfeatureSize,booleanhasInterceptItem,intparallelism){intcoefSize=(hasInterceptItem)featureSize+1:featureSize;intsubSize=coefSize/parallelism;int[]poses=newint[parallelism+1];intoffset=coefSize%parallelism;for(inti=0;i

//Tuple5DataStream>input=initData.flatMap(newSplitVector(splitInfo,hasInterceptItem,vectorSize,vectorTrainIdx,featureIdx,labelIdx)).partitionCustom(newCustomBlockPartitioner(),1);具体切分在SplitVector.flatMap函数完成,结果就是把一个高维度向量分割给各个CalcTask。

代码摘要如下:

publicvoidflatMap(Rowrow,Collector>collector)throwsException{ longsampleId=counter;counter+=parallelism;Vectorvec;if(vectorTrainIdx==-1){.....}else{//输入row的第vectorTrainIdx个field就是那个30000大小的系数向量vec=VectorUtil.getVector(row.getField(vectorTrainIdx));}if(vecinstanceofSparseVector){MaptmpVec=newHashMap<>();for(inti=0;i

此处理论上有以下几个重点:

伪代码思路大致如下

doublep=learner.predict(x);//预测learner.updateModel(x,p,y);//更新模型doubleloss=LogLossEvalutor.calLogLoss(p,y);//计算损失evalutor.addLogLoss(loss);//更新损失totalLoss+=loss;trainedNum+=1;具体实施上Alink有自己的特点和调整。

机器学习都需要迭代训练,Alink这里利用了FlinkStream的迭代功能。

IterativeStream的实例是通过DataStream的iterate方法创建的˙。iterate方法存在两个重载形式:

Alink选择了第二种。

在创建ConnectedIterativeStreams时候,用迭代流的初始输入作为第一个输入流,用反馈流作为第二个输入。

每一种数据流(DataStream)都会有与之对应的流转换(StreamTransformation)。IterativeStream对应的转换是FeedbackTransformation。

迭代流(IterativeStream)对应的转换是反馈转换(FeedbackTransformation),它表示拓扑中的一个反馈点(也即迭代头)。一个反馈点包含一个输入边以及若干个反馈边,且Flink要求每个反馈边的并行度必须跟输入边的并行度一致,这一点在往该转换中加入反馈边时会进行校验。

当IterativeStream对象被构造时,FeedbackTransformation的实例会被创建并传递给DataStream的构造方法。

迭代的关闭是通过调用IterativeStream的实例方法closeWith来实现的。这个函数指定了某个流将成为迭代程序的结束,并且这个流将作为输入的第二部分(secondinput)被反馈回迭代。

对于Alink来说,迭代构建代码是:

//traindataformat=//feedbackformat=Tuple7IterativeStream.ConnectedIterativeStreams,Tuple7>iteration=input.iterate(Long.MAX_VALUE).withFeedbackType(TypeInformation.of(newTypeHint>(){}));//即iteration是一个IterativeStream.ConnectedIterativeStreams<...>2.3.2.1迭代的输入从代码和注释可以看出,迭代的两种输入是:

反馈流的设置是通过调用IterativeStream的实例方法closeWith来实现的。Alink这里是

反馈流的格式是:

迭代体由两部分构成:CalcTask/ReduceTask。

CalcTask每一个实例都拥有初始化模型dataBridge。

DataStreamiterativeBody=iteration.flatMap(newCalcTask(dataBridge,splitInfo,getParams()))2.3.3.1迭代初始化迭代是由CalcTask.open函数开始,主要做如下几件事

CalcTask.flatMap1主要实现的是FTRL算法中的predict部分(注意,不是FTRL预测)。

解释:pt=σ(Xtw)是LR的预测函数,求出pt的唯一目的是为了求出目标函数(在LR中采用交叉熵损失函数作为目标函数)对参数w的一阶导数g,gi=(ptyt)xi。此步骤同样适用于FTRL优化其他目标函数,唯一的不同就是求次梯度g(次梯度是左导和右导之间的集合,函数可导--左导等于右导时,次梯度就等于一阶梯度)的方法不同。

函数的输入是"训练输入数据",即SplitVector.flatMap的输出---->CalcCalcTask的输入。输入数据是一个五元组,其格式为traindataformat=

有三点需要注意:

大家会说,不对!predict函数应该是sigmoid=1.0/(1.0+np.exp(-w.dot(x)))。是的,这里还没有做sigmoid操作。当ReduceTask做了聚合之后,会把聚合好的p反馈回迭代体,然后在CalcTask.flatMap2中才会做sigmoid操作。

publicvoidflatMap1(Tuple5value,Collector>out)throwsException{if(!savedFristModel){//第一次进入需要存模型out.collect(Tuple7.of(-1L,0,getRuntimeContext().getIndexOfThisSubtask(),newDenseVector(coef),labelValues,-1.0,modelId++));savedFristModel=true;}LongtimeStamps=System.currentTimeMillis();doublewx=0.0;LongsampleId=value.f0;Vectorvec=value.f3;if(vecinstanceofSparseVector){int[]indices=((SparseVector)vec).getIndices();//这里就是具体的Predictfor(inti=0;i

publicstaticclassReduceTaskextendsRichFlatMapFunction,Tuple7>{privateintparallelism;privateint[]poses;privateMap>buffer;privateMap>>models=newHashMap<>();}flatMap函数大致完成如下功能,即两种归并:

当具体用作输出模型使用时,其变量如下:

models={HashMap@13258}size=1{Long@13456}1->{ArrayList@13678}size=1key={Long@13456}1value={ArrayList@13678}size=10={Tuple2@13698}"(1,0.0-8.244533295515879E-50.0-1.103997743166529E-40.0-3.336931546279811E-5....."2.3.3.4判断是否反馈这个filterresult是用来判断是否反馈的。这里t3.f0是sampleId,t3.f2是subNum。

DataStream>result=iterativeBody.filter(newFilterFunction>(){@Overridepublicbooleanfilter(Tuple7t3)throwsException{//ift3.f0>0&&t3.f2>0thenfeedbackreturn(t3.f0>0&&t3.f2>0);}});对于t3.f0,有两处代码会设置为负值。

对于t3.f2,如果subNum大于零,说明在高维向量切分时候,是得到了有意义的数值。

这里是filteroutput。

DataStreamoutput=iterativeBody.filter(newFilterFunction>(){@Overridepublicbooleanfilter(Tuple7value){/*ifvalue.f0smallthan0,thenoutput*/returnvalue.f0<0;}}).flatMap(newWriteModel(labelType,getVectorCol(),featureCols,hasInterceptItem));2.3.3.6处理反馈数据/更新参数CalcTask.flatMap2实际完成的是FTRL算法的其余部分,即更新参数部分。主要逻辑如下:

在LogisticRegression中,sigmoid函数是σ(a)=1/(1+exp(-a)),预估pt=σ(xt.wt),则LogLoss函数是

直接计算可以得到

具体LR+FTRL算法实现如下:

publicvoidflatMap(Tuple7value,Collectorout){//输入value变量打印如下:value={Tuple7@13296}f0={Long@13306}-1f1={Integer@13307}0f2={Integer@13308}2f3={DenseVector@13309}"-0.73834267321375650.00.00.01.5885293675862715E-4-4.834608575902742E-50.00.0-6.754208708318647E-5......"data={double[30001]@13314}f4={Object[2]@13310}f5={Double@13311}-1.0f6={Long@13312}0//生成模型LinearModelDatamodelData=newLinearModelData();......modelData.coefVector=(DenseVector)value.f3;modelData.labelValues=(Object[])value.f4;//把模型数据转换成ListrowsRowCollectorlistCollector=newRowCollector();newLinearModelDataConverter().save(modelData,listCollector);Listrows=listCollector.getRows();for(Rowr:rows){introwSize=r.getArity();for(intj=0;j

//ftrlpredictFtrlPredictStreamOppredictResult=newFtrlPredictStreamOp(initModel).setVectorCol(vecColName).setPredictionCol("pred").setReservedCols(newString[]{labelColName}).setPredictionDetailCol("details").linkFrom(model,featurePipelineModel.transform(splitter.getSideOutput(0)));从上面代码我们可以看到

linkFrom函数完成了业务逻辑,大致功能如下:

即FTRL的预测功能有三个输入:

构造函数中完成了初始化,即获取事先训练好的逻辑回归模型。

publicFtrlPredictStreamOp(BatchOperatormodel){super(newParams());if(model!=null){dataBridge=DirectReader.collect(model);}else{thrownewIllegalArgumentException("Ftrlalgo:initialmodelisnull.Pleasesetavalidinitialmodel.");}}3.2获取在线训练模型CollectModel完成了获取在线训练模型功能。

其逻辑主要是:模型被分成若干块,其中(long)inRow.getField(1)这里记录了具体有多少块。所以flatMap函数会把这些块累积起来,最后组装成模型,统一发送给下游算子。

具体是通过一个HashMap<>buffers来完成临时拼装/最后组装的。

publicstaticclassPredictProcessextendsRichCoFlatMapFunction{privateLinearModelMapperpredictor=null;privateStringmodelSchemaJson;privateStringdataSchemaJson;privateParamsparams;privateintiter=0;privateDataBridgedataBridge;}3.3.1加载预设置模型其构造函数获得了FtrlPredictStreamOp类的dataBridge,即事先训练好的逻辑回归模型。每一个Task都拥有完整的模型。

open函数会加载逻辑回归模型。

publicvoidopen(Configurationparameters)throwsException{this.predictor=newLinearModelMapper(TableUtil.fromSchemaJson(modelSchemaJson),TableUtil.fromSchemaJson(dataSchemaJson),this.params);if(dataBridge!=null){//readinitmodelListmodelRows=DirectReader.directRead(dataBridge);LinearModelDatamodel=newLinearModelDataConverter().load(modelRows);this.predictor.loadModel(model);}}3.3.2在线预测FtrlPredictStreamOp.flatMap1函数完成了在线预测。

publicvoidflatMap1(Rowrow,Collectorcollector)throwsException{collector.collect(this.predictor.map(row));}调用栈如下:

predictWithProb:157,LinearModelMapper(com.alibaba.alink.operator.common.linear)predictResultDetail:114,LinearModelMapper(com.alibaba.alink.operator.common.linear)map:90,RichModelMapper(com.alibaba.alink.common.mapper)flatMap1:174,FtrlPredictStreamOp$PredictProcess(com.alibaba.alink.operator.stream.onlinelearning)flatMap1:143,FtrlPredictStreamOp$PredictProcess(com.alibaba.alink.operator.stream.onlinelearning)processElement1:53,CoStreamFlatMap(org.apache.flink.streaming.api.operators.co)processRecord1:135,StreamTwoInputProcessor(org.apache.flink.streaming.runtime.io)具体是通过LinearModelMapper完成。

publicabstractclassRichModelMapperextendsModelMapper{publicRowmap(Rowrow)throwsException{if(isPredDetail){//我们的示例代码在这里Tuple2t2=predictResultDetail(row);returnthis.outputColsHelper.getResultRow(row,Row.of(t2.f0,t2.f1));}else{returnthis.outputColsHelper.getResultRow(row,Row.of(predictResult(row)));}}}预测代码如下,可以看出来使用了sigmoid。

/***Predictthelabelinformationwiththeprobabilityofeachlabel.*/publicTuple2predictWithProb(Vectorvector){doubledotValue=MatVecOp.dot(vector,model.coefVector);switch(model.linearModelType){caseLR:caseSVM:doubleprob=sigmoid(dotValue);returnnewTuple2<>(dotValue>=0model.labelValues[0]:model.labelValues[1],newDouble[]{prob,1-prob});}}3.3.3在线更新模型FtrlPredictStreamOp.flatMap2函数完成了处理在线训练输出的模型数据流,在线更新模型。

LinearModelData参数是由CollectModel完成加载并且传输出来的。

publicvoidflatMap2(LinearModelDatalinearModel,Collectorcollector)throwsException{this.predictor.loadModel(linearModel);}

THE END
1.onlinelearning是什么意思onlinelearning的翻译音标Online Learning 释义 线上学习 实用场景例句 全部 There has been a very positive response toonline learning. 对网络课程的反映十分热烈. 互联网 Online learningis more flexible and affordable than and - mortar model of higher education. 网络教导比传统的高级教导模式更机动,更累赘得起. https://m.iciba.com/online%20learning
2.产品学习领英学习领英学习是一套互动性极强的技能培养解决方案,且可内置于学员日常工作的工具当中。领英学习提供与时俱进的高品质内容,并可根据学员的职业目标、组织的发展方向对内容进行个性化定制。 学习中心 从资源库中获取提示、教程、视频和专家主持的网络研讨会等资源,帮助您的组织充分利用领英学习。 浏览学习中心 帮助https://learning.linkedin.com/zh-cn/customer-success-center/product-learning
3.基于python的英语在线学习考试网站基于python的英语在线学习考试网站 文章目录 设计思路 开发与测试: 核心代码部分展示 文章目录/协作提纲 源码/演示视频获取方式 项目介绍 对于之前英语知识应用网站的管理,大部分都是使用传统的人工方式去管理,这样导致了管理效率低下、出错频率高。而且,时间一长的话,积累下来的数据信息不容易保存,对于查询、更新还有https://blog.csdn.net/QQ402205496/article/details/144333450
4.在线教育平台的最新动态:新课程上线,互动学习模式提升学生参与度在线教育平台的最新动态:新课程上线,互动学习模式提升学生参与度,助力个性化学习体验 最近,一则关于在线教育平台的消息引起了广泛关注。某知名在线教育机构http://yn.zhzsf.vip/zhyxd/26885.html
5.免费在线学习平台:提升技能与知识的最佳选择,随时随地畅享教育最新消息:某知名在线学习平台近日宣布,将推出一系列新的免费课程,涵盖编程、设计、商业管理等多个领域,以满足日益增长的学习需求。 免费在线学习平台的崛起 随着科技的发展和互联网的普及,免费在线学习平台如雨后春笋般涌现。这些平台为用户提供了丰富多样的教育资源,使得人们可以在任何时间、任何地点进行自我提升。根据《http://wiggle.miiqu.com/miqgl/24539.html
6.在线学习网站的资源丰富,课程种类多样,非常适合自学者和在职人员如何选择适合自己的在线课程? 用户可以根据自身目标、时间安排及预算进行筛选,同时参考其他学员评价以做出更明智决策。 在线教育如何保障内容质量? 平台可通过引入认证体系及专家审核机制来提高内容质量,并鼓励用户反馈以持续改进。 参考文献: Smith, J. (2022). The Impact of Online Learning on Professional Developmenhttp://hx.xfcxx.com/gl91/2366.html
7.在线英语课堂的革新之路,革命性变革与未来展望五金件在线英语课堂的革命性变革带来了前所未有的机遇和挑战。随着技术的不断进步,学生们可以随时随地学习英语,享受个性化的学习体验。在线英语课堂将继续发展,更加注重学生自主学习和互动体验的结合,同时利用人工智能和大数据技术,实现更精准的教学和评估。展望未来,在线英语教育市场潜力巨大,有望继续保持迅猛的发展势头。 http://m.zj-xy.net/post/18466.html
8.在线学习微百科在线学习概述 线上学习(英语:Online Learning)是一种透过互联网工具来学习或训练的方式。 在线学习类型 在国内在线学习发展趋势是职业考试和个人能力提高,将一种方便、效率、低成本的学习模式。 分为两类: 职业考试,个人技能提高。https://ispeak.vibaike.com/8854
9.概念距离教育的优势在于它可以突破时空的限制;提供更多的学习机 会;扩大教学规模;提高教学质量;降低教学的成本。基于远程 教育的特点和优势,许多有识之士已经认识到发展远程教育的重 要意义和广阔前景。 4.在线学习(onlinelearning) 所谓E-Learning,即为在线学习。是指在由通讯技术、微 https://www.360doc.cn/document/40594724_641723500.html
10.onlinelearningvue:在线学习平台online-learning-vue 本周六 · 上海源创会,一起聊聊AI Agent 构建与应用 扫描微信二维码支付 取消 支付完成 Watch 不关注关注所有动态仅关注版本发行动态关注但不提醒动态 5Star1Fork0 KWKENNY/online-learning-vue 代码Issues0Pull Requests0Wiki统计流水线https://gitee.com/kwkenny/OnlineLearningVue/
11.机器学习PAI全新功效——实时新闻热点OnlineLearning实践针对这种场景,PAI平台开创性的提出来Online-Learning的解决方案,通过流式算法和离线算法的结合,既能够发挥离线训练对大规模数据的强大处理能力,又能够发挥流式机器学习算法对实时模型的更新能力,做到流批同跑,完美解决模型时效性的问题。今天就以实时热点新闻挖掘案例为例,为大家介绍PAI OnlineLearning的解决方案。 https://maimai.cn/article/detail?fid=1092991292&efid=q4lYsgkD4uccYLTNjKAn9A
12.thisisespeciallytruewhenitcomestoonlinelearning80.How do you like online learning?(请自拟一句话作答) Online learning is a different and good way to learn,which also need good learning habits.. 试题答案 在线课程 分析文章讲述了在线学习的优点和需要有一个好习惯的重要性,并且对于在线学习提了四点建议. http://www.1010jiajiao.com/czyy/shiti_id_d9a0d20d5001e7776524a893d61eeef7
13.Coursera一流教育者提供在线课程和证书。免费加入开始学习免费课程 探索来自全球一流大学和公司的免费在线课程。 状态:免费 免费 DeepLearning.AI 拥抱脸庞的 Open Source 模型 显示另外 8 个 查看所有 学位课程 先人一步,现在就开始攻读学位 通过这些项目,您可以培养宝贵的技能,获得职业证书,甚至是在注册学位之前在学位方面取得进展。 https://www.coursera.org/
14.KhanAcademyFreeOnlineCourses,Lessons&PracticeFor every student, every classroom. Realresults. We’re a nonprofit with the mission to provide a free, world-class education for anyone, anywhere. LearnersTeachersParents Math: high school & college Why Khan Academy works Personalized learning https://www.khanacademy.org/
15.onlinelearningplatform橙光线基本的体育产业,赛事组织在线培训课程。 扎实的基础是您成功的关键,方便高效的在线学习平台为您提供服务。 [vc_cta h2=”体育产业经纪人课程”]了解体育产业,搞懂基本原理,既是备考好帮手,又是入门的必备知识库。资深体育专家解读,与时俱进的理解和分析。相信会帮到您。了解更多,报名上课[/vc_cta][vc_cta h2=https://www.cgxmanagement.com/?p=3814
16.虚幻引擎学习资源汇总虚幻官方在线学习:成系列视频课程 https://www.unrealengine.com/zh-CN/onlinelearning-courses 虚幻引擎官方文档:功能模块文档 https://docs.unrealengine.com/zh-CN/index.html 虚幻引擎官方论坛:通用交流讨论 https://forums.unrealengine.com/ 虚幻官方问答中心:专注技术问答中心 https://www.d-arts.cn/article/article_info/key/MTE5NjgyNDY2MDGD34Vlr6mgcw.html
17.OnlineLearningConsortium(OLC)The go-to digital learning source for thousands of educators. See how advanced quality teaching techniques can reach and engage the modern learner!https://onlinelearningconsortium.org/
18.HomePagePyramidOnlineLearningWelcome to Pyramid’s Learning Library Expand your professional development, earn CEUs, and get PECS? Implementer Certified on the Pyramid Online Learning Portal. VIEW SERVICES OUR SERVICES PECS? CERTIFICATION Validate your PECS knowledge and skills under the supervision of a Pyramid Consultant withhttps://www.pyramidonlinelearning.com/