Alink漫谈(十三):在线学习算法FTRL之具体实现罗西的思考

Alink是阿里巴巴基于实时计算引擎Flink研发的新一代机器学习算法平台,是业界首个同时支持批式算法、流式算法的机器学习平台。本文和上文一起介绍了在线学习算法FTRL在Alink中是如何实现的,希望对大家有所帮助。

为了让大家更好理解,我们再次贴出整体流程图:

在线训练主要逻辑是:

前面说到,FTRL先要训练出一个逻辑回归模型作为FTRL算法的初始模型,这是为了系统冷启动的需要。

具体逻辑回归模型设定/训练是:

//traininitialbatchmodelLogisticRegressionTrainBatchOplr=newLogisticRegressionTrainBatchOp().setVectorCol(vecColName).setLabelCol(labelColName).setWithIntercept(true).setMaxIter(10);BatchOperator<>initModel=featurePipelineModel.transform(trainBatchData).link(lr);训练好之后,模型信息是DataSet类型,位于变量BatchOperator<>initModel之中,这是一个批处理算子。

FtrlTrainStreamOp将initModel作为初始化参数。

FtrlTrainStreamOpmodel=newFtrlTrainStreamOp(initModel)在FtrlTrainStreamOp构造函数中会加载这个模型;

dataBridge=DirectReader.collect(initModel);具体加载时通过MemoryDataBridge直接获取初始化模型DataSet中的数据。

publicMemoryDataBridgegenerate(BatchOperatorbatchOperator,ParamsglobalParams){returnnewMemoryDataBridge(batchOperator.collect());}2.2分割高维向量从前文可知,Alink的FTRL算法设置的特征向量维度是30000。所以算法第一步就是切分高维度向量,以便分布式计算。

StringvecColName="vec";intnumHashFeatures=30000;首先要获取切分信息,代码如下,就是将特征数目featureSize除以并行度parallelism,然后得到了每个task对应系数的初始位置。

privatestaticint[]getSplitInfo(intfeatureSize,booleanhasInterceptItem,intparallelism){intcoefSize=(hasInterceptItem)featureSize+1:featureSize;intsubSize=coefSize/parallelism;int[]poses=newint[parallelism+1];intoffset=coefSize%parallelism;for(inti=0;i

//Tuple5DataStream>input=initData.flatMap(newSplitVector(splitInfo,hasInterceptItem,vectorSize,vectorTrainIdx,featureIdx,labelIdx)).partitionCustom(newCustomBlockPartitioner(),1);具体切分在SplitVector.flatMap函数完成,结果就是把一个高维度向量分割给各个CalcTask。

代码摘要如下:

publicvoidflatMap(Rowrow,Collector>collector)throwsException{ longsampleId=counter;counter+=parallelism;Vectorvec;if(vectorTrainIdx==-1){.....}else{//输入row的第vectorTrainIdx个field就是那个30000大小的系数向量vec=VectorUtil.getVector(row.getField(vectorTrainIdx));}if(vecinstanceofSparseVector){MaptmpVec=newHashMap<>();for(inti=0;i

此处理论上有以下几个重点:

伪代码思路大致如下

doublep=learner.predict(x);//预测learner.updateModel(x,p,y);//更新模型doubleloss=LogLossEvalutor.calLogLoss(p,y);//计算损失evalutor.addLogLoss(loss);//更新损失totalLoss+=loss;trainedNum+=1;具体实施上Alink有自己的特点和调整。

机器学习都需要迭代训练,Alink这里利用了FlinkStream的迭代功能。

IterativeStream的实例是通过DataStream的iterate方法创建的˙。iterate方法存在两个重载形式:

Alink选择了第二种。

在创建ConnectedIterativeStreams时候,用迭代流的初始输入作为第一个输入流,用反馈流作为第二个输入。

每一种数据流(DataStream)都会有与之对应的流转换(StreamTransformation)。IterativeStream对应的转换是FeedbackTransformation。

迭代流(IterativeStream)对应的转换是反馈转换(FeedbackTransformation),它表示拓扑中的一个反馈点(也即迭代头)。一个反馈点包含一个输入边以及若干个反馈边,且Flink要求每个反馈边的并行度必须跟输入边的并行度一致,这一点在往该转换中加入反馈边时会进行校验。

当IterativeStream对象被构造时,FeedbackTransformation的实例会被创建并传递给DataStream的构造方法。

迭代的关闭是通过调用IterativeStream的实例方法closeWith来实现的。这个函数指定了某个流将成为迭代程序的结束,并且这个流将作为输入的第二部分(secondinput)被反馈回迭代。

对于Alink来说,迭代构建代码是:

//traindataformat=//feedbackformat=Tuple7IterativeStream.ConnectedIterativeStreams,Tuple7>iteration=input.iterate(Long.MAX_VALUE).withFeedbackType(TypeInformation.of(newTypeHint>(){}));//即iteration是一个IterativeStream.ConnectedIterativeStreams<...>2.3.2.1迭代的输入从代码和注释可以看出,迭代的两种输入是:

反馈流的设置是通过调用IterativeStream的实例方法closeWith来实现的。Alink这里是

反馈流的格式是:

迭代体由两部分构成:CalcTask/ReduceTask。

CalcTask每一个实例都拥有初始化模型dataBridge。

DataStreamiterativeBody=iteration.flatMap(newCalcTask(dataBridge,splitInfo,getParams()))2.3.3.1迭代初始化迭代是由CalcTask.open函数开始,主要做如下几件事

CalcTask.flatMap1主要实现的是FTRL算法中的predict部分(注意,不是FTRL预测)。

解释:pt=σ(Xtw)是LR的预测函数,求出pt的唯一目的是为了求出目标函数(在LR中采用交叉熵损失函数作为目标函数)对参数w的一阶导数g,gi=(ptyt)xi。此步骤同样适用于FTRL优化其他目标函数,唯一的不同就是求次梯度g(次梯度是左导和右导之间的集合,函数可导--左导等于右导时,次梯度就等于一阶梯度)的方法不同。

函数的输入是"训练输入数据",即SplitVector.flatMap的输出---->CalcCalcTask的输入。输入数据是一个五元组,其格式为traindataformat=

有三点需要注意:

大家会说,不对!predict函数应该是sigmoid=1.0/(1.0+np.exp(-w.dot(x)))。是的,这里还没有做sigmoid操作。当ReduceTask做了聚合之后,会把聚合好的p反馈回迭代体,然后在CalcTask.flatMap2中才会做sigmoid操作。

publicvoidflatMap1(Tuple5value,Collector>out)throwsException{if(!savedFristModel){//第一次进入需要存模型out.collect(Tuple7.of(-1L,0,getRuntimeContext().getIndexOfThisSubtask(),newDenseVector(coef),labelValues,-1.0,modelId++));savedFristModel=true;}LongtimeStamps=System.currentTimeMillis();doublewx=0.0;LongsampleId=value.f0;Vectorvec=value.f3;if(vecinstanceofSparseVector){int[]indices=((SparseVector)vec).getIndices();//这里就是具体的Predictfor(inti=0;i

publicstaticclassReduceTaskextendsRichFlatMapFunction,Tuple7>{privateintparallelism;privateint[]poses;privateMap>buffer;privateMap>>models=newHashMap<>();}flatMap函数大致完成如下功能,即两种归并:

当具体用作输出模型使用时,其变量如下:

models={HashMap@13258}size=1{Long@13456}1->{ArrayList@13678}size=1key={Long@13456}1value={ArrayList@13678}size=10={Tuple2@13698}"(1,0.0-8.244533295515879E-50.0-1.103997743166529E-40.0-3.336931546279811E-5....."2.3.3.4判断是否反馈这个filterresult是用来判断是否反馈的。这里t3.f0是sampleId,t3.f2是subNum。

DataStream>result=iterativeBody.filter(newFilterFunction>(){@Overridepublicbooleanfilter(Tuple7t3)throwsException{//ift3.f0>0&&t3.f2>0thenfeedbackreturn(t3.f0>0&&t3.f2>0);}});对于t3.f0,有两处代码会设置为负值。

对于t3.f2,如果subNum大于零,说明在高维向量切分时候,是得到了有意义的数值。

这里是filteroutput。

DataStreamoutput=iterativeBody.filter(newFilterFunction>(){@Overridepublicbooleanfilter(Tuple7value){/*ifvalue.f0smallthan0,thenoutput*/returnvalue.f0<0;}}).flatMap(newWriteModel(labelType,getVectorCol(),featureCols,hasInterceptItem));2.3.3.6处理反馈数据/更新参数CalcTask.flatMap2实际完成的是FTRL算法的其余部分,即更新参数部分。主要逻辑如下:

在LogisticRegression中,sigmoid函数是σ(a)=1/(1+exp(-a)),预估pt=σ(xt.wt),则LogLoss函数是

直接计算可以得到

具体LR+FTRL算法实现如下:

publicvoidflatMap(Tuple7value,Collectorout){//输入value变量打印如下:value={Tuple7@13296}f0={Long@13306}-1f1={Integer@13307}0f2={Integer@13308}2f3={DenseVector@13309}"-0.73834267321375650.00.00.01.5885293675862715E-4-4.834608575902742E-50.00.0-6.754208708318647E-5......"data={double[30001]@13314}f4={Object[2]@13310}f5={Double@13311}-1.0f6={Long@13312}0//生成模型LinearModelDatamodelData=newLinearModelData();......modelData.coefVector=(DenseVector)value.f3;modelData.labelValues=(Object[])value.f4;//把模型数据转换成ListrowsRowCollectorlistCollector=newRowCollector();newLinearModelDataConverter().save(modelData,listCollector);Listrows=listCollector.getRows();for(Rowr:rows){introwSize=r.getArity();for(intj=0;j

//ftrlpredictFtrlPredictStreamOppredictResult=newFtrlPredictStreamOp(initModel).setVectorCol(vecColName).setPredictionCol("pred").setReservedCols(newString[]{labelColName}).setPredictionDetailCol("details").linkFrom(model,featurePipelineModel.transform(splitter.getSideOutput(0)));从上面代码我们可以看到

linkFrom函数完成了业务逻辑,大致功能如下:

即FTRL的预测功能有三个输入:

构造函数中完成了初始化,即获取事先训练好的逻辑回归模型。

publicFtrlPredictStreamOp(BatchOperatormodel){super(newParams());if(model!=null){dataBridge=DirectReader.collect(model);}else{thrownewIllegalArgumentException("Ftrlalgo:initialmodelisnull.Pleasesetavalidinitialmodel.");}}3.2获取在线训练模型CollectModel完成了获取在线训练模型功能。

其逻辑主要是:模型被分成若干块,其中(long)inRow.getField(1)这里记录了具体有多少块。所以flatMap函数会把这些块累积起来,最后组装成模型,统一发送给下游算子。

具体是通过一个HashMap<>buffers来完成临时拼装/最后组装的。

publicstaticclassPredictProcessextendsRichCoFlatMapFunction{privateLinearModelMapperpredictor=null;privateStringmodelSchemaJson;privateStringdataSchemaJson;privateParamsparams;privateintiter=0;privateDataBridgedataBridge;}3.3.1加载预设置模型其构造函数获得了FtrlPredictStreamOp类的dataBridge,即事先训练好的逻辑回归模型。每一个Task都拥有完整的模型。

open函数会加载逻辑回归模型。

publicvoidopen(Configurationparameters)throwsException{this.predictor=newLinearModelMapper(TableUtil.fromSchemaJson(modelSchemaJson),TableUtil.fromSchemaJson(dataSchemaJson),this.params);if(dataBridge!=null){//readinitmodelListmodelRows=DirectReader.directRead(dataBridge);LinearModelDatamodel=newLinearModelDataConverter().load(modelRows);this.predictor.loadModel(model);}}3.3.2在线预测FtrlPredictStreamOp.flatMap1函数完成了在线预测。

publicvoidflatMap1(Rowrow,Collectorcollector)throwsException{collector.collect(this.predictor.map(row));}调用栈如下:

predictWithProb:157,LinearModelMapper(com.alibaba.alink.operator.common.linear)predictResultDetail:114,LinearModelMapper(com.alibaba.alink.operator.common.linear)map:90,RichModelMapper(com.alibaba.alink.common.mapper)flatMap1:174,FtrlPredictStreamOp$PredictProcess(com.alibaba.alink.operator.stream.onlinelearning)flatMap1:143,FtrlPredictStreamOp$PredictProcess(com.alibaba.alink.operator.stream.onlinelearning)processElement1:53,CoStreamFlatMap(org.apache.flink.streaming.api.operators.co)processRecord1:135,StreamTwoInputProcessor(org.apache.flink.streaming.runtime.io)具体是通过LinearModelMapper完成。

publicabstractclassRichModelMapperextendsModelMapper{publicRowmap(Rowrow)throwsException{if(isPredDetail){//我们的示例代码在这里Tuple2t2=predictResultDetail(row);returnthis.outputColsHelper.getResultRow(row,Row.of(t2.f0,t2.f1));}else{returnthis.outputColsHelper.getResultRow(row,Row.of(predictResult(row)));}}}预测代码如下,可以看出来使用了sigmoid。

/***Predictthelabelinformationwiththeprobabilityofeachlabel.*/publicTuple2predictWithProb(Vectorvector){doubledotValue=MatVecOp.dot(vector,model.coefVector);switch(model.linearModelType){caseLR:caseSVM:doubleprob=sigmoid(dotValue);returnnewTuple2<>(dotValue>=0model.labelValues[0]:model.labelValues[1],newDouble[]{prob,1-prob});}}3.3.3在线更新模型FtrlPredictStreamOp.flatMap2函数完成了处理在线训练输出的模型数据流,在线更新模型。

LinearModelData参数是由CollectModel完成加载并且传输出来的。

publicvoidflatMap2(LinearModelDatalinearModel,Collectorcollector)throwsException{this.predictor.loadModel(linearModel);}

THE END
1.从零基础到精通:一步步保姆级大模型训练教程,手把手教学,学不会你来2.3 模型的评测方法 CSDN独家福利 1.预训练阶段(Pretraining Stage) 工欲善其事,必先利其器。 当前,不少工作选择在一个较强的基座模型上进行微调,且通常效果不错(如:[alpaca]、[vicuna] 等)。 这种成功的前提在于:预训练模型和下游任务的差距不大,预训练模型中通常已经包含微调任务中所需要的知识。 https://blog.csdn.net/2401_85325726/article/details/144342271
2.训练人工智能基础模型的方法腾讯云开发者社区训练人工智能基础模型的方法通常包括以下步骤: 1. 数据收集:收集大量高质量的、标注的数据用于训练和测试。这些数据可以来自公开的数据集或自己收集的数据。 2. 数据预处理:将数据集进行预处理,例如https://cloud.tencent.com/developer/information/%E8%AE%AD%E7%BB%83%E4%BA%BA%E5%B7%A5%E6%99%BA%E8%83%BD%E5%9F%BA%E7%A1%80%E6%A8%A1%E5%9E%8B%E7%9A%84%E6%96%B9%E6%B3%95
3.ai大模型训练方法有哪些?使用多GPU或多节点来加速模型训练。 自动化机器学习(AutoML) 自动化模型选择、超参数调优等过程。 持续学习和在线学习 使模型能够持续学习新数据,而不需要从头开始训练。 模型评估 使用交叉验证、混淆矩阵等方法评估模型性能。 模型部署 将训练好的模型部署到生产环境中。 https://www.elecfans.com/d/4031955.html
4.随时间在线训练脉冲神经网络模型的图像数据分类方法与流程10.为了克服上述现有技术的不足,本发明提供一种基于随时间在线训练的脉冲神经网络模型进行图像视觉数据分类的方法,方法取名为ottt(online training through time)。通过本发明提供的方法,可以在训练snn模型时极大地减小训练内存的开销,将训练得到的模型用于计算机图像数据和神经形态图像视觉数据的分类与识别等视觉任务,能够https://www.xjishu.com/zhuanli/55/202210694741.html
5.大语言模型训练数据常见的4种处理方法不在线第一只蜗牛因此,如何从收集到的数据中删除低质量数据成为大语言模型训练中的重要步骤。大语言模型训练中所使用的低质量数据过滤方法可以大致分为两类:基于分类器的方法和基于启发式的方法。基于分类器的方法目标是训练文本质量判断模型,并利用该模型识别并过滤低质量数据。https://xie.infoq.cn/article/6edbb0252aecc0fce50c43abb
6.AI:ModelScope(一站式开源的模型即服务共享平台)的简介安装AI:ModelScope(一站式开源的模型即服务共享平台)的简介、安装、使用方法之详细攻略,ModelScope旨在打造下一代开源的模型即服务共享平台,汇集了行业领先的预训练模型,减少了开发者的重复研发成本。个人认为,相比于AIhttps://blog.51cto.com/yunyaniu/5935335
7.图解机器学习模型评估方法与准则使用历史数据训练一个适合解决目标任务的一个或多个机器学习模型。 对模型进行验证(Validation)与离线评估(Offline Evaluation)。 通过评估指标选择一个较好的模型。 2)在线实验方法 除了离线评估之外,其实还有一种在线评估的实验方法。由于模型是在老的模型产生的数据上学习和验证的,而线上的数据与之前是不同的,因此https://www.jianshu.com/p/70a6f39d91bf
8.《自然语言处理:基于预训练模型的方法》(车万翔)简介书评当当网图书频道在线销售正版《自然语言处理:基于预训练模型的方法》,作者:车万翔,出版社:电子工业出版社。最新《自然语言处理:基于预训练模型的方法》简介、书评、试读、价格、图片等相关信息,尽在DangDang.com,网购《自然语言处理:基于预训练模型的方法》,就上当http://product.dangdang.com/29273992.html
9.转化率预估(pCVR)系列延迟预估模型(上篇)3.模型训练及预估:CVR和DFM联合训练,采用EM算法或sgd-Joint Learning等方法。在线预估时,只使用CVR模型,DFM被舍弃。 背景 展示广告中,oCPX/CPA模式(eCPM=pCTR * pCVR * CPA)下pcvr预估的准确性至关重要。 在参考ctr模型优化经验优化cvr模型时,会遇到一个问题,与点击(相比曝光)发生时间相比,转化发生时间要晚的https://www.zhuanzhi.ai/document/fb87ca09a0899775ab401f8300c5e5f9
10.一步一步教你在线免费训练机器学习模型(启用GPU和TPU)由于我无法在这篇文章中涵盖所有在线训练机器学习模型的服务,因此本文将有第二部分。 所有需要学习和实践机器学习的资源都是开源的,可在线获取。从计算、数据集、算法以及免费在线提供的各种高质量教程,你只需要互联网连接和学习的热情。 我希望这篇文章有用,因为它解决了开始走向机器学习和数据科学之路的业界人士所面https://blog.itpub.net/31545819/viewspace-2216969/
11.朱庆华宋珊珊风险视角下生成式人工智能的司法应用路径建构全生命周期的模型治理路径 针对算法黑箱、算法偏见,通常的应对之举是算法治理,即通过一系列规章制度和实践方法,旨在确保算法的公正性、透明度、可解释性和安全性,重点关注算法的实现和运行过程,包括数据采集、特征工程、模型开发、评估和部署等方面。而模型治理更侧重于机器学习模型的整个生命周期,包括需求规划、数据https://www.thepaper.cn/newsDetail_forward_26236606