今天大部分流行的机器学习的推理和训练程序基本都是由数据科学家用Python来编写的,比如流行的TensorFlow、PyTorch以及一些传统机器学习场景中用到的scikit-learn等等。我们希望支持数据科学家继续使用熟悉的Python编写特征工程代码来完成端到端机器学习链路的开发与部署,并且能够使用他们所熟悉的Python生态环境中的库。
越来越多的机器学习应用在往实时方向发展,通过实时处理可以提高机器学习的效率和准确度。为了达到目标,需要生成实时特征。这里不仅仅是去实时获取查询特征,而是要实时生成特征。例如需要实时获取用户在最近两分钟内的点击次数,为此需要使用流式计算引擎完成实时特征计算。
越来越多的中小型公司希望做到多云部署,以得到生产的安全保证,以及获得云厂商之间的竞价优势。因此我们的方案不要求用户绑定一个云厂商,而是要让用户能够自由地在不同云厂商之间做选择,甚至在私有云部署特征工程作业。
这是FeatHub项目设立之初所希望满足的一些条件。
①特征穿越
①需要手动翻译
很多公司会有一个专门的平台团队把数据科学家写的单进程Python作业翻译成可分布式执行的Flink或者Spark作业,来实现高性能高可用的部署。其翻译过程会增加整个开发生命周期长度。并且因为还需要额外的人力去做翻译工作,增加了开发成本,更进一步带来了引入Bug的可能。另一拨人将数据科学家的作业翻译之后的逻辑未必和原先的逻辑保持一致,这样就带来更多的Debug工作量。
①特征分布变化
特征工程作业的整个质量和效率不只是取决于作业有没有Bug,还依赖于上游的输入数据数值分布能满足一些特性,例如能接近于训练时的数据数值分布。很多作业的推理效果下降,经常是由于上游作业生产的数据分布发生了变化。这种情况下,需要开发者去追踪整个链路,一段段去看在哪个地方的特征数据分布发生了变化,根据具体情况再去看是否需要重新训练或者解决Bug。这部分人力工作量过大也是一个痛点。
①开发工作重复
虽然很多特征计算作业的开发团队和场景不同,但其实用了类似甚至相同的特征定义。很多公司中没有一个很好的渠道,让公司内不同团队能查询和复用已有特征。这就导致不同团队经常需要做重复开发,甚至对于相同特征需要重复跑作业去生成一些特征。这带来了人力和计算/储存资源的浪费,因为需要更多的计算、内存、存储空间去生成相同特征。
②point-in-timecorrect语义
接下来介绍FeatHub作为一个FeatureStore,对于整个特征开发周期的每一阶段试图解决的问题和提供的工具。
在特征开发阶段,FeatHub会提供一个基于Python的具有高易用性的SDK,让用户能简洁地表达特征的计算逻辑。特征计算本质是一个特征的ETL。开发阶段最重要的是SDK的易用性和简洁性。
在特征部署阶段,FeatHub会提供执行引擎,实现高性能,低延迟的特征计算逻辑的部署,并且能对接不同的特征存储。部署阶段最重要的是执行引擎的性能和对接不同特征存储的能力。
在FeatureStore领域内已经有一些具有代表性的FeatureStore项目,例如今年初LinkedIn开源的Feathr,以及开源了多年的Feast。我们调研了这些项目,发现他们并不能很好地达成我们提出的目标场景。
FeatHub相比现有方案,带来的额外价值包括:
①简单易用的PythonSDK。FeatHub的SDK参考了已有的FeatureStore项目的SDK,能支持这些项目的核心功能,并进一步提升了SDK的抽象能力和易用性,
②支持单机上的开发和实验。开发者不需要对接分布式的Flink或Spark集群来跑实验,而只需要使用单机上的CPU或者内存资源就可以进行开发和实验,并能使用scikit-learn等单机上的机器学习算法库。
③无需修改代码即可切换执行引擎。当用户完成单机上的开发后,可以将单机执行引擎切换到Flink或Spark等分布式执行引擎,而无需修改表达特征计算逻辑的代码。使用Flink作为执行引擎可以让Feathub支持高吞吐、低延时的实时特征计算。FeatHub将来会进一步支持使用Spark作为执行引擎,让用户在离线场景中可以得到潜在的更好的吞吐性能,根据场景自由选择最合适的执行引擎。
④提供执行引擎的扩展能力。FeatHub不仅可以支持以Flink、Spark作为执行引擎,还支持开发者自定义执行引擎,使用公司内部自研的执行引擎进行特征ETL。
⑤代码开源,使得用户可以自由选择部署FeatHub的云厂商,也可以在私有云中进行部署。
以上是包含FeatHub主要模块的架构图。最上层提供了一套PythonSDK,支持用户定义数据源、数据终点以及特征计算逻辑。由SDK所定义的特征可以注册到特征元数据中心,支持其他用户和作业来查询和复用特征,甚至可以基于特征元数据进一步分析特征血缘。特征定义包含了特征的source、sink,以及常见的计算逻辑,例如UDF调用、特征拼接,基于over窗口与滑动窗口的聚合等。当需要取生成用户所定义的特征时,FeatHub会提供一些内置的FeatureProcessor,也就是执行引擎,去执行已有特征的计算逻辑。当用户需要在单机上做实验时,可以使用LocalProcessor使用单机上的资源,无需对接一个远程的集群。当需要生成实时特征时,可以使用FlinkProcessor完成高吞吐、低延时的流式特征计算。
将来也可以支持类似于LambdaFunction的FeatureService来实现在线的特征计算,以及对接Spark来完成高吞吐的离线特征计算。执行引擎可以对接不同的离线和在线特征储存系统,例如用Redis完成在线特征储存,用HDFS完成离线特征储存,以及用Kafka完成近线特征储存。
上图展现了FeatHub如何被用户使用,以及对接下游的机器学习训练和推理程序,用户或开发者将通过SDK来表达所希望计算的特征,然后提交到执行引擎上进行部署。特征经过计算后,需要输出到特征储存,例如Redis和HDFS。一个机器学习离线训练程序可以直接读取HDFS中的数据去做批量训练。一个在线的机器学习推理程序可以直接读取Redis中的数据进行在线推理。
上图展现了FeatHub中的核心概念之间的关系。一个TableDescriptor表达一组特征的集合。TableDescriptor经过逻辑转换可以生产一个新的TableDescriptor。
TableDescriptor分为两类。其中FeatureTable表达的是具有特定物理地址的表,例如可以是一个在Redis中的表,也可以是一个在HDFS中的表。FeatureView则是一些不一定有物理地址的逻辑表,通常是从一个FeatureTable经过一连逻辑串转换后得到的。
FeatureView有如下3个子类:
①DerivedFeatureView输出的特征表和其输入的特征表(i.e.source)的行基本是一对一的。它可以支持表达单行转换逻辑(e.g.加减乘除),overwindow聚合逻辑,以及特征拼接逻辑。它可用于生成训练数据。例如在之前所介绍的例子中,需要将训练样本去拼接来自不同维表的特征以得到实际的训练数据,就可以使用DerivedFeatureView来完成。
③OnDemandFeatureView可以与FeatureService用在一起,支持在线特征计算。例如在使用高德地图时,开发者可能会希望在收到用户的请求之后,根据用户当前的物理位置与上一次发送请求时的物理位置,计算出用户移动的速度和方向速度,来协助推荐路线的决策。这些特征必须在收到用户请求的时候进行在线计算得到。OnDemandFeatureView可以用于支持这类场景。
Transform表达的是特征计算逻辑。FeatHub当前支持如下5种特征计算逻辑:
①Expression支持用户基于一个DSL语言表达单行的特征计算逻辑。其表达能力接近SQL语言中的select语句,可以支持加减乘除和内置函数调用,可以让熟悉SQL的开发者快速上手。
②Join表达的是特征拼接逻辑。开发者可以指定维表的名字和需要拼接的特征名字等信息。
③PythonUDF支持用户自定义Python函数来计算特征。
④OverWindow表达的是Over窗口聚合逻辑。例如在收到一行数据时,用户希望根据之前的5行数据,进行聚合并计算有多少条数据符合某个规则。
⑤SlidingWindow表达的是滑动窗口聚合逻辑。
从上图中可以看到,通常一个特征ETL作业会从特征源表读取特征,经过多次特征计算逻辑产生新的特征,并将生成的特征输出到特征结果表。特征源表可以对接不同的特征存储,例如有FileSystem,Kafka,Hive等。类似的,特征结果表也可以对接FileSystem,Kafka,Redis等特征储存。
Processor包括LocalProcessor、FlinkProcessor、SparkProcessor,分别可以使用单机物理资源,分布式的Flink集群,以及分布式Spark集群,去执行用户所定义的特征计算逻辑。
在介绍了FeatHub的架构和核心概念后,我们将通过一些样例程序来展现FeatHubSDK的表达能力以及易用性。对于特征开发SDK来说,其最核心的能力就是如何表达新的特征计算逻辑。FeatHubSDK支持特征拼接、窗口聚合、内置函数调用以及自定义Python等能力,将来还可以支持基于JAVA或者C++的UDF调用。
滑动窗口聚合与Over窗口聚合比较类似,API上唯一区别是可以额外指定step_size。如果step_size=1分钟,则窗口会在每分钟进行滑动并产生新的特征值。
在PythonUDF调用的代码片段中,用户可以自定义一个Python函数,对输入的特征进行任意的处理,例如产生小写的字符串。
通过以上几个代码片段,我们可以看出FeatHub的API是比较简洁易用的。用户只需要设置计算逻辑所必须的参数,而无需了解处理引擎的细节。
以上代码片段展示了一个样例FeatHub应用所需要完成的步骤。
①首先用户需要创建一个FeatHubClient并设置processor_type。如果是本地实验,可以设置成Local,如果是远程分布式生产部署,可以设置成Flink。
③用户可以创建一个FeatureView来表达特征拼接和聚合的逻辑。如果要做拼接,用户可以item_price_events.price来表达希望拼接的特征。FeatHub会找到名字为item_price_events的表并从中拿到名字为price的特征。用户还可以使用OverWindowTransform来完成Over窗口聚合,定义一个名为total_payment_last_two_minutes的特征。其中window_size=2分钟表示对于两分钟内的数据应用指定的表达式和聚合函数来计算特征。
④对于已经定义的FeatureView,如果用户想做本地开发和实验,并使用scikit-learn算法库进行单机上的训练,可以使用to_pandas()API来将数据以PandasDataFrame格式获取到单机的内存中。
⑤当用户需要完成特征的生产部署时,可以使用FileSystemSink指定用于存放数据的离线特征储存。然后调用execute_insert()将特征输出到所指定的Sink当中。
FeatHub的基本价值是提供SDK来方便用户开发特征,并且提供执行引擎来计算特征。除此之外,FeatHub还将提供执行引擎的性能优化,让用户在特征部署阶段获得更多的收益。例如对于基于滑动窗口聚合的特征,目前如果使用原生的FlinkAPI来计算,Flink会在每个滑动的step_size都输出对应的特征值,无论特征的数值是否发生了变化。对于window_size=1小时,step_size=1秒这样的滑动窗口,大部分情况下Flink可能会输出相同的特征数值。这样会浪费网络流量、下游存储等资源。FeatHub中支持用户配置滑动窗口的行为,允许滑动窗口只在特征数值发生变化的时候输出特征,来优化特征计算作业的资源使用量。
另外FeatHub还将进一步优化滑动窗口的内存和CPU使用量。在某些场景中,用户会定于许多类似的滑动窗口特征。这些特征只有windowsize不一样。例如我们可能希望得到每个用户最近1分钟,5分钟,和10分钟内的购买商品的花费总数。如果使用原生的FlinkAPI来计算,作业可能会使用三个聚合算子来分别计算这3个特征。每个聚合算子会有单独的内存空间。考虑到这些算子所处理的数据和计算逻辑具有较大的重合,FeatHub可以用一个自定义算子,统一完成这些特征的计算,来达到节约内存和CPU资源的目标。
FeatHub目前已经在GitHub开源,能够支持一些基本的LocalProcessor和FlinkProcessor的功能。我们会进一步完善FeatHub的核心功能来方便用户特征工程的开发和落地。其中包括支持更多常用的离线储存、在线存储,对接Notebook,提供WebUI来可视化特征的元数据,支持用户做特征的注册、搜索、复用,以及支持使用Spark作为FeatHub的执行引擎。
FeatHub代码库目前放在github/alibaba目录下。为了方便大家学习使用FeatHub,并快速找到和参照满足所需场景需求的代码片段,我们在flink-extended/feathub-examples代码库中提供额外代码示例,大家可以自由使用尝试。欢迎大家提供反馈,以及贡献PR。
A1:原则上都有,即使数据没有乱序,如果在Join时没有考虑到timestamp字段,就可能导致乱序。在实际场景中,源数据可能也会乱序。这时候可以使用类似于Flink中的watermark策略来等待晚到的数据,降低乱序的影响。另外我们可以用定期的离线作业来backfill在线特征数据,从而进一步降低数据乱序的影响。
A2:FeatHubAPI是能支持回放的,但目前这部分功能还没有经过生产验证。FeatHub将支持使用Flink和Spark作为执行引擎,因此可以复用Flink和Spark的计算能力来完成历史数据的回放。例如,我们可以启动一个Spark作业,设置Source来处理过去一个月内所有的HDFS上的数据,并执行所定义的特征拼接和聚合逻辑,然后将计算得到的特征输出。
A3:特征计算分为离线、近线和在线,Flink是一个近线执行引擎,可以实时计算例如最近5分钟内的用户点击次数这样的特征,同时也可以支持离线计算。因此FeatHub可以支持离线和近线特征计算。FeatHub将来有计划去支持在线特征计算,使用基于FeatureService的架构,来计算OnDemandFeatureView所表达的特征。
A4:FeatHub将会支持所有Flink所支持的Source/Sink,包括ODPS,Holo等阿里云提供的服务。目前FeatHub只支持Kafka和FileSystem。我们会逐步添加更多的储存支持。