分类和回归是监督式机器学习算法的两个类别。监督式ML也称为预测分析,可利用算法来找出加标签数据中的模式,然后使用可识别这些模式的模型来预测新数据的标签。分类和回归算法采用带标签(也称为目标结果)和特征(也称为属性)的数据集,并学习如何基于这些数据特征标记新数据。
分类可识别某个项目所属的类别,如信用卡交易是否合法。回归可预测连续数值,例如房价。
回归可估计目标结果因变量(标签)与一个或多个自变量(特征)之间的关系。回归可用于分析标签和特征变量之间的关系强度,通过调整一个或多个特征变量来确定标签的变化量,并预测标签和特征变量之间的趋势。
我们来看一下有关房价的线性回归示例,其中给定了历史房价以及房屋面积(以平方英尺计)、卧室数量和位置等房屋特征:
线性回归对Y“标签”与X“特征”之间的关系进行建模,,inthiscasetherelationshipbetweenthehousepriceandsize,withtheequation:Y=截距+(系数*X)+误差。系数用于衡量特征对标签的影响,本例中即房屋面积对房价的影响。
多重线性回归可对两个或更多“特征”和一个“标签”之间的关系进行建模。例如,若要对房价与房屋面积、卧室数量和卫生间数量之间的关系进行建模,多重线性回归函数将如下所示:
Yi=β0+β1X1+β2X2+···+βpXp+
Price==截距+(系数1*面积)+(系数2*卧室数量)+(系数3*卫生间数量)+误差。
系数用于衡量每个特征对房价的影响。
决策树将创建一个模型,该模型通过评估一组遵循if-then-else模式的规则来预测标签。if-then-else特征问题为节点,答案“true”或“false”则为决策树中指向子节点的分支。
决策树模型会估算在评估做出正确决策的概率时所需的最少true/false问题数。决策树可用于分类以预测类别或某类别的概率,或用于回归以预测连续数值。以下示例展示了通过简化版决策树预测房价的具体流程:
集成学习算法结合了多种机器学习算法,可获得更出色的模型。随机森林是用于分类和回归的一种主流集成学习方法。该算法基于训练阶段中不同的数据子集,构建出由多个决策树组成的模型。然后结合所有树的输出来作出预测,以便缩小方差并提高预测准确性。使用随机森林分类会将标签预测为大多数决策树所预测的类。而使用随机森林回归时,标签为各个决策树的回归预测均值。
Spark提供了以下回归算法:
机器学习是一个迭代过程,其中包括:
必须将ML算法要使用的特征和标签置入某个特征向量中,该向量为数字向量,代表每个特征的值。特征向量用于训练、测试和评估ML算法的结果,以构建最佳模型。
参考学习Spark
SparkML提供了一套统一的高级别API,这些API基于DataFrame构建,用于搭建ML流程或ML工作流程。基于DataFrame构建ML流程可实现分区数据处理的可扩展性,且便于通过SQL操作数据。
在SparkML流程中,我们通过转换器传递数据并提取特征,使用估测器生成模型,并使用评估器测量模型的准确性。
Tobuildamodel,youextractthefeaturesthatmostcontributetotheprediction.Inordertomakesomeofthefeaturesmorerelevantforpredictingthemedianhousevalue,insteadofusingtotalswe’llcalculateandusetheseratios:roomsperhouse=totalrooms/households,peopleperhouse=population/households,andbedroomsperrooms=卧室总数/房间总数。
在这种情况下,我们对以下标签和特征使用随机森林回归:
第一步是将数据加载到DataFrame。在以下代码中,我们指定了要加载到数据集的数据源和模式。
Inthefollowingcodeexample,weusetheDataFramewithColumn()transformation,toaddcolumnsfortheratiofeatures:roomsperhouse=totalrooms/households,peopleperhouse=population/households,andbedroomsperrooms=卧室总数/房间总数。然后,我们缓存DataFrame并创建临时视图,以增强性能,同时提高SQL的易用性。
//为特征创建比率df=df.withColumn("roomsPhouse",col("totalrooms")/col("houshlds"))df=df.withColumn("popPhouse",col("population")/col("houshlds"))df=df.withColumn("bedrmsPRoom",col("totalbdrms")/col("totalrooms"))df=df.drop("totalrooms","houshlds","population","totalbdrms")df.cachedf.createOrReplaceTempView("house")spark.catalog.cacheTable("house")
df.describe("medincome","medhvalue","roomsPhouse","popPhouse").showresult:+-------+------------------+------------------+------------------+------------------+|summary|medincome|medhvalue|roomsPhouse|popPhouse|+-------+------------------+------------------+------------------+------------------+|count|20640|20640|20640|20640||mean|3.8706710030346416|206855.81690891474|5.428999742190365|3.070655159436382||stddev|1.8998217183639696|115395.61587441359|2.4741731394243205|10.38604956221361||min|0.4999|14999.0|0.8461538461538461|0.6923076923076923||max|15.0001|500001.0|141.9090909090909|1243.3333333333333|+-------+------------------+------------------+------------------+------------------+
df.select(corr("medhvalue","medincome")).show()+--------------------------+|corr(medhvalue,medincome)|+--------------------------+|0.688075207464692|+--------------------------+
以下代码使用DataFramerandomSplit方法将数据集随机分为两部分,其中80%用于训练,20%用于测试。
valArray(trainingData,testData)=df.randomSplit(Array(0.8,0.2),1234)
下列代码将创建VectorAssembler(一种转换器),此转换器可用于流程中,将一组给定列组合为单个特征向量列。
valfeatureCols=Array("medage","medincome","roomsPhouse","popPhouse","bedrmsPRoom","longitude","latitude")//将特征放入特征矢量列中valassembler=newVectorAssembler().setInputCols(featureCols).setOutputCol("rawfeatures")
以下代码将创建StandardScaler(一种转换器),此转换器可用于流程中,通过使用DataFrame列汇总统计将特征扩展到单位方差,进而将特征标准化。
valscaler=newStandardScaler().setInputCol("rawfeatures").setOutputCol("features").setWithStd(true.setWithMean(true)
在流程中运行这些转换器的结果是将向数据集中添加一个扩展的特征列,如下图所示。
流程中的最后一个元素是RandomForestRegressor(一种估测器),此估测器通过特征向量和标签进行训练,然后返回RandomForestRegressorModel(一种转换器)。
valrf=newRandomForestRegressor().setLabelCol("medhvalue").setFeaturesCol("features")
在以下示例中,我们将VectorAssembler、Scaler和RandomForestRegressor置于一个流程中。流程将多个转换器和估测器相链接,以指定用于训练和使用模型的ML工作流程。
valsteps=Array(assembler,scaler,rf)valpipeline=newPipeline().setStages(steps)
SparkML支持通过一项名为“K折交叉验证”的技术来测试不同的参数组合,从而确定ML算法的哪些参数值可生成最佳模型。通过K折交叉验证,数据将随机分为K个分区。每个分区都将作为测试数据集使用一次,其余分区则用于训练。然后,通过训练集生成模型,通过测试集评估模型,最终生成K模型准确性测量值。使准确性测量值达到最高的模型参数将生成最佳模型。
SparkML通过转换或估测流程支持K折交叉验证,此流程可使用名为“网格搜索”的过程来测试不同的参数组合,您可以在交叉验证工作流程中设置参数供其测试。
以下代码使用ParamGridBuilder构造用于模型训练的参数网格。我们会定义RegressionEvaluator,其通过对比测试medhvalue列与测试预测列,对模型进行评估。我们使用CrossValidator来选择模型。CrossValidator使用流程、参数网格和评估器来拟合训练数据集,并返回最佳模型。CrossValidator使用ParamGridBuilder来迭代RandomForestRegressor估测器的maxDepth、maxBins和numbTrees参数,并评估模型,为得到可靠的结果,每个参数值重复三次。
valparamGrid=newParamGridBuilder().addGrid(rf.maxBins,Array(100,200)).addGrid(rf.maxDepth,Array(2,7,10)).addGrid(rf.numTrees,Array(5,20)).build()valevaluator=newRegressionEvaluator().setLabelCol("medhvalue").setPredictionCol("prediction").setMetricName("rmse")valcrossvalidator=newCrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)//拟合训练数据集并返回模型valpipelineModel=crossvalidator.fit(trainingData)
接下来,我们可获得最佳模型,从而理清特征重要程度。结果表明,收入中值、每个房屋的人口数和经度是最重要的特征。
valfeatureImportances=pipelineModel.bestModel.asInstanceOf[PipelineModel].stages(2).asInstanceOf[RandomForestRegressionModel].featureImportancesassembler.getInputCols.zip(featureImportances.toArray).sortBy(-_._2).foreach{case(feat,imp)=>println(s"feature:$feat,importance:$imp")}result:feature:medincome,importance:0.4531355014139285feature:popPhouse,importance:0.12807843645878508feature:longitude,importance:0.10501162983981065feature:latitude,importance:0.1044621179898163feature:bedrmsPRoom,importance:0.09720295935509805feature:roomsPhouse,importance:0.058427239343697555feature:medage,importance:0.05368211559886386
在以下示例中,我们利用交叉验证过程获得了可生成最佳随机森林模型的参数,即此过程的返回值:最大深度为2,最大箱数为50,且棵树为5。
valbestEstimatorParamMap=pipelineModel.getEstimatorParamMaps.zip(pipelineModel.avgMetrics).maxBy(_._2)._1println(s"Bestparams:\n$bestEstimatorParamMap")result:rfr_maxBins:50,rfr_maxDepth:2,rfr_-numTrees:5
接下来,我们使用测试DataFrame来测量模型的准确性,测试DataFrame是从原始DataFrame随机分割的数据,占原始DataFrame的20%,且未用于训练。
在以下代码中,我们在流程模型上调用转换,此操作将依照流程步骤将测试DataFrame传入特征提取阶段,通过由模型调整选出的随机森林模型进行估测,然后将预测结果返回到新的DataFrame列。
valpredictions=pipelineModel.transform(testData)predictions.select("prediction","medhvalue").show(5)result:+------------------+---------+|prediction|medhvalue|+------------------+---------+|104349.59677450571|94600.0||77530.43231856065|85800.0||111369.71756877871|90100.0||97351.87386020401|82800.0|+------------------+---------+
Withthepredictionsandlabelsfromthetestdata,wecannowevaluatethemodel.Toevaluatethelinearregressionmodel,youmeasurehowclosethepredictionsvaluesaretothelabelvalues.Theerrorinaprediction,shownbythegreenlinesbelow,isthedifferencebetweentheprediction(theregressionlineYvalue)andtheactualYvalue,orlabel.(Error=prediction-label)。
平均绝对误差(MAE)是标签值与模型预测值之间的平均绝对差值。绝对值会消除所有负号。MAE=sum(absolute(prediction-label))/观察次数)。
TheMeanSquareError(MSE)isthesumofthesquarederrorsdividedbythenumberofobservations.Thesquaringremovesanynegativesignsandalsogivesmoreweighttolargerdifferences.(MSE=sum(squared(prediction-label))/观察次数)。
均方根误差(RMSE)是MSE的平方根。RMSE是预测误差的标准偏差。误差表示的是标签数据点距回归线的距离,而RMSE则表示误差的分散程度。
ThefollowingcodeexampleusestheDataFramewithColumntransformation,toaddacolumnfortheerrorinprediction:error=prediction-medhvalue。然后,我们显示预测值、房价中值和误差的汇总统计信息(以千美元计)。
以下代码示例使用SparkRegressionEvaluator,计算预测DataFrame的MAE,并返回36636.35(千美元)。
valmaevaluator=newRegressionEvaluator().setLabelCol("medhvalue").setMetricName("mae")valmae=maevaluator.evaluate(predictions)result:mae:Double=36636.35
以下代码示例使用SparkRegressionEvaluator,计算预测DataFrame的RMSE,并返回52724.70。
valevaluator=newRegressionEvaluator().setLabelCol("medhvalue").setMetricName("rmse")valrmse=evaluator.evaluate(predictions)result:rmse:Double=52724.70
现在,我们可以将拟合的流程模型保存到分布式文件存储中,供以后在生产中使用。此操作可同时保存特征提取阶段和模型调整所选择的随机森林模型。
pipelineModel.write.overwrite().save(modeldir)
保存流程模型会得到一个元数据的JSON文件和一个模型数据的Parquet。我们可以使用加载命令重新加载模型,原始模型和重新加载的模型相同:
valsameModel=CrossValidatorModel.load(“modeldir")
本章中,我们讨论了回归、决策树和随机森林算法,介绍了SparkML流程的基础知识,并通过实际示例来预测房价中值。