Alink到目前已经发布了四个Release版本:
上图是Alink在发布1.0版本的时候,所有的算法以及功能,简单来说,Alink的批式功能是和SparkML对应的,SparkML有的功能,Alink基本都提供了。相较于SparkML,除了批式的功能,Alink还提供了流式的功能。
Alink在近半年,功能上整体没有大的变化,下面列举一些正在研发测试,即将开源的一些功能:
在本章,将按照版本的发布顺序,逐步介绍Alink的特性,设计原理,以及使用技巧等内容。
从Alink1.1.0开始,使用Maven中央仓库即可构建Alink项目,下面是POM文件示例。Flink1.10版本依赖:
从1.1.0开始,Alink已经发布到了PyPI,安装更加方便了,请参考如下链接:
在讲Notebook示例之前,我们先来了解一下PyAlink的设计背景:
基于这样的设计背景,我们来看下一现在Notebook上进行Alink开发的实践。
本地运行代码示例:
frompyalink.alinkimport*##一个Batch作业的例子useLocalEnv(2)##preparedataimportnumpyasnpimportpandasaspddata=np.array([[0,0.0,0.0,0.0],[1,0.1,0.1,0.1],[2,0.2,0.2,0.2],[3,9,9,9],[4,9.1,9.1,9.1],[5,9.2,9.2,9.2]])df=pd.DataFrame({"id":data[:,0],"f0":data[:,1],"f1":data[:,2],"f2":data[:,3]})inOp=BatchOperator.fromDataframe(df,schemaStr='iddouble,f0double,f1double,f2double')FEATURE_COLS=["f0","f1","f2"]VECTOR_COL="vec"PRED_COL="pred"vectorAssembler=(VectorAssembler().setSelectedCols(FEATURE_COLS).setOutputCol(VECTOR_COL))kMeans=(KMeans().setVectorCol(VECTOR_COL).setK(2).setPredictionCol(PRED_COL))pipeline=Pipeline().add(vectorAssembler).add(kMeans)pipeline.fit(inOp).transform(inOp).firstN(9).collectToDataframe()集群运行代码示例:
frompyalink.alinkimport*##一个Batch作业的例子useRemoteEnv("10.101.**.**",31805,2,shipAlinkAlgoJar=False)##preparedataimportnumpyasnpimportpandasaspddata=np.array([[0,0.0,0.0,0.0],[1,0.1,0.1,0.1],[2,0.2,0.2,0.2],[3,9,9,9],[4,9.1,9.1,9.1],[5,9.2,9.2,9.2]])df=pd.DataFrame({"id":data[:,0],"f0":data[:,1],"f1":data[:,2],"f2":data[:,3]})inOp=BatchOperator.fromDataframe(df,schemaStr='iddouble,f0double,f1double,f2double')FEATURE_COLS=["f0","f1","f2"]VECTOR_COL="vec"PRED_COL="pred"vectorAssembler=(VectorAssembler().setSelectedCols(FEATURE_COLS).setOutputCol(VECTOR_COL))kMeans=(KMeans().setVectorCol(VECTOR_COL).setK(2).setPredictionCol(PRED_COL))pipeline=Pipeline().add(vectorAssembler).add(kMeans)pipeline.fit(inOp).transform(inOp).firstN(9).collectToDataframe()我们可以看到本地和远程代码上的差别,就只有第4行代码不一样,本地使用的是useLocalEnv(2),远程使用的是useRemoteEnv("10.101..",31805,2,shipAlinkAlgoJar=False)。相较于本地环境,集群环境需要指定Flink集群的ip地址和端口。
本地运行示例:
集群运行模式中,流式任务和批式任务的设置有点差别,流式的任务需要指定本地的IP地址(localIp),我们使用Notebook进行交互式开发时,一般需要看到运行结果,批式任务使用Flink现有的机制,是可以直接看到运行结果的。但是流式任务数据流是无边界的,为了将流任务的运行结果返回回来,让用户可以实时看到,我们单独建立了一个通路进行数据传输,因此我们需要设置这个本地的IP地址,和集群进行交互。当然,这个本地IP地址的参数,在4月份的版本(Alink1.1.1版本)中,已经可以自动检测到了,可以省略掉了。
本节中,重点介绍两点PyAlink和PyFlink的兼容特性。
下面是一段Alink和PyFlink代码混用的示例:
###get_mlenv.pyfrompyalink.alinkimport*env,btenv,senv,stenv=getMLEnv()###使用PyFlink接口,与Table进行互转table=stenv.from_elements([(1,2),(2,5),(3,1)],['a','b'])source=TableSourceStreamOp(table)source.print()StreamOperator.execute()我们可以看到,使用PyFlink构建Table(第5行),可以直接转化为PyAlink的数据源算子(第6行)。
下面是提交任务的示例:
###直接运行脚本pythonkmeans.py###向集群提交作业PYFLINK_PATH=`python-c"importpyflink;print(pyflink.__path__[0])"`${PYFLINK_PATH}/bin/flinkrun-m10.101.**.**:31805-pykmeans.py-p4■读写Kafka我们在FlinkKafkaConnector基础上,为Kafka的输入输出包装了Source和Sink组件,让大家读写Kafka数据更加方便。下面是一个从数据读入,数据解析,对数据进行逻辑回归预测,将结果写入Kafka的任务的代码示例:
我们也可以看到,在Alink1.1.0这个版本中,数据的解析还是有一点麻烦,在本文后面的部分还会介绍对数据解析部分的简化,让整个流程更简洁。
本章开始,我们将详细介绍Alink1.1.1版本的一些优化的点,以及重要特性等。
在我们使用算法组件的时候,经常会遇到有些属性是枚举类型的,在Python中,一般是通过字符串输入枚举值,实际在使用的过程中,这些枚举值很难全部记住,经常需要去查询Alink的文档。为了我们编写代码更加顺畅,在新版本中,我们优化了代码的提示信息,我们可以尝试填写一个替代值,虽然会抛异常,但在运行结果中,可以看到枚举值的明确提示。
以卡方筛选算子为例,卡方筛选算子的SelectorType可以填写NumTopFeatures,Percentil,FPR等,是枚举类型变量,我们如果使用'aaa'值代替,看下会有什么效果,代码如下:
###Python代码selector=ChiSqSelectorBatchOp()\.setSelectorType("aaa")\.setSelectedCols(["f_string","f_long","f_int","f_double"])\.setLabelCol("f_boolean")\.setNumTopFeatures(2)在Alink1.1.1之前的版本,会返回下图:
异常信息中打出SelectorType输出错误的值AAA,但异常信息不明显,也没有指出是哪个参数写错了。
Alink1.1.1中,则会出现下图的结果:
异常信息中会有哪个参数填写错误,以及会提示可能的值是什么,这样我们使用Alink算子的时候更加便捷。
上面是Python代码的枚举类型的错误提示,对于JAVA来说,有代码自动提示,编写时会非常方便:
我们进行机器学习开发,算法中往往会有很多列名参数,列名输错情况很常见,如下图所示:
我们可能将text字段错误的写成了text1,在1.1.1版本里,不仅会指出哪列不存在,也会提示最可能的列名,帮助用户做修正,见下图所示。
这样,用户可以更快的定位错误,排查问题。JAVA的行为也相同:
输出提示如下:
我们在使用Python时,更多是用DataFrame来操作数据,在使用PyAlink时,有一个DataFrame向AlinkTable转换的过程,转换的速度会直接影响整个任务的执行时长,为了给用户一个比较好的用户体验,我们在转化上面,做了比较大的性能优化。
以下面的示例代码为例:
下面是一组JOSN格式的测试数据。
{"sepal_width":3.4,"petal_width":0.2,"sepal_length":4.8,"category":"Iris-setosa","petal_length":1.6}{"sepal_width":4.1,"petal_width":0.1,"sepal_length":5.2,"category":"Iris-setosa","petal_length":1.5}{"sepal_width":2.8,"petal_width":1.5,"sepal_length":6.5,"category":"Iris-versicolor","petal_length":4.6}{"sepal_width":3.0,"petal_width":1.8,"sepal_length":6.1,"category":"Iris-virginica","petal_length":4.9}{"sepal_width":2.9,"petal_width":1.8,"sepal_length":7.3,"category":"Iris-virginica","petal_length":6.3}我们需要将其解析为下图这样的结构化数据。
Alink1.1.1之前,我们可能需要编写下面这样的代码:
json_parser=JsonValueStreamOp().setSelectedCol("message").setOutputCols(["sepal_length","sepal_width","petal_length","petal_width","category"]).setJsonPath(["$.sepal_length","$.sepal_width",data=data.link(\JsonToColumnsStreamOp().setSelectedCol("message").setSchemaStr("sepal_lengthdouble,sepal_widthdouble,petal_lengthdouble,"+"petal_widthdouble,categorystring").setReservedCols([]))"$.petal_length","$.petal_width","$.category"])data=data.link(json_parser)data=data.select(\"CAST(sepal_lengthASDOUBLE)ASsepal_length,"+"CAST(sepal_widthASDOUBLE)ASsepal_width,"+"CAST(petal_lengthASDOUBLE)ASpetal_length,"+"CAST(petal_widthASDOUBLE)ASpetal_width,category")在Alink1.1.1版本中,我们添加了JsonToColumnsStreamOp组件,代码变成这样:
data=data.link(\JsonToColumnsStreamOp().setSelectedCol("message").setSchemaStr("sepal_lengthdouble,sepal_widthdouble,petal_lengthdouble,"+"petal_widthdouble,categorystring").setReservedCols([]))我们可以看到,代码精简了很多。
最后,介绍一个日志解析的例子,我们知道,日志的格式没有一个完整的规律,不是一个JSON格式,也不是KV格式,这就需要用现有工具进行组合来解决。
下面是一段日志记录的内容:
source.select("SUBSTRING(textFROM1ForPOSITION('['intext)-2)ASpart1,"+"REGEXT_EXTRACT(text,'(\\[)(.*)(\\])',2)ASlog_time,"+"SUBSTRING(textFROM2+POSION(']'INtext))ASpart2").link(newCsvToColumnBatchOp().setSelectCol("part1").setFieldDelimiter("").setSchemaStr("ipstring,col1string,col2string")).link(newCsvToColumnBatchOp().setSelectCol("part2").setFieldDelimiter("").setSchemaStr("cmdstring,responseint,bytesizeint,col3string,col4string")).link(newCsvToColumnBatchOp().setSelectCol("cmd").setFieldDelimiter("").setSchemaStr("req_methodstring,urlString,protocolstring")).select("ip,col1,col2,log_time,req_method,url,protocol,response,bytesize,col3,col4")上面的代码思路如下:
以上。Alink是基于Flink的机器学习算法平台,欢迎访问Alink的GitHub链接获取更多信息。也欢迎加入Alink开源用户群进行交流~