丰富的线上&线下活动,深入探索云世界
做任务,得社区积分和周边
最真实的开发者用云体验
让每位学生受益于普惠算力
让创作激发创新
资深技术专家手把手带教
遇见技术追梦人
技术交流,直击现场
海量开发者使用工具、手册,免费下载
极速、全面、稳定、安全的开源镜像
开发手册、白皮书、案例集等实战精华
为开发者定制的Chrome浏览器插件
结构化数据源对数据定义了一种模式。通过这些关于底层数据的额外信息,结构化数据源提供高效的存储和性能。例如,列式数据存储Parquet和ORC,使得从一个列子集中提取数据更加容易。当数据查询只需要获取一少部分列的数据时,通过遍历每行数据的方式需要查询出过多的数据。基于行的存储格式,如Avro通过高效的序列化存储数据提供了存储优势。但是,这种优势是以复杂性为代价的。例如,由于结构不够灵活,模式转换将成为挑战。
半结构化数据源是每行记录一个结构,但不需要对整体记录有一个全局的模式定义。因此,每行记录是通过其自身的模式信息对其进行扩充。JSON和XML就是其中最流行的例子。半结构化数据的优势在于通过每行记录自身的描述信息,增强了展示数据信息的灵活性。由于有很多轻量级的解析器用于处理这些记录,因此半结构化数据格式在很多应用中普遍被使用,并且在可读性上存在优势。但是,它的主要缺陷也在于会产生额外的解析开销,不能专门应用于即席查询。
列式存储(Column-orientedStorage)并不是一项新技术,最早可以追溯到1983年的论文Cantor。然而,受限于早期的硬件条件和使用场景,主流的事务型数据库(OLTP)大多采用行式存储,直到近几年分析型数据库(OLAP)的兴起,列式存储这一概念又变得流行。总的来说,列式存储的优势一方面体现在存储上能节约空间、减少IO,另一方面依靠列式数据结构做了计算上的优化。
行式存储通过逐行组织数据,所有的数据在存储介质上通过首位相连、逐条存储,行式存储是一种传统的组织数据的方法。
ApacheParquet是Hadoop生态系统中通用的列式存储格式,独立于数据处理框架、数据模型、编程语言;Parquet的灵感来自于2010年Google发表的Dremel论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能。
如果有大批量的Json格式数据需要转为Parquet格式数据,参考以下代码;
importosimportmultiprocessingfromjson2parquetimportconvert_jsondefsplit_file(file_name,path):result_path="parquet/"file_path=path+file_nameres_path=result_path+file_name+".parquet"convert_json(file_path,res_path)defmain():path="data/"file_list=os.listdir(path)pool=multiprocessing.Pool(processes=20)forfile_nameinfile_list:pool.apply_async(split_file,(file_name,path,))pool.close()pool.join()if__name__=='__main__':main()Parquet格式运行任务使用parquet数据格式,来运行作业,使用sparkreadapi中的parquet接口;其中包括可以读指定的单个文件,或者一组文件;
spark.read.parquet("yourparquetfileorfiles")读取单个parquet文件方法
/***LoadsaParquetfile,returningtheresultasa`DataFrame`.Seethedocumentation*ontheotheroverloaded`parquet()`methodformoredetails.**@since2.0.0*/defparquet(path:String):DataFrame={//Thismethodensuresthatcallsthatexplicitneedsingleargumentworks,seeSPARK-16009parquet(Seq(path):_*)}读取一组paruqet文件方法
/***LoadsaParquetfile,returningtheresultasa`DataFrame`.**YoucansetthefollowingParquet-specificoption(s)forreadingParquetfiles:*
- *
- `mergeSchema`(defaultisthevaluespecifiedin`spark.sql.parquet.mergeSchema`):sets*whetherweshouldmergeschemascollectedfromallParquetpart-files.Thiswilloverride*`spark.sql.parquet.mergeSchema`. *
importorg.apache.spark.sql.{Row,SparkSession}importorg.apache.spark.sql.expressions.Windowimportorg.apache.spark.sql.functions.row_numberobjectOSSExample{defmain(args:Array[String]):Unit={valspark=SparkSession.builder.appName("OSSExample").getOrCreate()valdata=spark.read.parquet.load("oss://your-bucket-name/parquetfile")valdata1=data.groupBy("subject","level").count()valwindow=Window.partitionBy("subject").orderBy(org.apache.spark.sql.functions.col("count").desc)valdata2=data1.withColumn("topn",row_number().over(window)).where("topn<=1")data2.write.format("parquet").save("yourstorepath")}}作业性能对比
1、可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量,提升作业运行性能;2、压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLengthEncoding和DeltaEncoding)进一步节约存储空间;这样能够更少的使用OSS存储空间,减少数据存储成本;3、只读取需要的列,支持向量运算,能够获取更好的扫描性能。