FlinkCDC:新一代实时数据集成框架

FlinkCDC是一个数据集成框架,它基于数据库日志的CDC(变更数据捕获)技术实现了统一的增量和全量数据读取。结合Flink出色的管道能力和丰富的上下游生态系统,FlinkCDC可以高效地实现海量数据的实时集成。

(1)FlinkCDC使用场景

FlinkCDC可以应用在多种场景中。比如数据同步,可以将上游数据库中的数据同步至下游数据仓库、数据湖等。用户还可以借助FlinkCDCsource实现实时物化视图,结合下游Flink作业处理逻辑实现更丰富的业务场景。此外用户还可以使用FlinkCDC捕获的变更数据基于业务逻辑进行数据分发。

作为一款数据集成框架,FlinkCDC对接了非常丰富的上下游数据库、数据湖仓和消息队列等外部系统,如MySQL、PostgreSQL、Kafka、Paimon等。

(2)与传统数据集成流水线比较

一个传统的数据集成流水线通常由两套系统构成:全量同步和增量同步。其中全量同步会使用DataX、Sqoop等系统,增量同步需要使用另外一套系统,如Debezium、Canal等等。在全量同步完成后,可能还需要额外的一步合并操作将增量表和全量表进行合并,最终得到与上游一致的快照。这种架构的组件构成较为复杂,为系统维护带来了很多困难。

相比于传统数据集成流水线,FlinkCDC提供了全量和增量一体化同步的解决方案,对于一个同步任务,只需使用一个Flink作业即可将上游的全量数据和增量数据一致地同步到下游系统。此外FlinkCDC使用了增量快照算法,无需任何额外配置即可实现全量和增量数据的无缝切换。

FlinkCDC诞生于2020年7月,中间经过不断迭代优化,发布了多个大版本。2021年8月,FlinkCDC发布了2.0版本,首次为MySQLCDCsource引入增量快照算法,实现了全增量同步无缝切换。2022年11月,FlinkCDC发布2.3版本,将大多数connector对接至增量快照框架。2023年12月,FlinkCDC推出3.0版本,正式将FlinkCDC项目升级为实时数据集成框架,提供YAMLAPI,为数据同步提供端到端解决方案。

(1)FlinkCDCYAML

在FlinkCDC2.x的时代,FlinkCDC只提供一些Flinksource,用户仍然需要自己开发FlinkDataStream或SQL作业实现数据同步逻辑。如果用户对Flink不够熟悉,经常会遇到棘手的数据正确性和乱序问题。此外FlinkCDC2.x不支持schema变更,而schema变更是用户的业务系统中很常见而且很重要的场景。通过对用户使用场景的调研,我们发现绝大多数使用FlinkCDC的作业都是较为简单的数据ETL。结合上述问题,我们决定为用户提供一个全新的框架,设计一套全新的API,专注于数据同步场景。

(2)FlinkCDC整体设计

FlinkCDC基于Flinkruntime实现,因此可以充分复用Flink的资源管理和在不同环境上部署的能力。针对各种数据集成场景,FlinkCDC深度定制了多种自定义算子,如schemaoperator、router、transformer等。为了将不同的算子进行协调和组合,FlinkCDC引入了composer组件,可根据用户定义的数据同步逻辑构建Flink作业。依托于Flink丰富的生态系统,开发者只需简单地封装即可快速将现有的Flinkconnector对接至FlinkCDC。此外Flink还提供了CLI,只需一个脚本即可将用户的YAML定义使用composer构建成Flink作业,并提交至指定Flink集群。基于以上架构,FlinkCDC为数据集成用户提供schema变更同步、整库同步、分库分表同步等增强能力。

(3)FlinkCDCAPI

FlinkCDCAPI使用YAML语法定义数据同步任务,即易于开发者进行手动开发,又可以高效地使用机器进行处理。YAMLAPI针对数据集成场景设计,用户只需定义同步数据源和数据目标端即可快速搭建起一个实时同步流水线。此外用户还可以在YAML中定义routing和transformation实现自定义数据分发和变换。用户不再需要熟练掌握Flink作业开发与内部实现,即可使用FlinkCDC搭建实时数据集成流水线。

FlinkCDC提供的CLI(flink-cdc.sh)进一步简化了用户提交FlinkCDC任务的流程。用户只需执行一行命令,CDCcomposer会将source、sink、自定义CDCruntime构建成Flink任务,创建FlinkJobGraph后提交至Flink集群。

(4)FlinkCDCPipeline连接器

FlinkCDC定义了自己的数据源和目标端连接器的接口,以适配FlinkCDC内部的数据结构。FlinkCDCpipelineconnector基于Flinkconnector,只需进行简单的数据转换封装,即可快速复用现有的Flinkconnector,将其对接到FlinkCDC生态系统中。为了实现schema变更处理能力,FlinkCDC定义了MetadataAccessor和MetadataApplier,分别对源端和目标端的schema等元信息进行获取和处理,实现schema变更的实时同步。

(5)FlinkCDCSource增量快照算法

为了实现全量和增量的一体化同步,FlinkCDCsource使用增量快照算法,既实现了全增量同步的无缝切换,而且采用了无锁设计,避免全量同步时的锁表动作对上游业务的影响。在增量快照算法中,数据库的全量数据被切分为独立的数据块(chunk),分发给source的各个并发进行读取。考虑到在全量读取过程中数据还有可能发生变化,在开始读取前,source将binlog的当前位点记为低水位线(lowwatermark),在全量读取结束后再次将binlog最新位点记录为高水位线(highwatermark),随后读取高、低水位线之间的变更数据,将其合并到已读取的全量数据块中,从而构建一个与上游完全一致的数据块。在完成全部数据块的读取之后,source会根据记录的高水位线确定切换位点,实现全量和增量的无缝切换。

(6)FlinkCDC对Schema变更的支持

FlinkCDC通过定制化的schemaoperator以及schemaregistry的协调,实现对上游schema变更的实时同步。当schemaoperator感知到上游发生schema变更后,会将变更信息同步给schemaregistry,并暂停数据流的处理。schemaregistry首先插入Flush事件将下游数据全部从sink推出,在收到全部sink的确认后,通过MetadataApplier将schema变更应用在下游系统中,在完成schema变更后,schemaregistry通知schemaoperator,并恢复数据流处理,完成整个schema变更的流程。

(7)FlinkCDC对数据分发处理的支持

FlinkCDC定制了router算子,实现对变更数据的分发和合并。用户可以在YAML中使用route字段修改变更数据的目标数据库和表名,将数据同步至指定目标端,同样也可以通过指定多对一的路由规则,将多个表合并为目标端中的一张表。

(8)FlinkCDC对数据变换的支持

通过在YAML中使用transform字段,用户可以在数据流上定义投影、过滤、增加元信息列等数据变换操作,调整数据内容后同步至下游。transform使用类SQL语法,既可以让用户简单上手开发,又保留了对更多类型变换支持的可扩展性。

(9)FlinkCDC数据结构

FlinkCDC在数据流中定义了数据和schema信息的协议:

数据流以CreateTableEvent开始来描述起始schema

后续所有的DataChangeEvent需要遵循其前方的schema

当schema发生变更时,需要在数据流中发送一个新的SchemaChangeEvent以描述schema变化

这种设计的优势在于实现了数据和schema的分离,大大降低了数据的序列化成本。此外FlinkCDC为数据变更事件使用了压缩的二进制格式,进一步提升了性能。

目前FlinkCDC已经有超过160位贡献者,项目获得5k+star,1000+commit。未来FlinkCDC将着力于扩展生态,对接至更多的外部系统,如PostgreSQL、Iceberg等,并且将支持更多的schema变更类型和数据类型。另外FlinkCDC也会持续提升生产稳定性,包括对异常处理方式进行自定义配置、提升测试覆盖等等。作为ApacheFlink的子项目,FlinkCDC使用与Flink一致的贡献流程。欢迎各位用户和贡献者在Flink邮件列表中咨询和讨论,使用ApacheJIRA创建issue,在GitHub上提交PR!

THE END
1.什么是数据增量同步?增量同步有哪些应用嘲?在数据库迁移过程中,全量迁移结合增量同步功能可以平滑迁移数据库,完成数据对象异构迁移与数据迁移,并持续跟踪采集源库变更,进行增量的数据同步,有效缩短停机窗口,降低迁移风险。 异构数据库同步 增量同步可以实现异构关系数据库之间的实时单向数据同步、双向数据同步、数据共享等,满足在不同应用程序之间分布和整合数据的需求https://www.fanruan.com/bw/doc/178516
2.深度解析在大数据领域,数据同步是一项至关重要的任务,它关乎到数据的一致性、实时性和完整性。 在数据同步的场景中,全量同步每次都会复制整个数据集,这在大规模数据迁移和实时性要求较高的场景下显得尤为低效。 数据增量同步是一种高效的数据同步策略,它只传输自上次同步以来数据库中发生变更的数据。这种同步方式能够显著减少数https://blog.csdn.net/2301_77993839/article/details/143877902
3.基于Spark的数据库增量准实时同步AET摘要:为了实现将传统关系型数据库中的增量数据快速导入同构或者异构目的库,在使用已有的增量提取方法的基础上,提出了通过增加并行度和流式计算的方法加快同步速度。此方法不仅支持插入、更新和删除的增量数据同步,而且可以抽取出数据库表结构信息动态支持表结构变更。与传统单点抽取方式相比,大大提高了目的库数据的新鲜度http://www.chinaaet.com/article/3000055765
4.实时同步任务任务创建《数据传输用户手册数据传输-实时同步任务:“引用参数组”配置项选取需引用的参数组。1)如参数组的使用场景是:任务常用的高级配置-自定义参数配置为参数组,可实现不同任务间的自定义参数复用,则“引用参数组”配置项选取需引用的参数组即可。2)如参数组的使用场景是:任务导入导出时对库名、Topic名等进行替换,除“引用参数组”配置https://study.sf.163.com/documents/read/easytransfer/easytransfer_realtime_sync_task.md
5.数据同步:Canal数据库增量订阅是阿里巴巴开源的一个数据同步工具,用于实时抓取数据库的增量日志,解析后推送到 MQ、Kafka、Storm 等消息队列中。Canal 主要用于数据库数据的增量订阅,实时获取数据库变更,将变更数据同步到目标数据存储中。 的优势 可以解决传统数据库数据同步的一些痛点,如实时性、准确性等,具有以下优势: https://www.jianshu.com/p/ca4c302e3bc1
6.数据库实时同步工具Syncnavigator详细使用教程注册机使用:https://jingyan.baidu.com/article/f71d6037cc7e301ab741d143.htmlSyncNavigator是一款高效的数据库同步工具,支持sqlserver数据库和mysql数据库,采用增量同步的方式实时保存数据库数据。SyncNavigator的基本功能:自动同步数据/定时同步数据无论是实时同步/24小时不间断同步。因为是增量同步,记录每次同步时间戳https://jingyan.baidu.com/article/fb48e8bec406986e632e146c.html
7.整库实时同步MySQL到StarRocks整库实时同步方案为您先进行全量数据迁移,然后再实时同步增量数据至目标端。本文为您介绍如何创建整库实时同步MySQL到StarRocks的任务。 前提条件 已完成数据源配置。您需要在数据集成同步任务配置前,配置好您需要同步的源端和目标端数据库,以便在同步任务配置过程中,可通过选择数据源名称来控制同步任务的读取和写入数据http://help.ceden.cn/?zh/dataworks/user-guide/configure-and-manage-real-time-synchronization-tasks
8.企业级数据迁移服务OMS数据库工具OceanBase 数据迁移工具(OceanBase Migration Service,OMS)是 OceanBase 数据库一站式数据传输和同步的产品。它支持多种关系型数据库、消息队列与 OceanBase 数据库之间的数据复制,是集数据迁移、实时数据同步和增量数据订阅于一体的数据传输服务,OMS 帮助您低风险、低成本、高效率的实现 OceanBase 的数据流通,助力构建安全https://www.oceanbase.com/product/oms
9.集成开源调度系统,支持分布式增量同步数据实时查看运行DataX集成可视化页面,选择数据源即可一键生成数据同步任务,支持批量创建RDBMS数据同步任务,集成开源调度系统,支持分布式、增量同步数据、实时查看运行日志、监控执行器资源、KILL运行进程、数据源信息加密等。https://gitee.com/WeiYe-Jing/datax-web
10.数据集成整库任务配置概览操作指南文档中心整库迁移支持来源端的数据及结构监控,可将源端所有库表下的全量或增量数据实时同步至目标端,同时支持目标端自动建表、字段变更同步等特性。支持 MySQL、Doris、DLC、Kafka 等数据源。 前提条件 1.已配置好来源及目标端的数据源以备后续任务使用。详情请参见数据源管理与配置方式。 https://cloud.tencent.com/document/product/1580/85931
11.消息队列RocketMQ顺序消息消息队列RocketMQ功能特性用户购买商品生成订单为例,此时若以普通消息发送,则下游订单系统可能消息消费顺序混乱,例如订单出库先执行,后生成订单,从而系统数据不正确。顺序消息就能解决此问题,上游系统发送顺序消息,下游订单系统在同一消费组,会依次按顺序消费,执行相应的逻辑。 典型场景二: 数据实时增量同步 https://ecloud.10086.cn/op-help-center/doc/article/67007
12.将Oracle同步到GaussDB(forMySQL)数据复制服务DRS数据库账号权限要求 在使用DRS进行同步时,连接源库和目标库的数据库账号需要满足以下权限要求,才能启动实时同步任务。不同类型的同步任务,需要的账号权限也不同,详细可参考表2进行赋权。DRS会在“预检查”阶段对数据库账号权限进行自动检查,并给出处理建议。 https://support.huaweicloud.com/intl/zh-cn/realtimesyn-drs/drs_04_0110.html
13.Dinky在Doris实时整库同步和模式演变的探索实践对于上述架构存在的问题,Flink CDC 的出现为数据入仓入湖架构提供了一些新思路。借助 Flink CDC 技术的全增量一体化实时同步能力,结合 Doris 提供的更新能力,整个架构变得非常简洁。我们可以直接使用 Flink CDC 读取 MySQL 的全量和增量数据,并直接写入和更新到 Doris 中。 https://blog.51cto.com/zhangxueliang/12910802
14.深入浅出阿里数据同步神器:Canal原理+配置+实战全网最全解析canal翻译为管道,主要用途是基于 MySQL 数据库的增量日志 Binlog 解析,提供增量数据订阅和消费。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业https://maimai.cn/article/detail?fid=1779084320&efid=Mz5tMXwzMK7UBT3YsxxvGw
15.4种MySQL数据同步ES方案详解Mysql对数据库有一定的轮询压力,一种改进方法是将轮询放到压力不大的从库上。 经典方案:借助 Logstash 实现数据同步,其底层实现原理就是根据配置定期使用 SQL 查询新增的数据写入 ES 中,实现数据的增量同步。 2.4 基于 Binlog 实时同步 上面三种方案要么有代码侵入,要么有硬编码,要么有延迟,那么有没有一种方案既能保证https://www.jb51.net/database/301406wc1.htm