Alink漫谈(一):从KMeans算法实现不同看Alink设计思想罗西的思考

Alink是阿里巴巴基于实时计算引擎Flink研发的新一代机器学习算法平台,是业界首个同时支持批式算法、流式算法的机器学习平台。本文将带领大家从多重角度出发来分析推测Alink的设计思路。

因为Alink的公开资料太少,所以以下均为自行揣测,肯定会有疏漏错误,希望大家指出,我会随时更新。

ApacheFlink是由Apache软件基金会开发的开源流处理框架,它通过实现了GoogleDataflow流式计算模型实现了高吞吐、低延迟、高性能兼具实时流式计算框架。

其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。

借助Flink在批流一体化方面的优势,Alink能够为批流任务提供一致性的操作。在2017年初,阿里团队通过调研团队看到了Flink在批流一体化方面的优势及底层引擎的优秀性能,于是基于Flink重新设计研发了机器学习算法库,即Alink平台。该平台于2018年在阿里集团内部上线,随后不断改进完善,在阿里内部错综复杂的业务场景中锻炼成长。

因为目前关于Alink设计的公开资料比较少,我们手头只有其源码,看起来只能从代码反推。但是世界上的事物都不是孤立的,我们还有其他角度来帮助我们判断推理。所以下面就让我们来进行推断。

FlinkML是Flink社区现存的一套机器学习算法库,这一套算法库已经存在很久而且更新比较缓慢。

Alink团队起初面临的抉择是:是否要基于FlinkML进行开发,或者对FlinkML进行更新。

经过研究,Alink团队发现,FlinkML其仅支持10余种算法,支持的数据结构也不够通用,在算法性能方面做的优化也比较少,而且其代码也很久没有更新。所以,他们放弃了基于旧版FlinkML进行改进、升级的想法,决定基于Flink重新设计研发机器学习算法库。

所以我们要分析的就是如何从无到有设计出一个新的机器学习平台/框架。

因为Alink是市场的新进入者,所以Alink的最大问题就是如何替代市场上的现有产品。

迈克尔·波特用“替代品威胁”来解释用户的整个替代逻辑,当新产品能牢牢掌握住这一点,就有可能在市场上获得非常好的表现,打败竞争对手。

假如现在想从0到1构建一个机器学习库或者机器学习框架,那么我们需要从商业意识和商业逻辑出发,来思考这个产品的价值所在,就能对这个产品做个比较精确的定义,从而能够确定产品路线。

产品需要解决应用环境下的综合性问题,产品的价值体现,可以分拆了三个维度。

下面就让我们逐一分析。

这个就是换用成本的问题,一旦换用成本过高,这个产品就很难成功。

Alink大略有两种用户:算法工程师,应用工程师。

Alink算法工程师特指实现机器学习算法的工程师。Alink应用工程师就是应用AlinkAI算法做业务的工程师。这两类用户的换用成本都是Alink需要考虑的。

新产品对于用户来说,有两个大的问题:产品底层逻辑和开发工具。一个优秀的新产品绝对不能在这两个问题上增加用户的换用成本。

Flink这个平台博大精深,无论是熟悉其API还是深入理解系统架构都不是容易的事情。如果Alink用户还需要熟悉Flink,那势必造成ALink用户的换用成本,所以这点应该尽量避免。

综上所述,Alink的原则之一应该是:算法的归算法,Flink的归Flink,尽量屏蔽AI算法和Flink之间的联系。

开发工具就是究竟用什么语言开发。Flink的开发语言主要是JAVA,SCALA,Python。而机器学习世界中主要还是Python。

在PyAlink中,算法组件提供的接口基本与JavaAPI一致,即通过默认构造方法创建一个算法组件,然后通过setXXX设置参数,通过link/linkTo/linkFrom与其他组件相连。这里利用Jupyter的自动补全机制可以提供书写便利。

另外,如果采用JAVA或者Python,肯定有大量现有代码可以修改复用。如果采用SCALA,就难以复用之前的积累。

综上所述,Alink的原则之一应该是:采用最简单,最常见的开发语言和设计思维。

Alink的竞争对手大略可以认为是SparkML,FlinkML,Scikit-learn。

他们是市场上的现有力量,拥有大量的用户。用户已经熟悉了这些竞争对手的设计思路,开发策略,基本概念和API。除非Alink能够提供一种神奇简便的API,否则Alink应该在设计上最大程度借鉴这些竞争对手。

比如机器学习开发中有如下常见概念:Transformer,Estimator,PipeLine,Parameter。这些概念Alink应该尽量提供。

综上所述,**Alink的原则之一应该是:尽量借鉴市面上通用的设计思路和开发模式,让开发者无缝切换**。

从Alink的目录结构中,我们可以看出,Alink确实提供了这些常见概念。

./java/com/alibaba/alink:common operator params pipeline./java/com/alibaba/alink/params:associationrule evaluation nlp regression statisticsclassification feature onlinelearning shared tuningclustering io outlier similarity udfdataproc mapper recommendation sql validators./java/com/alibaba/alink/pipeline:EstimatorBase.java ModelBase.java Trainer.java featureLocalPredictable.java ModelExporterUtils.java TransformerBase.java nlpLocalPredictor.java Pipeline.java classification recommendationMapModel.java PipelineModel.java clustering regressionMapTransformer.java PipelineStageBase.java dataproc tuning5.企业角度看设计这是成本结构和收益的规模性问题。从而决定了Alink在开发时候,必须尽量提高开发工程师的效率,提高生产力。前面提到的弃用SCALA,部分也出于这个考虑。

挑战集中在:

举个例子:

我们需要针对八十万禁军,让林冲林教头设计出一套适合正规作战的枪棒打法。或者针对背嵬军,让岳飞岳元帅设计一套马军冲阵机制。

因此,**Alink的原则之一应该是:构建一套战术打法(middleware或者adapter),即屏蔽了Flink,又可以利用好Flink,还可以让用户基于此可以快速开发算法**。

我们想想看大概有哪些基础工作需要做:

让我们看看Alink做了哪些努力,这点从其目录结构可以看出有queue,operator,mapper等等构建架构所必须的数据结构:

./java/com/alibaba/alink/common:MLEnvironment.java linalgMLEnvironmentFactory.java mapperVectorTypes.java modelcomqueue utilsio./java/com/alibaba/alink/operator:AlgoOperator.java commonbatch stream其中最重要的概念是BaseComQueue,这是把通信或者计算抽象成ComQueueItem,然后把ComQueueItem串联起来形成队列。这样就形成了面向迭代计算场景的一套迭代通信计算框架。其他数据结构大多是围绕着BaseComQueue来具体运作。

/***Baseclassforthecom(Computation&&Communicate)queue.*/publicclassBaseComQueue>implementsSerializable{ /** *Allcomputationorcommunicationfunctions. */ privatefinalListqueue=newArrayList<>(); /** *sessionIdforsharedobjectswithinthisBaseComQueue. */ privatefinalintsessionId=SessionSharedObjs.getNewSessionId(); /** *Thefunctionexecutedtodecidewhethertobreaktheloop. */ privateCompareCriterionFunctioncompareCriterion; /** *Thefunctionexecutedwhenclosingtheiteration */ privateCompleteResultFunctioncompleteResult; /** *Maxiterationcount. */ privateintmaxIter=Integer.MAX_VALUE; privatetransientExecutionEnvironmentexecutionEnvironment;}MLEnvironment是另外一个重要的类。其封装了Flink开发所必须要的运行上下文。用户可以通过这个类来获取各种实际运行环境,可以建立table,可以运行SQL语句。

/***TheMLEnvironmentstoresthenecessarycontextinFlink.*EachMLEnvironmentwillbeassociatedwithauniqueID.*TheoperationsassociatedwiththesameMLEnvironmentID*willsharethesameFlinkjobcontext.*/publicclassMLEnvironment{privateExecutionEnvironmentenv;privateStreamExecutionEnvironmentstreamEnv;privateBatchTableEnvironmentbatchTableEnv;privateStreamTableEnvironmentstreamTableEnv;}6.设计原则总结下面我们可以总结下Alink部分设计原则

Flink和Alink源码中,都提供了KMeans算法例子,所以我们就从KMeans入手看看Flink原生算法和Alink算法实现的区别。为了统一标准,我们都选用JAVA版本的算法实现。

KMeans算法的思想比较简单,假设我们要把数据分成K个类,大概可以分为以下几个步骤:

K-Means是迭代的聚类算法,初始设置K个聚类中心

下面给出部分代码,具体算法解释可以在注释中看到。

这里主要采用了Flink的批量迭代。其调用DataSet的iterate(int)方法创建一个BulkIteration,迭代以此为起点,返回一个IterativeDataSet,可以用常规运算符进行转换。迭代调用的参数int指定最大迭代次数。

IterativeDataSet调用closeWith(DataSet)方法来指定哪个转换应该反馈到下一个迭代,可以选择使用closeWith(DataSet,DataSet)指定终止条件。如果该DataSet为空,则它将评估第二个DataSet并终止迭代。如果没有指定终止条件,则迭代在给定的最大次数迭代后终止。

这里是算法主程序,这里倒是看起来十分清爽干净,但实际上是没有这么简单,Alink在其背后做了大量的基础工作。

可以看出,算法实现的主要工作是:

publicfinalclassKMeansTrainBatchOpextendsBatchOperator implementsKMeansTrainParams{ staticDataSetiterateICQ(...省略...){ returnnewIterativeComQueue() .initWithPartitionedData(TRAIN_DATA,data) .initWithBroadcastData(INIT_CENTROID,initCentroid) .initWithBroadcastData(KMEANS_STATISTICS,statistics) .add(newKMeansPreallocateCentroid()) .add(newKMeansAssignCluster(distance)) .add(newAllReduce(CENTROID_ALL_REDUCE)) .add(newKMeansUpdateCentroids(distance)) .setCompareCriterionOfNode0(newKMeansIterTermination(distance,tol)) .closeWith(newKMeansOutputModel(distanceType,vectorColName,latitudeColName,longitudeColName)) .setMaxIter(maxIter) .exec(); }}KMeansPreallocateCentroid预先分配聚类中心

publicclassKMeansPreallocateCentroidextendsComputeFunction{publicvoidcalc(ComContextcontext){if(context.getStepNo()==1){ListinitCentroids=(List)context.getObj("initCentroid");Listlist=(List)context.getObj("statistics");IntegervectorSize=(Integer)list.get(0);context.putObj("vectorSize",vectorSize);FastDistanceMatrixDatacentroid=(FastDistanceMatrixData)initCentroids.get(0);Preconditions.checkArgument(centroid.getVectors().numRows()==vectorSize,"Initcentroiderror,sizenotequal!");context.putObj("centroid1",Tuple2.of(context.getStepNo()-1,centroid));context.putObj("centroid2",Tuple2.of(context.getStepNo()-1,newFastDistanceMatrixData(centroid)));context.putObj("k",centroid.getVectors().numCols());}}}KMeansAssignCluster为每个点(point)计算最近的聚类中心,为每个聚类中心的点坐标的计数和求和

/***Updatethecentroidsbasedonthesumofpointsandpointnumberbelongingtothesamecluster.*/publicclassKMeansUpdateCentroidsextendsComputeFunction{@Overridepublicvoidcalc(ComContextcontext){IntegervectorSize=context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);Integerk=context.getObj(KMeansTrainBatchOp.K);double[]sumMatrixData=context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);Tuple2stepNumCentroids;if(context.getStepNo()%2==0){stepNumCentroids=context.getObj(KMeansTrainBatchOp.CENTROID2);}else{stepNumCentroids=context.getObj(KMeansTrainBatchOp.CENTROID1);}stepNumCentroids.f0=context.getStepNo();context.putObj(KMeansTrainBatchOp.K,updateCentroids(stepNumCentroids.f1,k,vectorSize,sumMatrixData,distance));}}4.区别代码量通过下面的分析可以看出,从实际业务代码量角度说,其实差别不大。

所以Alink代码只能说比Flink原生实现略大。

这里指的是与Flink的耦合度。能看出来Flink的KMeans算法需要大量的Flink类。而Alink被最大限度屏蔽了。

importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.functions.ReduceFunction;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;importorg.apache.flink.api.java.operators.IterativeDataSet;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.api.java.tuple.Tuple3;importorg.apache.flink.api.java.utils.ParameterTool;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.ml.api.misc.param.Params;importorg.apache.flink.types.Row;importorg.apache.flink.util.Preconditions;

THE END
1.AI在自然语言处理中的突破:从理论到应用腾讯云开发者社区自然语言处理(Natural Language Processing,NLP)是人工智能(AI)的一个重要分支,旨在实现计算机与人类语言的交互。近年来,随着深度学习和大规模语言模型的发展,自然语言处理取得了显著突破,从理论研究到实际应用,推动了多个领域的进步。本文将介绍NLP的核心技术及其突破,并通过代码示例展示其应用。 https://cloud.tencent.com/developer/article/2479408
2.社区Edge AI是边缘计算的研究方向之一,它将人工智能算法和模型推送到边缘设备,使其具备处理复杂数据的能力。随着硬件的不断进步,越来越多的智能设备能够在本地进行推理和决策,而无需将数据发送到云端。Intel和NVIDIA等公司也在加速边缘计算硬件的研发,提升计算能力以应对复杂的AI任务。 https://open.alipay.com/portal/forum/post/192201027
3.LeNet介绍1989年,Yang LeCun等人提出了LeNet网络,这是最早的卷积神经网络,极大的推动了深度学习的发展,Yang LeCun也被称为卷积网络之父。 应用使用CNN(LeNET-5)对MNIST数据集进行训练和分类 import torch from torc…https://zhuanlan.zhihu.com/p/9948140417
4.美团移动端推荐的onlinelearning实践孔东营孔东营作为美团推荐算法工程师,分享了如何在美团移动端推荐系统中应用Online Learning(在线学习)进行重排序。 孔东营回顾了美团移动端O2O推荐的发展历程。2015年上半年,美团的交易额达到470亿,其中推荐金额占比达到10%。随着移动端流量的增加,推荐系统在移动端的作用变得更加重要。移动端的推荐相比PC端,位置信息变得尤为https://download.csdn.net/download/happytofly/9275355
5.机器学习PAI全新功效——实时新闻热点OnlineLearning实践针对这种场景,PAI平台开创性的提出来Online-Learning的解决方案,通过流式算法和离线算法的结合,既能够发挥离线训练对大规模数据的强大处理能力,又能够发挥流式机器学习算法对实时模型的更新能力,做到流批同跑,完美解决模型时效性的问题。今天就以实时热点新闻挖掘案例为例,为大家介绍PAI OnlineLearning的解决方案。 https://maimai.cn/article/detail?fid=1092991292&efid=q4lYsgkD4uccYLTNjKAn9A
6.online?learning而且需要指出的是,这种方法没有严格证明,只是模仿batch mode adaboost. 我把这个算法用在uci的训练数据上,效果不是很好。作者的主页是:http://www.vision.ee./~hegrabne/. 这个是他用online learning 做tracking的项目主页:http://www.vision.ee./boostingTrackers/。有现成代码和demo。http://www.360doc.com/content/12/0611/17/3571299_217488645.shtml
7.transferlearning/doc/awesomepaper.mdatmaster·smileAAAI-15 Online Boosting Algorithms for Anytime Transfer and Multitask Learning 一种通用的在线迁移学习方法,可以适配在现有方法的后面 IJSR-13 Knowledge Transfer Using Cost Sensitive Online Learning Classification 探索在线迁移方法,用样本cost Zero-shot / few-shot learningFewhttps://github.com/smile-ttxp/transferlearning/blob/master/doc/awesome_paper.md
8.learning题目答案解析,learning题目答案解析2Q-learning算法中,Q函数是()。(难度) A、 状态-动作值函数 B、 状态函数 C、 估值函数 D、 奖励函数 免费查看参考答案及解析 题目: 运行STP协议的交换机,端口在Learning状态下需要等待转发延时后才能转化为Forwarding状态。 A、 正确 B、 错误 免费查看参考答案及解析 题目: RSTP协议定义了不同的端口状https://www.12tiku.com/newtiku/so-learning-l2.html
9.深度学习面试题图像算法.doc如果模型是非常稀疏的,那么优先考虑自适应学习率的算法; 在模型设计实验过程中,要快速验证新模型的效果,用Adam进行快速实验优化; 在模型上线或者结果发布前,可以用精调的SGD进行模型的极致优化。 1.14batchsize和epoch的平衡 1.15SGD每步做什么,为什么能onlinelearning? onlinelearning强调的是学习是实时的,流式的,每次https://max.book118.com/html/2024/1008/6051131122010231.shtm
10.CourseraDegrees,Certificates,&FreeOnlineCoursesLearn new job skills in online courses from industry leaders like Google, IBM, & Meta. Advance your career with top degrees from Michigan, Penn, Imperial & more.https://www.coursera.org/
11.清华大学出版社O'Reilly Online Learning提供的Deitel Pearson产品 你所在的公司或大学或许已经订阅了O'Reilly Online Learning,因而能免费访问我们Deitel在培生出版的所有电子书和LiveLessons视频。除此之外,还能免费访问Paul Deitel提供的为期一天的Full Throttle培训课程。个人可在以下网址注册10天免费试用: http://www.tup.tsinghua.edu.cn/bookscenter/preface.html?id=09847001
12.科技资讯数据资讯针对这种场景,PAI平台开创性的提出来Online-Learning的解决方案,通过流式算法和离线算法的结合,既能够发挥离线训练对大规模数据的强大处理能力,又能够发挥流式机器学习算法对实时模型的更新能力,做到流批同跑,完美解决模型时效性的问题。今天就以实时热点新闻挖掘案例为例,为大家介绍PAI OnlineLearning的解决方案。 实验http://www.forenose.com/column/blocked/10.html?mid=2&p=12
13.布洛托上校博弈模型及求解方法研究进展offlineandonlinestrategylearning,strategygameandrelatedsolutionconG cepts,onlineoptimizationandregretvaluearebrieflyintroduced.Secondly,sixtypesofColonelBlottogame models(continuous Blottogame,discreteColonelLottogame,generalizedColonelBlottogame,generalizedLottoBlottogame,generalizedruleColonel Lottogameandonlinediscretehttps://www.jsjkx.com/CN/PDF/10.11896/jsjkx.230600011