失败率重启策略可以在flink-conf.yaml中设置下面的配置参数来启用:
例子:
restart-strategy.failure-rate.max-failures-per-interval:3restart-strategy.failure-rate.failure-rate-interval:5minrestart-strategy.failure-rate.delay:10s
失败率重启策略也可以在程序中设置:
Job直接失败,不会尝试进行重启
restart-strategy:none
无重启策略也可以在程序中设置
valenv=ExecutionEnvironment.getExecutionEnvironment()env.setRestartStrategy(RestartStrategies.noRestart())5)案例
需求:输入五次zhangsan,程序挂掉。
代码:
1.需求:
假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理
2.数据规划:
使用自定义算子每秒钟产生大约10000条数据。产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)。数据经统计后,统计结果打印到终端输出。打印输出的结果为Long类型的数据。
3.开发思路:
source算子每隔1秒钟发送10000条数据,并注入到Window算子中。window算子每隔1秒钟统计一次最近4秒钟内数据数量。每隔1秒钟将统计结果打印到终端。每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。
5.开发步骤:
获取流处理执行环境
设置检查点机制
自定义数据源
数据分组
数据聚合
数据打印
触发执行
示例代码:
当谈及仅一次处理时,我们真正想表达的是每条输入消息只会影响最终结果一次!(影响应用状态一次,而非被处理一次)即使出现机器故障或软件崩溃,Flink也要保证不会有数据被重复处理或压根就没有被处理从而影响状态。
在Flink1.4版本之前,精准一次处理只限于Flink应用内,也就是所有的Operator完全由Flink状态保存并管理的才能实现精确一次处理。但Flink处理完数据后大多需要将结果发送到外部系统,比如Sink到Kafka中,这个过程中Flink并不保证精准一次处理。
在Flink1.4版本正式引入了一个里程碑式的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction函数。该SinkFunction提取并封装了两阶段提交协议中的公共逻辑,自此Flink搭配特定Source和Sink(如Kafka0.11版)实现精确一次处理语义(英文简称:EOS,即Exactly-OnceSemantics)。
在Flink中需要端到端精准一次处理的位置有三个:
Flink端到端精准一次处理
Source端:数据从上一阶段进入到Flink时,需要保证消息精准一次消费。
Flink可靠性的基石-checkpoint机制详细解析
Sink端:将处理完的数据发送到下一阶段时,需要保证数据能够准确无误发送到下一阶段。
1)Flink端到端精准一次处理语义(EOS)
以下内容适用于Flink1.4及之后版本
对于Source端:Source端的精准一次处理比较简单,毕竟数据是落到Flink中,所以Flink只需要保存消费数据的偏移量即可,如消费Kafka中的数据,Flink将KafkaConsumer作为Source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性。
对于Sink端:Sink端是最复杂的,因为数据是落地到其他系统上的,数据一旦离开Flink之后,Flink就监控不到这些数据了,所以精准一次处理语义必须也要应用于Flink写入数据的外部系统,故这些外部系统必须提供一种手段允许提交或回滚这些写入操作,同时还要保证与FlinkCheckpoint能够协调使用(Kafka0.11版本已经实现精确一次处理语义)。
我们以Flink与Kafka组合为例,Flink从Kafka中读数据,处理完的数据在写入Kafka中。
为什么以Kafka为例,第一个原因是目前大多数的Flink系统读写数据都是与Kafka系统进行的。第二个原因,也是最重要的原因Kafka0.11版本正式发布了对于事务的支持,这是与Kafka交互的Flink应用要实现端到端精准一次语义的必要条件。
当然,Flink支持这种精准一次处理语义并不只是限于与Kafka的结合,可以使用任何Source/Sink,只要它们提供了必要的协调机制。
2)Flink与Kafka组合
Flink应用示例
如上图所示,Flink中包含以下组件:
一个Source,从Kafka中读取数据(即KafkaConsumer)
一个Sink,将结果写入到Kafka(即KafkaProducer)
若要Sink支持精准一次处理语义(EOS),它必须以事务的方式写数据到Kafka,这样当提交事务时两次Checkpoint间的所有写入操作当作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。
当然了,在一个分布式且含有多个并发执行Sink的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一个一致性的结果。Flink使用两阶段提交协议以及预提交(Pre-commit)阶段来解决这个问题。
3)两阶段提交协议(2PC)
两阶段提交协议(Two-PhaseCommit,2PC)是很常用的解决分布式事务问题的方式,它可以保证在分布式事务中,要么所有参与进程都提交事务,要么都取消,即实现ACID中的A(原子性)。
在数据一致性的环境下,其代表的含义是:要么所有备份数据同时更改某个数值,要么都不改,以此来达到数据的强一致性。
两阶段提交协议中有两个重要角色,协调者(Coordinator)和参与者(Participant),其中协调者只有一个,起到分布式事务的协调管理作用,参与者有多个。
顾名思义,两阶段提交将提交过程划分为连续的两个阶段:表决阶段(Voting)和提交阶段(Commit)。
两阶段提交协议过程如下图所示:
两阶段提交协议
第一阶段:表决阶段
协调者向所有参与者发送一个VOTE_REQUEST消息。
当参与者接收到VOTE_REQUEST消息,向协调者发送VOTE_COMMIT消息作为回应,告诉协调者自己已经做好准备提交准备,如果参与者没有准备好或遇到其他故障,就返回一个VOTE_ABORT消息,告诉协调者目前无法提交事务。
第二阶段:提交阶段
协调者收集来自各个参与者的表决消息。如果所有参与者一致认为可以提交事务,那么协调者决定事务的最终提交,在此情形下协调者向所有参与者发送一个GLOBAL_COMMIT消息,通知参与者进行本地提交;如果所有参与者中有任意一个返回消息是VOTE_ABORT,协调者就会取消事务,向所有参与者广播一条GLOBAL_ABORT消息通知所有的参与者取消事务。
每个提交了表决信息的参与者等候协调者返回消息,如果参与者接收到一个GLOBAL_COMMIT消息,那么参与者提交本地事务,否则如果接收到GLOBAL_ABORT消息,则参与者取消本地事务。
4)两阶段提交协议在Flink中的应用
Flink的两阶段提交思路:
我们从Flink程序启动到消费Kafka数据,最后到Flink将数据Sink到Kafka为止,来分析Flink的精准一次处理。
当Checkpoint启动时,JobManager会将检查点分界线(checkpointbattier)注入数据流,checkpointbarrier会在算子间传递下去,如下如所示:
Flink精准一次处理:Checkpoint启动
Source端:FlinkKafkaSource负责保存Kafka消费offset,当Chckpoint成功时Flink负责提交这些写入,否则就终止取消掉它们,当Chckpoint完成位移保存,它会将checkpointbarrier(检查点分界线)传给下一个Operator,然后每个算子会对当前的状态做个快照,保存到状态后端(StateBackend)。
对于Source任务而言,就会把当前的offset作为状态保存起来。下次从Checkpoint恢复时,Source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据,如下图所示:
Flink精准一次处理:checkpointbarrier及offset保存Slink端:从Source端开始,每个内部的transform任务遇到checkpointbarrier(检查点分界线)时,都会把状态存到Checkpoint里。数据处理完毕到Sink端时,Sink任务首先把数据写入外部Kafka,这些数据都属于预提交的事务(还不能被消费),此时的Pre-commit预提交阶段下DataSink在保存状态到状态后端的同时还必须预提交它的外部事务,如下图所示:
Flink精准一次处理:预提交到外部系统
当所有算子任务的快照完成(所有创建的快照都被视为是Checkpoint的一部分),也就是这次的Checkpoint完成时,JobManager会向所有任务发通知,确认这次Checkpoint完成,此时Pre-commit预提交阶段才算完成。才正式到两阶段提交协议的第二个阶段:commit阶段。该阶段中JobManager会为应用中每个Operator发起Checkpoint已完成的回调逻辑。
本例中的DataSource和窗口操作无外部状态,因此在该阶段,这两个Opeartor无需执行任何逻辑,但是DataSink是有外部状态的,此时我们必须提交外部事务,当Sink任务收到确认通知,就会正式提交之前的事务,Kafka中未确认的数据就改为“已确认”,数据就真正可以被消费了,如下图所示:
Flink精准一次处理:数据精准被消费
注:Flink由JobManager协调各个TaskManager进行Checkpoint存储,Checkpoint保存在StateBackend(状态后端)中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。
最后,一张图总结下Flink的EOS:
此图建议保存,总结全面且简明扼要,再也不怂面试官!
5)Exactly-Once案例
Kafka来实现End-to-EndExactly-Once语义:
Redis实现End-to-EndExactly-Once语义:
代码开发步骤:
FlinkSQL是Flink实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准SQL语义的开发语言。自2015年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于Flink打造新一代计算引擎,针对Flink存在的不足进行优化和改进,并且在2019年初将最终代码开源,也就是我们熟知的Blink。Blink在原来的Flink基础上最显著的一个贡献就是FlinkSQL的实现。
FlinkSQL是面向用户的API层,在我们传统的流式计算领域,比如Storm、SparkStreaming都会提供一些Function或者DatastreamAPI,用户通过Java或Scala写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API也出现了很多不兼容的地方。
在这个背景下,毫无疑问,SQL就成了我们最佳选择,之所以选择将SQL作为核心API,是因为其具有几个非常重要的特点:
SQL属于设定式语言,用户只要表达清楚需求即可,不需要了解具体做法;
SQL可优化,内置多种查询优化器,这些查询优化器可为SQL翻译出最优执行计划;
SQL易于理解,不同行业和领域的人都懂,学习成本较低;
SQL非常稳定,在数据库30多年的历史中,SQL本身变化较少;
流与批的统一,Flink底层Runtime本身就是一个流与批统一的引擎,而SQL可以做到API层的流与批统一。
1.FlinkSQL常用算子
SELECT:
SELECT用于从DataSet/DataStream中选择数据,用于筛选出某些列。
示例:
SELECT*FROMTable;//取出表中的所有列
SELECTname,ageFROMTable;//取出表中name和age两列
与此同时SELECT语句中可以使用函数和别名,例如我们上面提到的WordCount中:
SELECTword,COUNT(word)FROMtableGROUPBYword;
WHERE:
WHERE用于从数据集/流中过滤数据,与SELECT一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。
SELECTname,ageFROMTablewherenameLIKE‘%小明%’;
SELECT*FROMTableWHEREage=20;
WHERE是从原数据中进行过滤,那么在WHERE条件中,FlinkSQL同样支持=、<、>、、>=、<=,以及AND、OR等表达式的组合,最终满足过滤条件的数据会被选择出来。并且WHERE可以结合IN、NOTIN联合使用。举个例子:
SELECTname,ageFROMTableWHEREnameIN(SELECTnameFROMTable2)
DISTINCT:
DISTINCT用于从数据集/流中去重根据SELECT的结果进行去重。
SELECTDISTINCTnameFROMTable;
对于流式查询,计算查询结果所需的State可能会无限增长,用户需要自己控制查询的状态范围,以防止状态过大。
GROUPBY:
GROUPBY是对数据进行分组操作。例如我们需要计算成绩明细表中,每个学生的总分。
SELECTname,SUM(score)asTotalScoreFROMTableGROUPBYname;
UNION和UNIONALL:
UNION用于将两个结果集合并起来,要求两个结果集字段完全一致,包括字段类型、字段顺序。不同于UNIONALL的是,UNION会对结果数据去重。
SELECT*FROMT1UNION(ALL)SELECT*FROMT2;
JOIN:
JOIN用于把来自两个表的数据联合起来形成结果表,Flink支持的JOIN类型包括:
JOIN-INNERJOIN
LEFTJOIN-LEFTOUTERJOIN
RIGHTJOIN-RIGHTOUTERJOIN
FULLJOIN-FULLOUTERJOIN
这里的JOIN的语义和我们在关系型数据库中使用的JOIN语义一致。
JOIN(将订单表数据和商品表进行关联)
SELECT*FROMOrdersINNERJOINProductONOrders.productId=Product.id
LEFTJOIN与JOIN的区别是当右表没有与左边相JOIN的数据时候,右边对应的字段补NULL输出,RIGHTJOIN相当于LEFTJOIN左右两个表交互一下位置。FULLJOIN相当于RIGHTJOIN和LEFTJOIN之后进行UNIONALL操作。
SELECT*FROMOrdersLEFTJOINProductONOrders.productId=Product.idSELECT*FROMOrdersRIGHTJOINProductONOrders.productId=Product.idSELECT*FROMOrdersFULLOUTERJOINProductONOrders.productId=Product.id
GroupWindow:
根据窗口数据划分的不同,目前ApacheFlink有如下3种BoundedWindow:
Tumble,滚动窗口,窗口数据有固定的大小,窗口数据无叠加;
Hop,滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;
Session,会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。
TumbleWindow:
Tumble滚动窗口有固定大小,窗口数据不重叠,具体语义如下:
Tumble滚动窗口对应的语法如下:
SELECT[gk],[TUMBLE_START(timeCol,size)],[TUMBLE_END(timeCol,size)],agg1(col1),...aggn(colN)FROMTab1GROUPBY[gk],TUMBLE(timeCol,size)
其中:
[gk]决定了是否需要按照字段进行聚合;
size表示窗口的大小,如秒、分钟、小时、天。
举个例子,假如我们要计算每个人每天的订单量,按照user进行聚合分组:
SELECTuser,TUMBLE_START(rowtime,INTERVAL‘1’DAY)aswStart,SUM(amount)FROMOrdersGROUPBYTUMBLE(rowtime,INTERVAL‘1’DAY),user;
HopWindow:
Hop滑动窗口和滚动窗口类似,窗口有固定的size,与滚动窗口不同的是滑动窗口可以通过slide参数控制滑动窗口的新建频率。因此当slide值小于窗口size的值的时候多个滑动窗口会重叠,具体语义如下:
Hop滑动窗口对应语法如下:
SELECT[gk],[HOP_START(timeCol,slide,size)],[HOP_END(timeCol,slide,size)],agg1(col1),...aggN(colN)FROMTab1GROUPBY[gk],HOP(timeCol,slide,size)
每次字段的意思和Tumble窗口类似:
slide表示每次窗口滑动的大小;
size表示整个窗口的大小,如秒、分钟、小时、天。
举例说明,我们要每过一小时计算一次过去24小时内每个商品的销量:
SessionWindow:
Seeeion会话窗口对应语法如下:
SELECT[gk],SESSION_START(timeCol,gap)ASwinStart,SESSION_END(timeCol,gap)ASwinEnd,agg1(col1),...aggn(colN)FROMTab1GROUPBY[gk],SESSION(timeCol,gap)
gap表示窗口数据非活跃周期的时长。
SELECTuser,SESSION_START(rowtime,INTERVAL‘12’HOUR)ASsStart,SESSION_ROWTIME(rowtime,INTERVAL‘12’HOUR)ASsEnd,SUM(amount)FROMOrdersGROUPBYSESSION(rowtime,INTERVAL‘12’HOUR),user
TableAPI和SQL捆绑在flink-tableMaven工件中。必须将以下依赖项添加到你的项目才能使用TableAPI和SQL:
另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加:
2.FlinkSQL实战案例1)批数据SQL
用法:
构建Table运行环境将DataSet注册为一张表使用Table运行环境的sqlQuery方法来执行SQL语句
示例:使用FlinkSQL统计用户消费订单的总金额、最大金额、最小金额、订单总数。
订单id用户名订单日期消费金额1Zhangsan2018-10-2015:30358.5
测试数据(订单ID、用户名、订单日期、订单金额):
Order(1,"zhangsan","2018-10-2015:30",358.5),Order(2,"zhangsan","2018-10-2016:30",131.5),Order(3,"lisi","2018-10-2016:30",127.5),Order(4,"lisi","2018-10-2016:30",328.5),Order(5,"lisi","2018-10-2016:30",432.5),Order(6,"zhaoliu","2018-10-2022:30",451.0),Order(7,"zhaoliu","2018-10-2022:30",362.0),Order(8,"zhaoliu","2018-10-2022:30",364.0),Order(9,"zhaoliu","2018-10-2022:30",341.0)
步骤:
获取一个批处理运行环境获取一个Table运行环境创建一个样例类Order用来映射数据(订单名、用户名、订单日期、订单金额)基于本地Order集合创建一个DataSetsource使用Table运行环境将DataSet注册为一张表使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)使用TableEnv.toDataSet将Table转换为DataSet打印测试
流处理中也可以支持SQL。但是需要注意以下几点:
示例:使用FlinkSQL来统计5秒内用户的订单总数、订单的最大金额、订单的最小金额。
步骤
我们在看直播的时候,不管对于主播还是用户来说,非常重要的一项就是弹幕文化。为了增加直播趣味性和互动性,各大网络直播平台纷纷采用弹窗弹幕作为用户实时交流的方式,内容丰富且形式多样的弹幕数据中隐含着复杂的用户属性与用户行为,研究并理解在线直播平台用户具有弹幕内容审核与监控、舆论热点预测、个性化摘要标注等多方面的应用价值。
本文不分析弹幕数据的应用价值,只通过弹幕内容审核与监控案例来了解下FlinkCEP的概念及功能。
在用户发弹幕时,直播平台主要实时监控识别两类弹幕内容:一类是发布不友善弹幕的用户,一类是刷屏的用户。
我们先记住上述需要实时监控识别的两类用户,接下来介绍FlinkCEP的API,然后使用CEP解决上述问题。
本文首发于公众号【五分钟学大数据】,大数据领域原创技术号
1.FlinkCEP是什么
2.FlinkCEPAPI
CEPAPI的核心是Pattern(模式)API,它允许你快速定义复杂的事件模式。每个模式包含多个阶段(stage)或者我们也可称为状态(state)。从一个状态切换到另一个状态,用户可以指定条件,这些条件可以作用在邻近的事件或独立事件上。
介绍API之前先来理解几个概念:
1)模式与模式序列
简单模式称为模式,将最终在数据流中进行搜索匹配的复杂模式序列称为模式序列,每个复杂模式序列是由多个简单模式组成。
每个模式必须具有唯一的名称,我们可以使用模式名称来标识该模式匹配到的事件。
2)单个模式
一个模式既可以是单例的,也可以是循环的。单例模式接受单个事件,循环模式可以接受多个事件。
3)模式示例:
有如下模式:ab+cd
其中a,b,c,d这些字母代表的是模式,+代表循环,b+就是循环模式;代表可选,c就是可选模式;
所以上述模式的意思就是:a后面可以跟一个或多个b,后面再可选的跟c,最后跟d。
其中a、c、d是单例模式,b+是循环模式。
一般情况下,模式都是单例模式,可以使用量词(Quantifiers)将其转换为循环模式。
每个模式可以带有一个或多个条件,这些条件是基于事件接收进行定义的。或者说,每个模式通过一个或多个条件来匹配和接收事件。
了解完上述概念后,接下来介绍下案例中需要用到的几个CEPAPI:
4)案例中用到的CEPAPI:
Begin:定义一个起始模式状态
用法:start=Pattern.
Next:附加一个新的模式状态。匹配事件必须直接接续上一个匹配事件
用法:next=start.next("next");
Where:定义当前模式状态的过滤条件。仅当事件通过过滤器时,它才能与状态匹配
用法:patternState.where(_.message=="yyds");
用法:patternState.within(Time.seconds(10));
Times:一个给定类型的事件出现了指定次数
用法:patternState.times(5);
3.监测用户弹幕行为案例案例一:监测恶意用户
规则:用户如果在10s内,同时输入TMD超过5次,就认为用户为恶意攻击,识别出该用户。
使用FlinkCEP检测恶意用户:
规则:用户如果在10s内,同时连续输入同样一句话超过5次,就认为是恶意刷屏。
使用FlinkCEP检测刷屏用户
除了案例中介绍的几个API外,我们在介绍下其他的常用API:
1)条件API
为了让传入事件被模式所接受,给模式指定传入事件必须满足的条件,这些条件由事件本身的属性或者前面匹配过的事件的属性统计量等来设定。比如,事件的某个值大于5,或者大于先前接受事件的某个值的平均值。
可以使用pattern.where()、pattern.or()、pattern.until()方法来指定条件。条件既可以是迭代条件IterativeConditions,也可以是简单条件SimpleConditions。
FlinkCEP支持事件之间的三种临近条件:
next():严格的满足条件
followedBy():松散的满足条件
followedByAny():非确定的松散满足条件
2)量词API
还记得我们在上面讲解模式的概念时说过的一句话嘛:一般情况下,模式都是单例模式,可以使用量词(Quantifiers)将其转换为循环模式。这里的量词就是指的量词API。
以下这几个量词API,可以将模式指定为循环模式:
pattern.oneOrMore():一个给定的事件有一次或多次出现,例如上面提到的b+。
pattern.times(#ofTimes):一个给定类型的事件出现了指定次数,例如4次。
pattern.times(#fromTimes,#toTimes):一个给定类型的事件出现的次数在指定次数范围内,例如2~4次。
可以使用pattern.greedy()方法将模式变成循环模式,但是不能让一组模式都变成循环模式。greedy:就是尽可能的重复。
使用pattern.optional()方法将循环模式变成可选的,即可以是循环模式也可以是单个模式。
3)匹配后的跳过策略
所谓的匹配跳过策略,是对多个成功匹配的模式进行筛选。也就是说如果多个匹配成功,可能我不需要这么多,按照匹配策略,过滤下就可以。
Flink中有五种跳过策略:
NO_SKIP:不过滤,所有可能的匹配都会被发出。
SKIP_TO_NEXT:丢弃与开始匹配到的事件相同的事件,发出开始匹配到的事件,即直接跳到下一个模式匹配到的事件,以此类推。
SKIP_PAST_LAST_EVENT:丢弃匹配开始后但结束之前匹配到的事件。
SKIP_TO_FIRST[PatternName]:丢弃匹配开始后但在PatternName模式匹配到的第一个事件之前匹配到的事件。
SKIP_TO_LAST[PatternName]:丢弃匹配开始后但在PatternName模式匹配到的最后一个事件之前匹配到的事件。
怎么理解上述策略,我们以NO_SKIP和SKIP_PAST_LAST_EVENT为例讲解下:
5.FlinkCEP的使用场景
除上述案例场景外,FlinkCEP还广泛用于网络欺诈,故障检测,风险规避,智能营销等领域。
1)实时反作弊和风控
对于电商来说,羊毛党是必不可少的,国内拼多多曾爆出100元的无门槛券随便领,当晚被人褥几百亿,对于这种情况肯定是没有做好及时的风控。另外还有就是商家上架商品时通过频繁修改商品的名称和滥用标题来提高搜索关键字的排名、批量注册一批机器账号快速刷单来提高商品的销售量等作弊行为,各种各样的作弊手法也是需要不断的去制定规则去匹配这种行为。
2)实时营销
3)实时网络攻击检测
当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据。对网络遭受的潜在攻击进行实时检测并给出预警,云服务厂商的多个数据中心会定时向监控中心上报其瞬时流量,如果流量在预设的正常范围内则认为是正常现象,不做任何操作;如果某数据中心在10秒内连续5次上报的流量超过正常范围的阈值,则触发一条警告的事件;如果某数据中心30秒内连续出现30次上报的流量超过正常范围的阈值,则触发严重的告警。
6.FlinkCEP的原理简单介绍
ApacheFlink在实现CEP时借鉴了EfficientPatternMatchingoverEventStreams论文中NFA的模型,在这篇论文中,还提到了一些优化,我们在这里先跳过,只说下NFA的概念。
在这篇论文中,提到了NFA,也就是Non-determinedFiniteAutomaton,叫做不确定的有限状态机,指的是状态有限,但是每个状态可能被转换成多个状态(不确定)。
非确定有限自动状态机
先介绍两个概念:
状态:状态分为三类,起始状态、中间状态和最终状态。
转换:take/ignore/proceed都是转换的名称。
在NFA匹配规则里,本质上是一个状态转换的过程。三种转换的含义如下所示:
Take:主要是条件的判断,当过来一条数据进行判断,一旦满足条件,获取当前元素,放入到结果集中,然后将当前状态转移到下一个的状态。
Proceed:当前的状态可以不依赖任何的事件转移到下一个状态,比如说透传的意思。
Ignore:当一条数据到来的时候,可以忽略这个消息事件,当前的状态保持不变,相当于自己到自己的一个状态。
NFA的特点:在NFA中,给定当前状态,可能有多个下一个状态。可以随机选择下一个状态,也可以并行(同时)选择下一个状态。输入符号可以为空。
7.规则引擎
规则引擎:将业务决策从应用程序代码中分离出来,并使用预定义的语义模块编写业务决策。接受数据输入,解释业务规则,并根据业务规则做出业务决策。使用规则引擎可以通过降低实现复杂业务逻辑的组件的复杂性,降低应用程序的维护和可扩展性成本。
1)Drools
Drools是一款使用Java编写的开源规则引擎,通常用来解决业务代码与业务规则的分离,它内置的DroolsFusion模块也提供CEP的功能。
优势:
功能较为完善,具有如系统监控、操作平台等功能。规则支持动态更新。
劣势:
Aviator是一个高性能、轻量级的Java语言实现的表达式求值引擎,主要用于各种表达式的动态求值。
支持大部分运算操作符。支持函数调用和自定义函数。支持正则表达式匹配。支持传入变量并且性能优秀。
没有ifelse、dowhile等语句,没有赋值语句,没有位运算符。3)EasyRules
EasyRules集成了MVEL和SpEL表达式的一款轻量级规则引擎。
轻量级框架,学习成本低。基于POJO。为定义业务引擎提供有用的抽象和简便的应用。支持从简单的规则组建成复杂规则。4)Esper
Esper设计目标为CEP的轻量级解决方案,可以方便的嵌入服务中,提供CEP功能。
轻量级可嵌入开发,常用的CEP功能简单好用。EPL语法与SQL类似,学习成本较低。
Flink是一个流式系统,具有高吞吐低延迟的特点,FlinkCEP是一套极具通用性、易于使用的实时流式事件处理方案。
CDC是ChangeDataCapture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
在广义的概念上,只要能捕获数据变更的技术,我们都可以称为CDC。通常我们说的CDC技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
CDC技术应用场景非常广泛:
数据同步,用于备份,容灾;
数据分发,一个数据源分发给多个下游;
数据采集(E),面向数据仓库/数据湖的ETL数据集成。
2.CDC的种类
CDC主要分为基于查询和基于Binlog两种方式,我们主要了解一下这两种之间的区别:
基于查询的CDC基于Binlog的CDC开源产品Sqoop、KafkaJDBCSourceCanal、Maxwell、Debezium执行模式BatchStreaming是否可以捕获所有数据变化否是延迟性高延迟低延迟是否增加数据库压力是否3.传统CDC与FlinkCDC对比1)传统CDCETL分析
2)基于FlinkCDC的ETL分析
2)基于FlinkCDC的聚合分析
2)基于FlinkCDC的数据打宽
4.Flink-CDC案例
Flink社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数据的source组件。
虽然实时计算在最近几年才火起来,但是在早期也有部分公司有实时计算的需求,但是数据量比较少,所以在实时方面形成不了完整的体系,基本所有的开发都是具体问题具体分析,来一个需求做一个,基本不考虑它们之间的关系,开发形式如下:
早期实时计算
如上图所示,拿到数据源后,会经过数据清洗,扩维,通过Flink进行业务逻辑处理,最后直接进行业务输出。把这个环节拆开来看,数据源端会重复引用相同的数据源,后面进行清洗、过滤、扩维等操作,都要重复做一遍,唯一不同的是业务的代码逻辑是不一样的。
随着产品和业务人员对实时数据需求的不断增多,这种开发模式出现的问题越来越多:
数据指标越来越多,“烟囱式”的开发导致代码耦合问题严重。
需求越来越多,有的需要明细数据,有的需要OLAP分析。单一的开发模式难以应付多种需求。
每个需求都要申请资源,导致资源成本急速膨胀,资源不能集约有效利用。
缺少完善的监控系统,无法在对业务产生影响之前发现并修复问题。
大家看实时数仓的发展和出现的问题,和离线数仓非常类似,后期数据量大了之后产生了各种问题,离线数仓当时是怎么解决的离线数仓通过分层架构使数据解耦,多个业务可以共用数据,实时数仓是否也可以用分层架构呢当然是可以的,但是细节上和离线的分层还是有一些不同,稍后会讲到。
2.实时数仓建设
从方法论来讲,实时和离线是非常相似的,离线数仓早期的时候也是具体问题具体分析,当数据规模涨到一定量的时候才会考虑如何治理。分层是一种非常有效的数据治理方式,所以在实时数仓如何进行管理的问题上,首先考虑的也是分层的处理逻辑。
实时数仓的架构如下图:
实时数仓架构
从上图中我们具体分析下每层的作用:
数据源:在数据源的层面,离线和实时在数据源是一致的,主要分为日志类和业务类,日志类又包括用户日志,埋点日志以及服务器日志等。
实时明细层:在明细层,为了解决重复建设的问题,要进行统一构建,利用离线数仓的模式,建设统一的基础明细数据层,按照主题进行管理,明细层的目的是给下游提供直接可用的数据,因此要对基础层进行统一的加工,比如清洗、过滤、扩维等。
汇总层:汇总层通过Flink的简洁算子直接可以算出结果,并且形成汇总指标池,所有的指标都统一在汇总层加工,所有人按照统一的规范管理建设,形成可复用的汇总结果。
我们可以看出,实时数仓和离线数仓的分层非常类似,比如数据源层,明细层,汇总层,乃至应用层,他们命名的模式可能都是一样的。但仔细比较不难发现,两者有很多区别:
与离线数仓相比,实时数仓的层次更少一些:
从目前建设离线数仓的经验来看,数仓的数据明细层内容会非常丰富,处理明细数据外一般还会包含轻度汇总层的概念,另外离线数仓中应用层数据在数仓内部,但实时数仓中,app应用层数据已经落入应用系统的存储介质中,可以把该层与数仓的表分离。
应用层少建设的好处:实时处理数据的时候,每建一个层次,数据必然会产生一定的延迟。
与离线数仓相比,实时数仓的数据源存储不同:
在建设离线数仓的时候,基本整个离线数仓都是建立在Hive表之上。但是,在建设实时数仓的时候,同一份表,会使用不同的方式进行存储。比如常见的情况下,明细数据或者汇总数据都会存在Kafka里面,但是像城市、渠道等维度信息需要借助Hbase,MySQL或者其他KV存储等数据库来进行存储。3.Lambda架构的实时数仓
Lambda和Kappa架构的概念已在前文中解释,不了解的小伙伴可点击链接:一文读懂大数据实时计算
下图是基于Flink和Kafka的Lambda架构的具体实践,上层是实时计算,下层是离线计算,横向是按计算引擎来分,纵向是按实时数仓来区分:
Lambda架构的实时数仓
Lambda架构是比较经典的架构,以前实时的场景不是很多,以离线为主,当附加了实时场景后,由于离线和实时的时效性不同,导致技术生态是不一样的。Lambda架构相当于附加了一条实时生产链路,在应用层面进行一个整合,双路生产,各自独立。这在业务应用中也是顺理成章采用的一种方式。
双路生产会存在一些问题,比如加工逻辑double,开发运维也会double,资源同样会变成两个资源链路。因为存在以上问题,所以又演进了一个Kappa架构。
4.Kappa架构的实时数仓
Kappa架构相当于去掉了离线计算部分的Lambda架构,具体如下图所示:
Kappa架构的实时数仓
Kappa架构从架构设计来讲比较简单,生产统一,一套逻辑同时生产离线和实时。但是在实际应用场景有比较大的局限性,因为实时数据的同一份表,会使用不同的方式进行存储,这就导致关联时需要跨数据源,操作数据有很大局限性,所以在业内直接用Kappa架构生产落地的案例不多见,且场景比较单一。
关于Kappa架构,熟悉实时数仓生产的同学,可能会有一个疑问。因为我们经常会面临业务变更,所以很多业务逻辑是需要去迭代的。之前产出的一些数据,如果口径变更了,就需要重算,甚至重刷历史数据。对于实时数仓来说,怎么去解决数据重算问题
5.流批结合的实时数仓
随着实时OLAP技术的发展,目前开源的OLAP引擎在性能,易用等方面有了很大的提升,如Doris、Presto等,加上数据湖技术的迅速发展,使得流批结合的方式变得简单。
如下图是流批结合的实时数仓:
流批结合的实时数仓
数据从日志统一采集到消息队列,再到实时数仓,作为基础数据流的建设是统一的。之后对于日志类实时特征,实时大屏类应用走实时流计算。对于Binlog类业务分析走实时OLAP批处理。
我们看到流批结合的方式与上面几种架构的存储方式发生了变化,由Kafka换成了Iceberg,Iceberg是介于上层计算引擎和底层存储格式之间的一个中间层,我们可以把它定义成一种“数据组织格式”,底层存储还是HDFS,那么为什么加了中间层,就对流批结合处理的比较好了呢Iceberg的ACID能力可以简化整个流水线的设计,降低整个流水线的延迟,并且所具有的修改、删除能力能够有效地降低开销,提升效率。Iceberg可以有效支持批处理的高吞吐数据扫描和流计算按分区粒度并发实时处理。
十二、Flink面试题1.Flink的容错机制(checkpoint)
Checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的Checkpoint机制原理来自“Chandy-Lamportalgorithm”算法。
每个需要Checkpoint的应用在启动时,Flink的JobManager为其创建一个CheckpointCoordinator(检查点协调器),CheckpointCoordinator全权负责本应用的快照制作。
CheckpointCoordinator(检查点协调器),CheckpointCoordinator全权负责本应用的快照制作。
CheckpointCoordinator(检查点协调器)周期性的向该流应用的所有source算子发送barrier(屏障)。
当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理
每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。
2.FlinkCheckpoint与Spark的相比,Flink有什么区别或优势吗
SparkStreaming的Checkpoint仅仅是针对Driver的故障恢复做了数据和元数据的Checkpoint。而Flink的Checkpoint机制要复杂了很多,它采用的是轻量级的分布式快照,实现了每个算子的快照,及流动中的数据的快照。
3.Flink中的Time有哪几种
4.对于迟到数据是怎么处理的
保存延迟数据则是通过sideOutputLateData(outputTag:OutputTag[T])保存
获取延迟数据是通过DataStream.getSideOutput(tag:OutputTag[X])获取
Flink中极其重要的Time与Window详细解析
5.Flink的运行必须依赖Hadoop组件吗
Flink可以完全独立于Hadoop,在不依赖Hadoop组件下运行。但是做为大数据的基础设施,Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多Hadooop组件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做资源调度,也可以读写HDFS,或者利用HDFS做检查点。
6.Flink集群有哪些角色各自有什么作用
有以下三个角色:
JobManager处理器:
也称之为Master,用于协调分布式执行,它们用来调度task,协调检查点,协调失败时恢复等。Flink运行时至少存在一个master处理器,如果配置高可用模式则会存在多个master处理器,它们其中有一个是leader,而其他的都是standby。
TaskManager处理器:
也称之为Worker,用于执行一个dataflow的task(或者特殊的subtask)、数据缓冲和datastream的交换,Flink运行时至少会存在一个worker处理器。
Clint客户端:
Client是Flink程序提交的客户端,当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将FlinkJob提交给JobManager
7.Flink资源管理中TaskSlot的概念
在Flink中每个TaskManager是一个JVM的进程,可以在不同的线程中执行一个或多个子任务。为了控制一个worker能接收多少个task。worker通过taskslot(任务槽)来进行控制(一个worker至少有一个taskslot)。
8.Flink的重启策略了解吗
Flink支持不同的重启策略,这些重启策略控制着job失败后如何重启:
固定延迟重启策略
失败率重启策略
无重启策略
Job直接失败,不会尝试进行重启。
9.Flink是如何保证Exactly-once语义的
Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。分为以下几个步骤:
开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面
预提交(preCommit)将内存中缓存的数据写入文件并关闭
正式提交(commit)将之前写完的临时文件放入目标目录下。这代表着最终的数据会有一些延迟
丢弃(abort)丢弃临时文件
若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据。
八张图搞懂Flink端到端精准一次处理语义Exactly-once
10.如果下级存储不支持事务,Flink怎么保证exactly-once
端到端的exactly-once对sink要求比较高,具体实现主要有幂等写入和事务性写入两种方式。
幂等写入的场景依赖于业务逻辑,更常见的是用事务性写入。而事务性写入又有预写日志(WAL)和两阶段提交(2PC)两种方式。
如果外部系统不支持事务,那么可以用预写日志的方式,把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统。
11.Flink是如何处理反压的
Flink内部是基于producer-consumer模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink使用了高效有界的分布式阻塞队列,就像Java通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。
12.Flink中的状态存储
Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和checkpoint交互。Flink提供了三种状态存储方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
13.Flink是如何支持流批一体的
这道题问的比较开阔,如果知道Flink底层原理,可以详细说说,如果不是很了解,就直接简单一句话:Flink的开发者认为批处理是流处理的一种特殊情况。批处理是有限的流处理。Flink使用一个引擎支持了DataSetAPI和DataStreamAPI。
14.Flink的内存管理是如何做的
Flink并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink为了直接操作二进制数据实现了自己的序列化框架。
15.FlinkCEP编程中当状态没有到达的时候会将数据保存在哪里
在流式处理中,CEP当然是要支持EventTime的,那么相对应的也要支持数据的迟到现象,也就是watermark的处理逻辑。CEP对未匹配成功的事件序列的处理,和迟到数据是类似的。在FlinkCEP的处理逻辑中,状态没有满足的和迟到的数据,都会存储在一个Map数据结构中,也就是说,如果我们限定判断事件序列的时长为5分钟,那么内存中就会存储5分钟的数据,这在我看来,也是对内存的极大损伤之一。