大家在使用Alink过程中有任何问题可钉钉扫描下方二维码进群交流~
在线学习(OnlineLearning)是机器学习的一种模型训练方法,可以根据线上数据的变化,实时调整模型,是模型能够反映线上的变化,从而提高线上预测的准确率。
为了更好的理解在线学习(OnlineLearning)的概念,我们先介绍与之相对应的概念:批量训练(BatchLearning),先确定一个样本训练集,针对训练集的全体数据进行训练,一般需要使用迭代过程,重复使用数据集,不断调整参数。在线学习不需要要事先确定训练数据集,训练数据在训练过程中逐条到达的,每来一个训练样本,就会根据该样本产生的损失函数值、目标函数值及梯度,对模型进行一次迭代。
介绍完核心的两个组件,我们再看看所需的初始模型、训练数据流和预测数据流是如何准备的。如下图所示,初始模型,是采用传统的离线训练方式,对批式的训练数据进行训练得到的。
FTRL算法是线性算法,其输入的数据必须都是数值型的,而原始的数据既有数值型的,也有离散型的,我们需要进行相应的特征工程操作,将原始特征数据变换为向量形式。
我们这里需要使用特征工程的组件,将批式原始训练数据转化为批式向量训练数据;将流式原始训练数据转化为流式向量训练数据;将流式原始预测数据转化为流式向量预测数据。
我们在com.alibaba.alinkpackage下新建一个Java文件:
packagecom.alibaba.alink;publicclassFTRLExample{publicstaticvoidmain(String[]args)throwsException{}}本文的示例参考Alink的Pythondemo:
我们看到每条数据包含多个数据项,是以逗号分隔。下面是各数据列的定义如下:
我们根据各列的定义,组装schemaStr如下:
StringschemaStr="idstring,clickstring,dtstring,C1string,banner_posint,site_idstring,site_domainstring,"+"site_categorystring,app_idstring,app_domainstring,app_categorystring,device_idstring,"+"device_ipstring,device_modelstring,device_typestring,device_conn_typestring,C14int,C15int,"+"C16int,C17int,C18int,C19int,C20int,C21int";有了schema的定义,我们可以通过CsvSourceBatchOp读取显示数据,脚本如下:
由于列数较多,我们不容易将数据与列名对应起来。为了更好的看数据,这里有一个小技巧,打印出来的文本数据及分隔换行符号,正好是MarkDown格式,可以将其复制粘贴到MarkDown编辑器,即可看到整齐的图片显示,如下图所示:
上一篇展示了数据,这里会继续深入了解数据,由数据列的描述信息,知道里面含有哪些数值型特征,哪些为枚举型特征。具体内容如下面脚本所示:
StringlabelColName="click";String[]selectedColNames=newString[]{"C1","banner_pos","site_category","app_domain","app_category","device_type","device_conn_type","C14","C15","C16","C17","C18","C19","C20","C21","site_id","site_domain","device_id","device_model"};String[]categoryColNames=newString[]{"C1","banner_pos","site_category","app_domain","app_category","device_type","device_conn_type","site_id","site_domain","device_id","device_model"};String[]numericalColNames=newString[]{"C14","C15","C16","C17","C18","C19","C20","C21"};“click“列标明了是否被点击,是分类问题的标签列。对于数值型特征,各特征的取值范围差异很大,一般需要进行标准化、归一化等操作;枚举类型的特征不能直接应用到FTRL模型,需要进行枚举值到向量值的映射,后面还需将各列的变换结果合成到一个向量,即是后面模型训练的特征向量。
我们的示例里,选择对数值类型进行标准化操作,并使用了FeatureHash算法组件,在其参数设置中,需要指定处理的各列名称,并需要标明,哪些是枚举类型,那么没被标明的列就是数值类型。FeatureHash操作会将这些特征通过hasn的方式,映射到一个稀疏向量中,向量的维度可以设置,我们这里设置为30000。每个数值列都会被hash到一个向量项,该列的数值就会付给对应的向量项;而对每个枚举特征的不同枚举值,也会被hash到向量项,并被赋值为1。
//resultcolumnnameoffeatureenginerringStringvecColName="vec";intnumHashFeatures=30000;//setupfeatureenginerringpipelinePipelinefeature_pipeline=newPipeline().add(newStandardScaler().setSelectedCols(numericalColNames)).add(newFeatureHasher().setSelectedCols(selectedColNames).setCategoricalCols(categoryColNames).setOutputCol(vecColName).setNumFeatures(numHashFeatures));我们定义特征工程处理pipeline(管道),其中包括了StandardScaler和FeatureHasher,对批式训练数据trainBatchData执行fit方法,及进行训练,得到PipelineModel(管道模型)。该管道模型可以作用在批式数据,也可以应用在流式数据,生成特征向量。我们先把这个特征工程处理模型保存到本地,设置文件路径为:/Users/yangxu/alink/data/temp/feature_pipe_model.csv。
//fitandsavefeaturepipelinemodelStringFEATURE_PIPELINE_MODEL_FILE="/Users/yangxu/alink/data/temp/feature_pipe_model.csv";feature_pipeline.fit(trainBatchData).save(FEATURE_PIPELINE_MODEL_FILE);BatchOperator.execute();示例四上一篇我们训练并保存了特征工程处理模型,我们这里需要使用特征工程处理模型:
批式原始训练数据为:
//loadpipelinemodelPipelineModelfeature_pipelineModel=PipelineModel.load(FEATURE_PIPELINE_MODEL_FILE);Alink的PipelineModel既能预测批式数据,也可以预测流式数据,而且调用方式系统,使用模型实例的transform方法即可。
批式向量训练数据可以通过如下代码得到:
feature_pipelineModel.transform(trainBatchData)流式向量训练数据可以通过如下代码得到:
feature_pipelineModel.transform(train_stream_data)流式向量预测数据可以通过如下代码得到:
feature_pipelineModel.transform(test_stream_data)进一步,我们通过批式向量训练数据可以训练出一个线性模型作为后面在线学习FTRL算法的初始模型。如下面脚本所示,首先定义逻辑回归分类器lr,然后将批式向量训练数据“连接”到此分类器,输出结果便为逻辑回归模型。
FTRL在线模型训练的代码如下,在FtrlTrainStreamOp的构造函数中输入初始模型initModel,随后是设置各种参数,并“连接“流式向量训练数据。
#ftrltrainmodel=FtrlTrainStreamOp(initModel)\.setVectorCol(vecColName)\.setLabelCol(labelColName)\.setWithIntercept(True)\.setAlpha(0.1)\.setBeta(0.1)\.setL1(0.01)\.setL2(0.01)\.setTimeInterval(10)\.setVectorSize(numHashFeatures)\.linkFrom(feature_pipelineModel.transform(train_stream_data))FTRL在线预测的代码如下,需要“连接”FTRL在线模型训练输出的模型流,和流式向量预测数据。
#ftrlpredictpredResult=FtrlPredictStreamOp(initModel)\.setVectorCol(vecColName)\.setPredictionCol("pred")\.setReservedCols([labelColName])\.setPredictionDetailCol("details")\.linkFrom(model,feature_pipelineModel.transform(test_stream_data))我们可以如下设置流式结果的打印,由于数据较多,打印前先对流式数据进行采样。注意,对于流式的任务,print()方法不能触发流式任务的执行,必须调用StreamOperator.execute()方法,才能开始执行。
predResult.sample(0.0001).print();StreamOperator.execute();在执行的过程中,会先运行批式的初始模型训练,待批式任务执行结束,再启动流式任务。得到的结果显示片段如下:
click|pred|details-----|----|-------为打印的流式数据的列名信息,第1列是原始的“click”信息,第二列为预测结果列,第三列为预测的详细信息列。对应的预测结果形式为:
0|0|{"0":"0.7870435722294925","1":"0.2129564277705075"}最后,我们再将预测结果流predResult,接入流式二分类评估组件EvalBinaryClassStreamOp,并设置相应的参数,由于每次评估结果给出的是Json格式,为了便于显示,还可以在后面上Json内容提取组件JsonValueStreamOp。代码如下:
//ftrlevalpredResult.link(newEvalBinaryClassStreamOp().setLabelCol(labelColName).setPredictionCol("pred").setPredictionDetailCol("details").setTimeInterval(10)).link(newJsonValueStreamOp().setSelectedCol("Data").setReservedCols(newString[]{"Statistics"}).setOutputCols(newString[]{"Accuracy","AUC","ConfusionMatrix"}).setJsonPath(newString[]{"$.Accuracy","$.AUC","$.ConfusionMatrix"})).print();StreamOperator.execute();注意:流式的组件“连接”完成后,需要调用流式任务执行命令,即StreamOperator.execute(),开始执行。显示结果如下:
最后,贴出完整代码,感兴趣的读者可以运行实验。
注意,由于示例中需要演示中间结果,有很多打印或执行的方法,我现将调用这些方法的代码设为了注释,读者可以自己释放某些代码,查看运行效果。