从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。
1).Collect阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value,Partition分区信息等。
2).Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。
3).Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。
4).Copy阶段:ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
5).Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
6).Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。
5.MapReduce的Partition和Combine有什么区别?
1)combine分为map端和reduce端,作用是把同一个key的键值对合并在一起,可以自定义,该类的主要功能是合并相同的key键
2)partition是分割map每个节点的结果,按照key分别映射给不同的reduce,也是可以自定义的,partition的作用就是把这些数据归类
生产者重要参数参数名称描述bootstrap.servers生产者连接集群所需的broker地址清单,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。key.serializer和value.serializer指定发送消息的key和value的序列化类型。一定要写全类名。buffer.memoryRecordAccumulator缓冲区总大小,默认32m。batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。acks0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader收到数据后应答。-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。max.in.flight.requests
.per.connection
-----
1.HBase:
2.ClickHouse:支持物化视图
存宽表
表之间的join比较弱。社区较为活跃,
3.Druid:
社区最不活跃,一直处于孵化阶段
1Spark作业运行流程①构建SparkApplication的运行环境(启动SparkContext),SparkContext向资源管理器(YARN)注册并申请运行Executor资源;
②资源管理器分配并启动Executor,Executor的运行情况将随着心跳发送到资源管理器上;
③SparkContext构建成DAG图,将DAG图分解成Stage(Taskset),并把Taskset发送给TaskScheduler。Executor向SparkContext申请Task;
④TaskScheduler将Task发放给Executor运行,同时SparkContext将应用程序代码发放给Executor;
⑤Task在Executor上运行,运行完毕释放所有资源。
1)客户端client向ResouceManager提交Application,ResouceManager接受Application,并根据集群资源状况选取一个node来启动Application的任务调度器driver(ApplicationMaster)
2)ResouceManager找到那个node,命令其该node上的nodeManager来启动一个新的JVM进程运行程序的driver(ApplicationMaster)部分,driver(ApplicationMaster)启动时会首先向ResourceManager注册,说明由自己来负责当前程序的运行。
4)ResouceManager接受到driver(ApplicationMaster)提出的申请后,会最大化的满足资源分配请求,并发送资源的元数据信息给driver(ApplicationMaster);
5)driver(ApplicationMaster)收到发过来的资源元数据信息后会根据元数据信息发指令给具体机器上的NodeManager,让其启动具体的container。
6)NodeManager收到driver发来的指令,启动container,container启动后必须向driver(ApplicationMaster)注册。
7)driver(ApplicationMaster)收到container的注册,开始进行任务的调度和计算,直到任务完成。
Checkpoint机制
.SqlParser:将sql语句通过javacc解析成AST(语法树),在calcite中用SqlNode表示AST;
2.SqlValidator:结合数字字典(catalog)去验证sql语法;
3.生成LogicalPlan:将sqlNode表示的AST转换成LogicalPlan,用relNode表示;
4.生成optimizedLogicalPlan:先基于calciterules去优化logicalPlan,基于flink定制的一些优化rules去优化logicalPlan;
5.生成FlinkPhysicalPlan:这里也是基于flink里头的rules将,将optimizedLogicalPlan转成成Flink的物理执行计划;
6.将物理执行计划转成FlinkExecutionPlan:就是调用相应的tanslateToPlan方法转换和利用CodeGen元编程成Flink的各种算子。
TableAPI来提交任务的话,基本流程和运行SQL类似,稍微不同的是:tableapiparser:flink会把tableapi表达的计算逻辑也表示成一颗树,用treeNode去表式;在这棵树上的每个节点的计算逻辑用Expression来表示。
nclient发起查询请求,某个DataNode接收到请求,该DataNode就会成为协调节点(CoordinatingNode)
n协调节点(CoordinatingNode)将查询请求广播到每一个数据节点,这些数据节点的分片会处理该查询请求。协调节点会轮询所有的分片来自动进行负载均衡
n每个分片进行数据查询,将符合条件的数据放在一个优先队列中,并将这些数据的文档ID、节点信息、分片信息返回给协调节点
n协调节点将所有的结果进行汇总,并进行全局排序
n协调节点向包含这些文档ID的分片发送get请求,对应的分片将文档数据返回给协调节点,最后协调节点将数据返回给客户端
(1)客户端通过REST接口,将作业提交给分发器。
(2)分发器启动JobMaster,并将作业(包含JobGraph)提交给JobMaster。
(3)JobMaster向资源管理器请求资源(slots)。
(4)资源管理器向YARN的资源管理器请求container资源。
(5)YARN启动新的TaskManager容器。
(6)TaskManager启动之后,向Flink的资源管理器注册自己的可用任务槽。
(7)资源管理器通知TaskManager为新的作业提供slots。
(8)TaskManager连接到对应的JobMaster,提供slots。
(9)JobMaster将需要执行的任务分发给TaskManager,执行任务。
0.17HBase读写流程?阿善背诵读:①HRegionServer保存着meta表以及表数据,要访问表数据,首先Client先去访问zookeeper,从zookeeper里面获取meta表所在的位置信息,即找到这个meta表在哪个HRegionServer上保存着。②接着Client通过刚才获取到的HRegionServer的IP来访问Meta表所在的HRegionServer,从而读取到Meta,进而获取到Meta表中存放的元数据。③Client通过元数据中存储的信息,访问对应的HRegionServer,然后扫描所在HRegionServer的Memstore和Storefile来查询数据。④最后HRegionServer把查询到的数据响应给Client。
写:①Client先访问zookeeper,找到Meta表,并获取Meta表元数据。②确定当前将要写入的数据所对应的HRegion和HRegionServer服务器。③Client向该HRegionServer服务器发起写入数据请求,然后HRegionServer收到请求并响应。④Client先把数据写入到HLog,以防止数据丢失。⑤然后将数据写入到Memstore。⑥如果HLog和Memstore均写入成功,则这条数据写入成功⑦如果Memstore达到阈值64M,会把Memstore中的数据flush到Storefile中。⑧当Storefile越来越多,会触发Compact合并操作,把过多的Storefile合并成一个大的Storefile。⑨当Storefile越来越大,Region也会越来越大,达到阈值后,会触发Split操作,将Region一分为二。
-----技术架构图
-----tiaokai
--200万*100条*1k=10亿k=191G
voidbubbleSort(intarr[],intn){inti,j,temp;for(i=0;i