丰富的线上&线下活动,深入探索云世界
做任务,得社区积分和周边
最真实的开发者用云体验
让每位学生受益于普惠算力
让创作激发创新
资深技术专家手把手带教
遇见技术追梦人
技术交流,直击现场
海量开发者使用工具、手册,免费下载
极速、全面、稳定、安全的开源镜像
开发手册、白皮书、案例集等实战精华
为开发者定制的Chrome浏览器插件
使用分布式系统就无法避免对节点管理的问题(需要实时感知节点的状态、对节点进行统一管理等),而由于这些问题处理起来可能相对麻烦和提高了系统的复杂性,ZooKeeper作为一个能够通用解决这些问题的中间件就应运而生了。应用场景:
1)每个子目录项如NameService都被称作为znode,这个znode是被它所在的路径唯一标识,如Server1这个znode的标识为/NameService/Server1。2)znode可以有子节点目录,并且每个znode可以存储数据,注意EPHEMERAL(临时的)类型的目录节点不能有子节点目录。3)znode是有版本的(version),每个znode中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据,version号自动增加。4)znode的类型:
为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch,标识当前属于那个leader的统治时期。低32位用于递增计数。
Leader选举算法采用了Paxos协议;Paxos核心思想:当多数Server写成功,则任务数据写成功如果有3个Server,则两个写成功即可;如果有4或5个Server,则三个写成功即可。Server数目一般为奇数(3、5、7)如果有3个Server,则最多允许1个Server挂掉;如果有4个Server,则同样最多允许1个Server挂掉。由此,我们看出3台服务器和4台服务器的的容灾能力是一样的,所以为了节省服务器资源,一般我们采用奇数个数,作为服务器部署个数。
服务器1(myid=1)启动,当前只有一台服务器,无法完成Leader选举服务器2(myid=2)启动,此时两台服务器能够相互通讯,开始进入Leader选举阶段
①NameNode:HDFS的管理节点,负责客户端的请求响应,存放元数据;②SecondaryNameNode:(辅助节点)当编辑日志和映像文件需要合并时,在同一个NameNode上执行合并操作会耗费大量内存和计算能力,因此合并操作一般会在另一台机器上执行,即(SecondaryNamenode);③DataNode:HDFS的工作节点,受客户端和NameNode的调度,检索并存放数据块。没有NameNode,DataNode将无法使用;
③nnHA在hadoop2.0之前,namenode只有一个,存在单点问题(虽然hadoop1.0有secondarynamenode,checkpointnode,buckcupnode这些,但是单点问题依然存在),在hadoop2.0引入了HA机制。hadoop2.0的HA机制有两个namenode,一个是activenamenode,状态是active;另外一个是standbynamenode,状态是standby。两者的状态是可以切换的,但不能同时两个都是active状态,最多只有1个是active状态。只有activenamenode提供对外的服务,standbynamenode是不对外服务的。activenamenode和standbynamenode之间通过NFS或者JN(journalnode,QJM方式)来同步数据。
NameNode被格式化之后,将在../hadoop/hdfs/namenode/current/目录中产生如下文件①Fsimage文件:HDFS文件系统元数据的一个永久性的检查点,其中包含HDFS文件系统的所有目录和文件inode的序列化信息②Edits文件:存放HDFS文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先会被记录到Edits文件中③seentxid文件保存的是一个数字,就是最后一个edits的数字④每次NameNode启动的时候都会将Fsimage文件读入内存,加载Edits里面的更新操作,保证内存中的元数据信息是最新的、同步的,可以看成NameNode启动的时候就将Fsimage和Edits文件进行了合并
JN1、JN2、JN3等是JournalNode集群的节点,QJM(QuoromJournalManager)的基本原理是用2N+1台JournalNode存储EditLog,每次写数据操作有N/2+1个节点返回成功,那么本次写操作才算成功,保证数据高可用。当然这个算法所能容忍的是最多有N台机器挂掉,如果多于N台挂掉,算法就会失效。ZooKeeper也是作为分布式协调的组件,namenodeHA用了JournalNode,而没有用ZooKeeper,原因是Zookeeper不适合存储,znode中可以存储的默认最大数据大小为1MB。
健康检测:zkfc会周期性的向它监控的namenode(只有namenode才有zkfc进程,并且每个namenode各一个)发生健康探测命令,从而鉴定某个namenode是否处于正常工作状态,如果机器宕机,心跳失败,那么zkfc就会标记它处于不健康的状态。会话管理:如果namenode是健康的,zkfc机会保持在zookeeper中保持一个打开的会话,如果namenode是active状态的,那么zkfc还会在zookeeper中占有一个类型为短暂类型的znode,当这个namenode挂掉时,这个znode将会被删除,然后备用的namenode得到这把锁,升级为主的namenode,同时标记状态为active,当宕机的namenode,重新启动,他会再次注册zookeeper,发现已经有znode了,就自动变为standby状态,如此往复循环,保证高可靠性,但是目前仅支持最多配置两个namenode。master选举:如上所述,通过在zookeeper中维持一个短暂类型的znode,来实现抢占式的锁机制,从而判断哪个namenode为active状态。
在安全模式下集群在进行恢复元数据,即在合并fsimage和editslog,并且接受datanode的心跳信息,恢复block的位置信息,将集群恢复到上次关机前的状态。
①用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等②ResourceManager为该应用程序分配第一个Container,并与对应的NodeManager通信,要求它在这个Container中启动应用程序的ApplicationMaster。③ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复4~7。④ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。⑤ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。⑥NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。⑦各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。⑧应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。
Collector将数据写入后端存储Reader是与Collector分开的独立守护进程,专用于通过RESTAPI提供查询③RegistryDNS由Yarn服务注册表支持的YarnDNS服务器,可通过其标准DNS在Yarn上查找服务通过DNS向外提供已有的service-discovery信息,将YARNServiceregistryrecords转换为DNS记录,从而使用户可以通过标准的DNS客户端机制(例如DNSSRV记录,描述host:port)查询YARNApplciation信息
Spark计算框架在处理数据时,所有的中间数据都保存在内存中,从而减少磁盘读写操作,提高框架计算效率。同时Spark还兼容HDFS、Hive,可以很好地与Hadoop系统融合,从而弥补MapReduce高延迟的性能缺点。
SparkSQL构建在SparkCore之上,专门用来处理结构化数据(不仅仅是SQL)。即SparkSQL是SparkCore封装而来的!SparkSQL在SparkCore的基础上针对结构化数据处理进行很多优化和改进。
DriverProgram是用户编写的数据处理逻辑,这个逻辑中包含用户创建的SparkContext。SparkContext是用户逻辑与Spark集群主要的交互接口,它会和ClusterManager交互,包括向它申请计算资源等。ClusterManager负责集群的资源管理和调度,现在支持Standalone、ApacheMesos和Hadoop的YARN。WorkerNode是集群中可以执行计算任务的节点。Executor是在一个WorkerNode上为某应用启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。Task是被送到某个Executor上的计算单元,每个应用都有各自独立的Executor,计算最终在计算节点的Executor中执行。
Spark可以跑在很多集群上,比如跑在local上,跑在Standalone上,跑在ApacheMesos上,跑在HadoopYARN上等等。不管Spark跑在什么上面,它的代码都是一样的,区别只是master的时候不一样。其中SparkonYARN是工作中或生产上用的非常多的一种运行模式。Spark可以和Yarn整合,将Application提交到Yarn上运行,Yarn有两种提交任务的方式。
Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台NodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。
RDD[Person]:以Person为类型参数,但不了解其内部结构DataFrame:提供了详细的结构信息schema列的名称和类型,更像是一张表DataSet[Person]:不光有schema信息,还有类型信息
ApacheFlink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink设计为在所有常见的集群环境中运行,以内存速度和任何规模执行计算。
ApacheFlink是一个分布式系统,需要计算资源才能执行应用程序。Flink与所有常见的集群资源管理器(如HadoopYARN,ApacheMesos和Kubernetes)集成,但也可以设置为作为独立集群运行。Flink旨在很好地适用于之前列出的每个资源管理器。这是通过特定于资源管理器的部署模式实现的,这些模式允许Flink以其惯用的方式与每个资源管理器进行交互。部署Flink应用程序时,Flink会根据应用程序配置的并行性自动识别所需资源,并从资源管理器请求它们。如果发生故障,Flink会通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都通过REST调用进行。这简化了Flink在许多环境中的集成。
Flink旨在以任何规模运行有状态流应用程序。应用程序可以并行化为数千个在集群中分布和同时执行的任务。因此,应用程序可以利用几乎无限量的CPU,主内存,磁盘和网络IO。而且,Flink可以轻松维护非常大的应用程序状态。其异步和增量检查点算法确保对处理延迟的影响最小,同时保证一次性状态一致性。
JobManager的功能主要有:
TaskManager的功能主要有:负责具体任务的执行过程,在JobManager申请到资源之后开始启动。TaskManager里面的主要组件有:
Flink与Yarn的关系与MapReduce和Yarn的关系是一样的。Flink通过Yarn的接口实现了自己的AppMaster。当在Yarn中部署了Flink,Yarn就会用自己的Container来启动Flink的JobManager(也就是AppMaster)和TaskManager。启动新的FlinkYARN会话时,客户端首先检查所请求的资源(容器和内存)是否可用。之后,它将包含Flink和配置的jar上传到HDFS(步骤1)。客户端的下一步是请求(步骤2)YARN容器以启动ApplicationMaster(步骤3)。由于客户端将配置和jar文件注册为容器的资源,因此在该特定机器上运行的YARN的NodeManager将负责准备容器(例如,下载文件)。完成后,将启动ApplicationMaster(AM)。该JobManager和AM在同一容器中运行。一旦它们成功启动,AM就知道JobManager(它自己的主机)的地址。它正在为TaskManagers生成一个新的Flink配置文件(以便它们可以连接到JobManager)。该文件也上传到HDFS。此外,AM容器还提供Flink的Web界面。YARN代码分配的所有端口都是临时端口。这允许用户并行执行多个FlinkYARN会话。之后,AM开始为Flink的TaskManagers分配容器,这将从HDFS下载jar文件和修改后的配置。完成这些步骤后,即可建立Flink并准备接受作业。
基于StreamAligning操作能够实现ExactlyOnce语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中。通常以最迟对齐Barrier的一个Stream做为处理Buffer中缓存记录的时刻点。可通过开关,选择是否使用StreamAligning,如果关掉则ExactlyOnce会变成Atleastonce。StateBackend(数据持久化方案)flink的StateBackend是实现快照持久化的重要功能,flink将StateBackend抽象成一种插件,支持三种StateBackend。a、MemoryStateBackend:基于内存实现,将数据存储在堆中。数据过大会导致OOM问题,不建议生产环境使用,默认存储的大小为4M。b、FsStateBackend:将数据持久化到文件系统包括(本地,hdfs,Amazon,阿里云),通过地址进行指定。c、RocksDBStateBackend:RocksDB是一种嵌入式Key-Value数据库,数据实际保存在本地磁盘上。比起FsStateBackend的本地状态存储在内存中,RocksDB利用了磁盘空间,所以可存储的本地状态更大。从RocksDB中读写数据都需要进行序列化和反序列化,读写成本更高。允许增量快照,每次快照时只对发生变化的数据增量写到分布式存储上,而不是将所有的本地状态都拷贝过去。
HA共有5种模式
Kerberos核心概念:Principal:大致可以认为是Kerberos世界的用户名,用于标识身份。principal主要由三部分构成:primary,instance(可选)和realm。
Keytab:"密码本"。包含了多个principal与密码的文件,用户可以利用该文件进行身份认证。TicketCache:客户端与KDC交互完成后,包含身份认证信息的文件,短期有效,需要不断renew。Realm:Kerberos系统中的一个namespace。不同Kerberos环境,可以通过realm进行区分。
Hive是一个SQL解析引擎,将SQL语句转译成MRJob,然后再Hadoop平台上运行,达到快速开发的目的。Hive中的表是纯逻辑表,就只是表的定义等,即表的元数据。本质就是Hadoop的目录/文件,达到了元数据与数据存储分离的目的Hive本身不存储数据,它完全依赖HDFS和MapReduce。Hive的内容是读多写少,不支持对数据的改写和删除。
Hive外部连接提供的是一个shell客户端,是直接启动了一个org.apache.hadoop.hive.cli.cliDriver的进程,这个进程主要包含了两块内容,一个是提供交互的cli,另外一个就是Driver驱动引擎,这种情况下如果有多个客户端的情况下,就需要多个Driver,但如果通过HiveServer2连接就可以共享Driver,一方面可以简化客户端的设计降低资源损耗,另外一方面还能降低对MetaStore的压力,减少连接的数量。在生产环境中使用Hive,强烈建议使用HiveServer2来提供服务,好处很多:
MapReduce实现基本SQL操作的原理
a.两种表进行了join命令,而joinon所用的uid正是两个map任务中的key。同时,将表中的其他数据连同各个表的标识id放到value中;b.通过这种key的选择方式,就可以用shuffle将相同key的不同表的数据聚合在一起;c.通过shuffle操作后,相同key的数据被聚合到一起,接下来使用reduce把不同表的数据做整合就得到了最后的查询结果。
用Hive直接编写MR程序,假设有四个有依赖关系的MR作业,上图中,绿色是ReduceTask,云状表示写屏蔽,需要将中间结果持久化写到HDFS。Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升作业的计算性能。
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒可扩展性:kafka集群支持热扩展持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)高并发:支持数千个客户端同时读写
日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等消息系统:解耦和生产者和消费者、缓存消息等用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告流式处理:比如sparkstreaming和storm事件源
如上图,这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。Message结构上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!重点需要知道的是下面三个:
在Kafka集群中,某个Broker将被选举出来担任一种特殊的角色,其用于管理和协调Kafka集群,即管理集群中的所有分区的状态并执行相应的管理操作。每个Kafka集群任意时刻都只能有一个Controller。当集群启动时,所有Broker都参与Controller的竞选,最终有一个胜出,一旦Controller在某个时刻崩溃,集群中的其他的Broker会收到通知,然后开启新一轮的Controller选举,新选举出来的Controller将承担起之前Controller的所有工作。集群状态的维护是Controller保持运行状态一致性的基本要素,如果要持续稳定的对外提供服务,除了集群状态还有其他的工作需要Controller负责,如下:
HBase是一种面向列存储的非关系型数据库,是在Hadoop之上的NoSQL的Key/value数据库,不支持join的、也不支持ACID、对事务支持有限,无schema(创建表的时候,无需去指定列、列类型)、原生就支持分布式存储的,所以可以用来存储海量数据,同时也兼顾了快速查询、写入的功能。是大数据领域中Key-Value数据结构存储最常用的数据库方案。HBase作为NoSQL数据库的代表,属于三驾马车之一BigTable的对应实现,HBase的出现很好地弥补了大数据快速查询能力的空缺。Hbase利用Hadoop的HDFS作为其文件存储系统,利用Hadoop的MapReduce来处理Hbase中的海量数据,利用zookeeper作为其协调工具。HBase依赖于:ZooKeeper、HDFS,在启动HBase之前必须要启动ZK、HDFS。
HBaseClient为用户提供了访问HBase的接口,可以通过元数据表来定位到目标数据的RegionServer,另外HBaseClient还维护了对应的cache来加速Hbase的访问,比如缓存元数据的信息。HMasterHBase集群的主节点,负责整个集群的管理工作,主要工作职责如下:
MemStore刷盘HBase会根据MemStore配置的刷盘策略定时将数据刷新到StoreFile中,完成数据持久化存储。为什么要把WAL加载到MemStore中,再刷写成HFile呢?WAL(Write-Ahead-Log)预写日志是HBase的RegionServer在处理数据插入和删除过程中用来记录操作内容的一种日志。每次Put、Delete等一条记录时,首先将其数据写入到RegionServer对应的HLog文件中去。而WAL是保存在HDFS上的持久化文件,数据到达Region时先写入WAL,然后被加载到MemStore中。这样就算Region宕机了,操作没来得及执行持久化,也可以再重启的时候从WAL加载操作并执行。从写入流程中可以看出,数据进入HFile之前就已经被持久化到WAL了,而WAL就是在HDFS上的,MemStore是在内存中的,增加MemStore并不能提高写入性能,为什么还要从WAL加载到MemStore中,再刷写成HFile呢?数据需要顺序写入,但HDFS是不支持对数据进行修改的;WAL的持久化为了保证数据的安全性,是无序的;Memstore在内存中维持数据按照rowkey顺序排列,从而顺序写入磁盘;所以MemStore的意义在于维持数据按照RowKey的字典序排列,而不是做一个缓存提高写入效率。
HBase的数据删除操作并不会立即将数据从磁盘上删除,因为HBase的数据通常被保存在HDFS中,而HDFS只允许新增或者追加数据文件,所以删除操作主要对要被删除的数据进行标记。当执行删除操作时,HBase新插入一条相同的Key-Value数据,但是keyType=Delete,这便意味着数据被删除了,直到发生Major_compaction操作,数据才会真正地被从磁盘上删除。HBase这种基于标记删除的方式是按顺序写磁盘的的,因此很容易实现海量数据的快速删除,有效避免了在海量数据中查找数据、执行删除及重建索引等复杂的流程。
列式存储不仅具有按需查询来提高效率的优势,由于同一列的数据属于同一种类型,如数值类型,字符串类型等,相似度很高,还可以选择使用合适的编码压缩可减少数据的存储空间,进而减少IO提高读取性能。总的来说,行式存储和列式存储没有说谁比谁更优越,只能说谁更适合哪种应用场景。