我们使用Iceberg构建湖仓一体平台的初衷是希望解决业务方在使用Hive数仓时的一些痛点。主要包括以下几大方面:
(1)现有的数据同步只支持全量同步,同步大表速度慢。
(2)Hive的时效性不好,即使使用FIink流式的引擎写入,延迟也会在小时级别。
(3)Hive扫描数据效率低。Hive查一个分区的话是需要将分区下所有文件都扫描一遍然后进行分析,而实际上我可能只对某些文件感兴趣。所以hive全量文件扫描相对低效。
(4)Lambda架构建设的实时数仓存在较多问题。如维护两套代码运维监控成本高、实时离线数据对不上、实时链路kafka无法存储海量数据,无法基于OLAP查询kafka中的数据。
(5)不能友好支持高效更新场景。HDFS只支持追加写,不支持更新。
(6)不可靠的更新操作。在ETL过程中执行insertoverwrite之类操作,这种操作会先把相应分区的数据删除,再把生成的文件加载到分区中。在移除文件时,读取的话会发生异常报文件不存在。
我们希望我们的湖仓一体平台能够解决这些痛点,我们希望可以实现:
(1)首先,湖仓平台要是互联互通的,要支持多引擎的访问。
(2)第二,查询要高效,以满足交互式分析的要求。
(3)第三,使用要尽可能的便捷,尽可能降低业务方的门槛。
(4)第四,基于HDFS海量存储实现准实时数仓,存储离线和实时数据。
(5)最后,支持高效Upsert、支持Schema变更
Iceberg是一个由Netflix开发开源的、用于庞大分析数据集的表格式。它是基于计算层(flink、spark)和存储层(orc、parquet)的一个中间层,它并不定义数据存储方式,而是定义了数据、元数据的组织方式,向上提供统一的“表”的语义,我们可以把它定义成一种“数据组织格式”,对标Hive的表设计。特性主要是代码抽象程度高,不绑定任何的引擎。
现在国内的实时数仓建设围绕Flink的情况会多一点。所以能够基于Flink扩展生态,是我们选择Iceberg一个比较重要的点。
Iceberg有很多的历史版本,会占用大量的存储资源,虽然它有一定的价值,但是也应该得到定期的清理。比如设定最多保留三天,超过三天数据就应该被清理。网易开源Arctic的ExpireSnapshots就是实现这样的操作。
(1)合并碎片文件。在实时场景下,这个非常重要,实时场景下会频繁地往表中提交数据,这样就会产生很多小文件,这些小文件需要进行治理,以提升表上查询性能并减少对存储系统的压力。
(2)减少数据冗余。在使用了行级更新的场景下,删除操作通过独立的deletefile文件来标记,这就会造成数据冗余,冗余数据过多会大大降低表上的查询性能。故需要通过合并数据文件操作把delete文件和datafile做合并。
Iceberg使用catalog来管理所有的表,catalog中存储了当前有哪些表,以及这些表如何按照namespace拆分。Iceberg提供了丰富的catalog实现,也提供了API方便你来扩展自己的catalog实现,以便你把表的元数据存储在任何想存储的地方。
(1)HadoopCatalog,直接使用文件目录管理表信息。
(2)HiveCatalog,将表的元数据信息存储在HiveMetastore中。
(3)JdbcCatalog,把表的元数据信息存储在数据库中。
glink01服务执行gomysqltx_testhive
Iceberg数据库信息
Iceberg表信息
建表语句
对应HDFS文件:/usr/hive/warehouse/iceberg_test.db/test3/metadata/00001-063d2d17-58df-41c4-af5f-899add03281c.metadata.json
(2)Manifestlist:存储的是manifestfile列表,用来链接大量的manifestfile文件。
对应HDFS文件(带snap前缀):/usr/hive/warehouse/iceberg_test.db/test3/metadata/snap-2379142480402238200-1-5183f5e9-7516-4ee8-a454-9c6263b7e624.avro
(3)Manifestfile:存储当前快照下的所有datafile,每行存储一个文件的信息。Manifestfile使用avro格式,为了提升读取文件信息的并发度,manifestfile一般会有多个。
对应HDFS文件(不带snap前缀):/usr/hive/warehouse/iceberg_test.db/test3/metadata/5183f5e9-7516-4ee8-a454-9c6263b7e624-m0.avro
(4)Schemas:表所有的历史的schema信息都会存储在这里,每个schema会有一个自己的schemaid,每个写入的文件都有自己的schemaid信息,当表结构发生变更,新老文件用的是不同的schemaid,这样就不需要把所有的数据重写。
(5)Partitionspecs:分区配置,核心是各种transform。Hive的分区是配置分区字段,然后写入和读取的时候要读写这些分区字段。Iceberg则是在已有的表字段上配置各种转换函数,比如表里有一个timestamp字段,需要用timestamp字段按天做分区,那就在timestamp字段上使用天的转换函数。写数据的时候,不需要再写入额外转换之后天的信息,只需要写入timestamp,Iceberg内部就会把这timestamp字段的值转换成分区的天的信息,在查询的时候,只要在timestamp字段做一个范围查询就能实现高效的基于分区的数据过滤。
Iceberg现阶段有两个版本的tableformat,面向大体量数据分析的formatversion1与提供行级更新能力的formatversion2,formatversion2相较于formatversion1,功能上有较多增强。
(1)Row-levelDelete:行级更新删除能力。以前数据写入都是文件级别的,更新文件里面的部分数据,需要对这个文件进行完整的重写,把老的文件删除,把新的文件写进去。这种方式的缺点是开销大,Flink流式场景下基本不可用。formatV2新增行级更新删除,实现方式是新增了一种deletefile文件类型,文件里记录了表中需要删除的数据信息。比如去做一个update操作,只需去找出这次update需要更新的数据在原来的文件里面的什么位置并记录到positiondeletefile中新数据插入新文件即可。或者直接地将这次update的条件信息记录在equalitydeletefile中,再将更新后的内容写入新的datafile中,将产生的deletefile和datafile提交到表中即完成了本次的update操作。
(2)Positiondeletefile:记录删除一行数据在文件中的位置。通过被删除数据的位置信息来标记删除内容的deletefile,positiondeletefile记录被删除数据的filepath和position,并且在文件内所有数据会按照顺序排序后写入,这样在与datafile合并时会有更好的合并性能。
(3)Equalitydeletefile:记录删除数据的条件信息。通过被删除数据的等值条件来标记输出内容的deletefile,比如ID=5的数据要删除,Equalitydeletefile存储的就是ID=5这样的信息,在Merge-on-Read时就会把ID=5的数据全部过滤掉。IcebergFlinkconnector当前会混合写入positiondeletefile和equalitydeletefile。
(4)Filesequence:标记文件的版本。每次写入后递增;deletefile只对比自己filesequence更小的文件起作用。比如filesequence等于2的deletefile只对filesequence小于等于1的datafile起作用。每次写入的文件拥有相同的filesequence,且随着写入顺序递增。
(5)Rewritefiles(compaction):数据合并。把要删除的数据真实地删除掉,得到删除之后的datafile,这样的datafile读取更高效。这个叫做Iceberg的rewrite过程。需要注意rewrite不会修改文件的文件版本信息。
左侧图是一个抽象的数据处理系统,右侧是对应的现实中的组件
表schema:定义了一个表支持字段和类型,比如int、string、long以及复杂数据类型等。
表中文件组织形式:最典型的是Partition模式,是RangePartition还是HashPartition。
表的读写API实现:封装表的读写API,层引擎通过对应的API读取或者写入表中的数据
都支持int、string、decimal、timestamp、map常用数据类型
HiveMetastore:metastore中的partition不能是表字段。partition本质上是一个目录结构,不是用户表中的一列数据。基于metastore用户想定位到一个partition下的所有数据,首先需要在metastore中定位出该partition对应的所在目录位置信息,然后再到HDFS上执行list命令获取到这个分区下的所有文件,对这些文件进行扫描得到这个partition下的所有数据。
HiveMetastore:一张表的统计信息是表/分区级别粒度的统计信息,如一张表的记录数量、平均长度、为null记录数量、最大值、最小值等。
Iceberg:统计信息精确的文件粒度,每个数据文件都会记录所有列的记录数量、平均长度、最大值、最小值等。
对比总结:Iceberg文件粒度统计信息对于查询中谓词过滤(where条件)会更有效。
Metastore:上层引擎写好一批文件,调用metastore的addpartition接口将这些文件添加到某个分区下。
Iceberg:上层业务写好一批文件,调用Iceberg的commit接口提交本次写入,形成一个新的snapshot快照。这种提交方式保证表的ACID语义。同时基于快照提交可以实现增量拉取。
对比总结:metastore只能添加一批文件加分区。Iceberg可以增量提交。
新partition模式:避免查询时多次调用namenode的list方法,降低namenode压力,提升查询性能。
新metadata模式:文件级别的统计信息可以更快的根据where条件进行文件过滤。可以减少扫描文件数,提高查询性能。
新的Api模式:存储流批一体。支持流式写入-增量拉取(满足批量读取和增量订阅)。支持流批同时读写同一张表,防止insertoverwrite操作时读取报FileNotFoundException异常。
1、建表时增加排序
2、建表时增加索引(基数高的字段如:id)索引类型:
(1)BloomFilter:计算比较简单,占用空间也比较小。存在falsepositive的问题,只支持等值的查询。
(3)TokenBloomFilter、NgramBloomFilter,TokenBitmap、NgramBitmap:是针对token的索引,是为日志场景设计的。相当于对日志做一些分词的操作。分词完成以后,构建BloomFilter或者Bitmap这样的索引。TokenBloomFilter和TokenBitmap针对的是英文的分词,Ngram针对的是中文的分词。
1、spark建表-已完成
2、hive\spark插入数据-已完成
3、hive、spark新增数据到iceberg-已完成
4、hive、spark查询数据-已完成
5、iceberg元数据存储位置查看(hdfs和hive表),元数据管理方式-已完成
6、java、flink实时方式导入数据到iceberg
7、数据同步工具waterdrop、datax离线导入mysql表数据到iceberg
8、腾讯云数据集成工具导入mysql数据到iceberg
9、数据排序写入是否提高查询速度。Z-ORDER、Hibert(B站优先使用),非整形字段用BoundaryIndex,最高4个排序字段。因写入时排好序,根据常用字段查询、抽取时扫描文件少,更快命中文件
10、Iceberg表创建索引(基数高的字段--基本不重复如id、user_id),加快速度。
11、Flink到Iceberg时小文件异步合并测试。
11、Spark到Iceberg时小文件异步合并测试。
12、Iceberg数据上和元数据上怎么治理。
13、数据上是列式存储还是行存储,用的Parquet列式存储格式,但是查询数据文件多列数据都可以查到。可以查看Parquest文件原理。
Header:每个Parquet的首尾各有一个大小为4bytes,内容为PAR1的MagicNumber,用来标识这个文件是Parquet文件。
DataBlock:中间的DataBlock是具体存放数据的区域,由多个行组(RowGroup)组成
行组(RowGroup):是按照行将数据在物理上分成多个单元,每一个行组包含一定的行数。比如一个文件有10000条数据,被划分成两个RowGroup,那么每个RowGroup有5000行数据。
列块(ColumnChunk):在每个行组(RowGroup)中,数据按列连续的存储在这个行组文件中,每列的所有数据组合成一个ColumnChunk(列块),一个列块拥有相同的数据类型,不同的列块可以有不同的压缩格式。
Page:在每个列块(ColumnChunk)中,数据按Page为最小单元来存储,Page按内容分为Datapage和IndexPage。
Footer:包含非常重要的信息,包括Schema和每个RowGroup的Metadata
14、如何把Iceberg表数据固定。因Iceberg是实时写入,如当天10月21日下游使用了Iceberg数据,在10月30日下游回溯21日历史数据,还想用21日的数据。抽数据到hive分区是其中一个方法,还有没有其它方式。
15、***Iceberg表可以单独开项目方便管理,也可以和现在项目结合,同一个库下既有hive也有Iceberg表,需要和数仓讨论下用法。前期只做数据同步可以使用单独项目去管理同步任务,同一张表只同步一份。
16、Iceberg有哪些快照可以用,是否可以整点|天生成快照
17、腾讯Iceberg表管理工具
18、实时在写担心有问题,历史每天零时跑覆盖snapshot。有咩有问题
1、数据湖Iceberg官网
2、秒级响应!B站基于Iceberg的湖仓一体平台构建实践
3、网易:Flink+Iceberg数据湖探索与实践
4、如何使用Arctic更好地玩转ApacheIceberg
5、索引使用-非紧急重要-待看
7、爱奇艺数据湖平台建设实践
8、avro格式详解
11、数据湖Iceberg技术在小米的落地与场景应用
12、谈一谈ApacheIceberg的数据读取过程
13、Iceberg数据湖介绍
14、Spark读写Iceberg在腾讯的实践和优化
15、DLC原生表核心能力
16、Hive+Spark+Flink集成Iceberg功能POC
17、Spark合并Iceberg小文件内存溢出问题定位和解决方案
18、实践数据湖iceberg第三十课mysql->iceberg,不同客户端有时区问题