Alink是阿里巴巴基于实时计算引擎Flink研发的新一代机器学习算法平台,是业界首个同时支持批式算法、流式算法的机器学习平台。本文和上文一起介绍了在线学习算法FTRL在Alink中是如何实现的,希望对大家有所帮助。
为了让大家更好理解,我们再次贴出整体流程图:
在线训练主要逻辑是:
前面说到,FTRL先要训练出一个逻辑回归模型作为FTRL算法的初始模型,这是为了系统冷启动的需要。
具体逻辑回归模型设定/训练是:
//traininitialbatchmodelLogisticRegressionTrainBatchOplr=newLogisticRegressionTrainBatchOp().setVectorCol(vecColName).setLabelCol(labelColName).setWithIntercept(true).setMaxIter(10);BatchOperator<>initModel=featurePipelineModel.transform(trainBatchData).link(lr);训练好之后,模型信息是DataSet类型,位于变量BatchOperator<>initModel之中,这是一个批处理算子。
FtrlTrainStreamOp将initModel作为初始化参数。
FtrlTrainStreamOpmodel=newFtrlTrainStreamOp(initModel)在FtrlTrainStreamOp构造函数中会加载这个模型;
dataBridge=DirectReader.collect(initModel);具体加载时通过MemoryDataBridge直接获取初始化模型DataSet中的数据。
publicMemoryDataBridgegenerate(BatchOperatorbatchOperator,ParamsglobalParams){returnnewMemoryDataBridge(batchOperator.collect());}2.2分割高维向量从前文可知,Alink的FTRL算法设置的特征向量维度是30000。所以算法第一步就是切分高维度向量,以便分布式计算。
StringvecColName="vec";intnumHashFeatures=30000;首先要获取切分信息,代码如下,就是将特征数目featureSize除以并行度parallelism,然后得到了每个task对应系数的初始位置。
privatestaticint[]getSplitInfo(intfeatureSize,booleanhasInterceptItem,intparallelism){intcoefSize=(hasInterceptItem)featureSize+1:featureSize;intsubSize=coefSize/parallelism;int[]poses=newint[parallelism+1];intoffset=coefSize%parallelism;for(inti=0;i //Tuple5 代码摘要如下: publicvoidflatMap(Rowrow,Collector 此处理论上有以下几个重点: 伪代码思路大致如下 doublep=learner.predict(x);//预测learner.updateModel(x,p,y);//更新模型doubleloss=LogLossEvalutor.calLogLoss(p,y);//计算损失evalutor.addLogLoss(loss);//更新损失totalLoss+=loss;trainedNum+=1;具体实施上Alink有自己的特点和调整。 机器学习都需要迭代训练,Alink这里利用了FlinkStream的迭代功能。 IterativeStream的实例是通过DataStream的iterate方法创建的˙。iterate方法存在两个重载形式: Alink选择了第二种。 在创建ConnectedIterativeStreams时候,用迭代流的初始输入作为第一个输入流,用反馈流作为第二个输入。 每一种数据流(DataStream)都会有与之对应的流转换(StreamTransformation)。IterativeStream对应的转换是FeedbackTransformation。 迭代流(IterativeStream)对应的转换是反馈转换(FeedbackTransformation),它表示拓扑中的一个反馈点(也即迭代头)。一个反馈点包含一个输入边以及若干个反馈边,且Flink要求每个反馈边的并行度必须跟输入边的并行度一致,这一点在往该转换中加入反馈边时会进行校验。 当IterativeStream对象被构造时,FeedbackTransformation的实例会被创建并传递给DataStream的构造方法。 迭代的关闭是通过调用IterativeStream的实例方法closeWith来实现的。这个函数指定了某个流将成为迭代程序的结束,并且这个流将作为输入的第二部分(secondinput)被反馈回迭代。 对于Alink来说,迭代构建代码是: //traindataformat= 反馈流的设置是通过调用IterativeStream的实例方法closeWith来实现的。Alink这里是 反馈流的格式是: 迭代体由两部分构成:CalcTask/ReduceTask。 CalcTask每一个实例都拥有初始化模型dataBridge。 DataStreamiterativeBody=iteration.flatMap(newCalcTask(dataBridge,splitInfo,getParams()))2.3.3.1迭代初始化迭代是由CalcTask.open函数开始,主要做如下几件事 CalcTask.flatMap1主要实现的是FTRL算法中的predict部分(注意,不是FTRL预测)。 解释:pt=σ(Xtw)是LR的预测函数,求出pt的唯一目的是为了求出目标函数(在LR中采用交叉熵损失函数作为目标函数)对参数w的一阶导数g,gi=(ptyt)xi。此步骤同样适用于FTRL优化其他目标函数,唯一的不同就是求次梯度g(次梯度是左导和右导之间的集合,函数可导--左导等于右导时,次梯度就等于一阶梯度)的方法不同。 函数的输入是"训练输入数据",即SplitVector.flatMap的输出---->CalcCalcTask的输入。输入数据是一个五元组,其格式为traindataformat= 有三点需要注意: 大家会说,不对!predict函数应该是sigmoid=1.0/(1.0+np.exp(-w.dot(x)))。是的,这里还没有做sigmoid操作。当ReduceTask做了聚合之后,会把聚合好的p反馈回迭代体,然后在CalcTask.flatMap2中才会做sigmoid操作。 publicvoidflatMap1(Tuple5 publicstaticclassReduceTaskextendsRichFlatMapFunction 当具体用作输出模型使用时,其变量如下: models={HashMap@13258}size=1{Long@13456}1->{ArrayList@13678}size=1key={Long@13456}1value={ArrayList@13678}size=10={Tuple2@13698}"(1,0.0-8.244533295515879E-50.0-1.103997743166529E-40.0-3.336931546279811E-5....."2.3.3.4判断是否反馈这个filterresult是用来判断是否反馈的。这里t3.f0是sampleId,t3.f2是subNum。 DataStream 对于t3.f2,如果subNum大于零,说明在高维向量切分时候,是得到了有意义的数值。 这里是filteroutput。 DataStream 在LogisticRegression中,sigmoid函数是σ(a)=1/(1+exp(-a)),预估pt=σ(xt.wt),则LogLoss函数是 直接计算可以得到 具体LR+FTRL算法实现如下: publicvoidflatMap(Tuple7 //ftrlpredictFtrlPredictStreamOppredictResult=newFtrlPredictStreamOp(initModel).setVectorCol(vecColName).setPredictionCol("pred").setReservedCols(newString[]{labelColName}).setPredictionDetailCol("details").linkFrom(model,featurePipelineModel.transform(splitter.getSideOutput(0)));从上面代码我们可以看到 linkFrom函数完成了业务逻辑,大致功能如下: 即FTRL的预测功能有三个输入: 构造函数中完成了初始化,即获取事先训练好的逻辑回归模型。 publicFtrlPredictStreamOp(BatchOperatormodel){super(newParams());if(model!=null){dataBridge=DirectReader.collect(model);}else{thrownewIllegalArgumentException("Ftrlalgo:initialmodelisnull.Pleasesetavalidinitialmodel.");}}3.2获取在线训练模型CollectModel完成了获取在线训练模型功能。 其逻辑主要是:模型被分成若干块,其中(long)inRow.getField(1)这里记录了具体有多少块。所以flatMap函数会把这些块累积起来,最后组装成模型,统一发送给下游算子。 具体是通过一个HashMap<>buffers来完成临时拼装/最后组装的。 publicstaticclassPredictProcessextendsRichCoFlatMapFunction open函数会加载逻辑回归模型。 publicvoidopen(Configurationparameters)throwsException{this.predictor=newLinearModelMapper(TableUtil.fromSchemaJson(modelSchemaJson),TableUtil.fromSchemaJson(dataSchemaJson),this.params);if(dataBridge!=null){//readinitmodelList publicvoidflatMap1(Rowrow,Collector predictWithProb:157,LinearModelMapper(com.alibaba.alink.operator.common.linear)predictResultDetail:114,LinearModelMapper(com.alibaba.alink.operator.common.linear)map:90,RichModelMapper(com.alibaba.alink.common.mapper)flatMap1:174,FtrlPredictStreamOp$PredictProcess(com.alibaba.alink.operator.stream.onlinelearning)flatMap1:143,FtrlPredictStreamOp$PredictProcess(com.alibaba.alink.operator.stream.onlinelearning)processElement1:53,CoStreamFlatMap(org.apache.flink.streaming.api.operators.co)processRecord1:135,StreamTwoInputProcessor(org.apache.flink.streaming.runtime.io)具体是通过LinearModelMapper完成。 publicabstractclassRichModelMapperextendsModelMapper{publicRowmap(Rowrow)throwsException{if(isPredDetail){//我们的示例代码在这里Tuple2 /***Predictthelabelinformationwiththeprobabilityofeachlabel.*/publicTuple2 LinearModelData参数是由CollectModel完成加载并且传输出来的。 publicvoidflatMap2(LinearModelDatalinearModel,Collector