和很多大数据框架一样,Spark也诞生于一篇论文,该论文中提到了一个概念:弹性分布式数据集(RDD),RDD是一种分布式内存抽象,支持在大规模集群中做内存运算,并具备容错性。对于Spark而言,RDD是最核心的数据结构,整个平台都围绕着RDD进行。
关于RDD的具体细节我们一会儿详细说,总之Spark借鉴了MapReduce的思想发展而来,不仅保留了其分布式并行计算的优点,还改进了其缺点。MapReduce的每一次操作,其结果都需要落盘,然后再从磁盘中读取进行下一次操作。而Spark的所有操作都是在内存中进行的,速度要远远快于MapReduce,此外在操作数据方面还提供了非常丰富的API。
那么问题来了,Spark和之前介绍的Hadoop有什么区别呢?它能完全取代Hadoop吗?我们来对比一下两者的差异。
很明显,Spark无法完全替代Hadoop,因为Hadoop由三部分组成:HDFS、MapReduce、YARN,分别对应存储、计算、资源调度,而Spark只负责计算。尽管Spark相较于MapReduce有巨大的性能优势,但HDFS和YARN仍然是许多大数据体系的核心架构,因此如果非要说替代,可以认为Spark替代了Hadoop内部的MapReduce组件。
在使用Spark时,一般会搭配Hadoop,其中HDFS负责存储,YARN负责资源管理,然后Spark负责计算。但要注意的是,Spark不仅可以搭配Hadoop,还可以搭配Mesos、Kubernetes,也支持Standalone独立运行模式。对于数据源而言,Spark不仅可以从HDFS中读取,像HBase、Cassandra、Kafka、关系型数据库等等,也是支持的。
Spark更常见的搭配还是Hadoop,我们这里也会使用Hadoop。
下面我们来搭建Spark环境,我这里准备了5台CentOS虚拟机,主机名和IP地址如下。
然后在/etc/hosts中配置主机名到IP的映射,后续便通过主机名进行访问,5台机器都这么配。
192.168.170.128satori001192.168.170.129satori002192.168.170.130satori003192.168.170.131satori004192.168.170.132satori005关闭每个节点防火墙:
#生成私钥和公钥,一路回车即可#生成的私钥存放在~/.ssh/id_rsa文件中、公钥则存放在~/.ssh/id_rsa.pub文件中ssh-keygen-trsa#进入到家目录的.ssh目录中cd~/.ssh#创建authorized_keys文件touchauthorized_keys在每个节点上都执行上面几个步骤,那么所有节点的.ssh目录中都有id_rsa、id_rsa.pub和authorized_keys这三个文件。如果想实现免密码登陆的话,假设在A节点中远程登陆B节点想不输入密码,那么就把A节点的id_rsa.pub里面的内容添加到B节点的authorized_keys文件中即可。但是注意,这个过程是单向的,如果在B节点中远程登陆A节点也不想输入密码的话,那么就把B节点的id_rsa.pub里面的内容添加到A节点的authorized_keys中。
针对于这个需求,Linux提供了专门的命令:ssh-copy-id。
注意:每个节点的id_rsa.pub里的内容同时也要添加到自身的authorized_keys文件中。
我们所有的软件都安装在/opt目录中,首先安装Java,这里使用的版本是1.8。安装完成后修改~/.bashrc文件,配置环境变量:
exportJAVA_HOME=/opt/jdk1.8.0_221/exportPATH=$JAVA_HOME/bin:$PATHsource一下之后,在终端中输入java-version,如果有以下输出,表示安装成功。
没有问题,下面来安装Hadoop。
完事之后还是修改~/.bashrc文件,配置环境变量:
exportJAVA_HOME=/opt/jdk1.8.0_221/exportPATH=$JAVA_HOME/bin:$PATHexportHADOOP_HOME=/opt/hadoop-3.3.6/exportPATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH接下来修改Hadoop的配置文件,配置文件都位于$HADOOP_HOME/etc/hadoop目录中。
修改hadoop-env.sh
satori002satori003satori004satori005这里指定worker节点,因为satori001是master,所以将它之外的4个节点作为worker。
修改yarn-env.sh:
exportJAVA_HOME=/opt/jdk1.8.0_221exportYARN_RESOURCEMANAGER_USER=rootexportYARN_NODEMANAGER_USER=root修改yarn-site.xml:
到目前为止,关于Java和Hadoop我们就配置完了,然后使用scp命令将/opt目录内的文件拷贝到其它节点上。
scp-r/opt/*root@satori002:/opt/scp-r/opt/*root@satori003:/opt/scp-r/opt/*root@satori004:/opt/scp-r/opt/*root@satori005:/opt/当然还有环境变量,要将剩余4个节点的环境变量也给改了,然后source一下。
好了,一切已经准备就绪,下面我们启动HDFS,不过首先要格式化NameNode。
下面所有操作只需在master节点(satori001)上执行即可。
hdfsnamenode-format如果格式化成功,那么所有节点会自动创建/opt/hadoop-3.3.6/data目录。
然后启动HDFS,只需要在master节点启动即可,会自动启动worker节点。
执行完之后,再次输入jps查看进程,如果一切正常,那么master节点会多出ResourceManager进程,而worker节点则多出NodeManager进程。
结果没有问题,其它worker节点也是一样的,到此HDFS和YARN我们就启动成功了。我们上传一个文件测试一下:
在satori001节点上传文件,在satori002节点进行读取,结果一切正常。
exportJAVA_HOME=/opt/jdk1.8.0_221/exportPATH=$JAVA_HOME/bin:$PATHexportHADOOP_HOME=/opt/hadoop-3.3.6/exportPATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATHexportSPARK_HOME=/opt/spark-3.4.2-bin-hadoop3/exportPATH=$SPARK_HOME/bin:$PATHexportPATH=$SPARK_HOME/sbin:$PATH然后修改Spark配置文件,位于$SPARK_HOME的conf目录下。
[root@satori001conf]#cpspark-env.sh.templatespark-env.sh我们修改spark-env.sh,在里面添加如下内容:
RDD指的是弹性分布式数据集(ResilientDistributedDataset),它是Spark计算的核心。尽管现在都使用DataFrame、Dataset进行编程,但是它们的底层依旧是依赖于RDD的。我们来解释一下RDD的这几个单词含义。
RDD是Spark的一个最基本的抽象(如果你看一下源码的话,你会发现RDD在底层是一个抽象类,抽象类显然不能直接使用,必须要继承并实现其内部的一些方法后才可以使用),它代表了不可变的、元素的分区(partition)集合,这些分区可以被并行操作。假设我们有一个包含300万个元素的数组,那么可以将这个数组分成3份,每一份对应一个分区,每个分区都可以在不同的机器上进行运算,这样就能提高运算效率。
RDD有如下五大特性:
Spark在运行的时候,每一个计算任务就是一个Task,另外对于RDD而言,不是一个RDD计算对应一个Task,而是RDD内部的每一个分区计算都会对应一个Task。假设这个RDD具有5个分区,那么对这个RDD进行一次map操作,就会生成5个Task。并且分区的数据是可持久化的,比如:内存、磁盘、内存+磁盘、多副本、序列化。
如果想创建RDD,那么必须要先创建SparkContext对象,它是程序的入口,无论使用哪种编程语言都是如此。作业的提交,任务的分发,应用的注册都会在SparkContext中进行。一个SparkContext对象代表了和Spark的一个连接,只有建立了连接才可以把作业提交到Spark集群当中去,只有实例化了SparkContext之后才能创建RDD、以及后面要说的Broadcast广播变量。至于SparkConf是用来指定配置的,它会作为参数传递给SparkContext。
如何创建SparkContext对象呢?我们可以使用pyspark模块,直接pipinstall安装即可。
frompysparkimportSparkContext,SparkConf#指定配置,setAppName负责指定应用名称,setMaster负责指定运行模式(关于运行模式一会儿介绍)conf=SparkConf().setAppName("satori").setMaster("local[*]")#实例化SparkContext对象sc=SparkContext(conf=conf)#然后就可以使用sc来创建RDD了关于Python的pyspark模块稍后详细说,再来看看Spark提供的Shell。
Spark提供一个pysparkshell,我们启动之后输入sc,发现它默认已经创建了SparkContext对象。至于master表示运行模式,local[*]代表本地运行,其中*表示使用所有的核(如果只想使用两个核,那么就指定为local[2]即可),appName叫做PySparkShell。
当然啦,在启动的时候也可以手动指定master和appName。
RDD是Spark的核心,那么如何创建RDD呢?答案显然是通过SparkContext对象,上面已经说了。我们可以通过编写py文件的方式(后面会说)手动创建SparkContext对象,也可以通过启动pysparkshell,直接使用默认创建好的,对,就是那个sc。由于SparkContext实例对象操作方式都是一样的,所以目前就先使用pysparkshell来进行编程,后面我们会说如何通过编写脚本的方式进行Spark编程,以及如何将作业提交到Spark上运行。
创建RDD有两种方式:
可以将一个已存在的集合(比如元组、列表、集合等)转成RDD。
#调用sc.parallelize方法,可以将已经存在的集合转为RDD>>>rdd1=sc.parallelize(range(10))#显示的是一个RDD对象>>>rdd1PythonRDD[1]atRDDatPythonRDD.scala:53#如果想查看具体内容,可以调用collect方法,这些后面会说>>>rdd1.collect()[0,1,2,3,4,5,6,7,8,9]#进行map操作得到rdd2>>>rdd2=rdd1.map(lambdax:x+1)>>>rdd2.collect()[1,2,3,4,5,6,7,8,9,10]#进行reduce操作>>>rdd2.reduce(lambdax,y:x+y)55>>>我们知道RDD是有分区的,在创建时也可以手动指定分区数量。
如果没有指定分区数量,那么Spark会根据CPU核心数来自己决定。
SparkContext还可以读取存储系统里面的文件来创建RDD,我们演示一下从本地读取文件、和从HDFS上读取文件。
在本地创建一个girl.txt,并上传到HDFS上面。
#读取文件使用textFile方法,传递一个文件路径,同时也可以指定分区#可以从本地读取,读取的格式为"file://文件路径">>>rdd1=sc.textFile("file:///root/girl.txt")>>>rdd1.collect()['hellomatsuri','hellokoishi','hellomatsuri','hellomarisa']#从HDFS上读取,格式为"hdfs://ip:port/文件路径",port就是HDFS集群的NameNode端口#注意格式:"...//girl.txt"和".../girl.txt"都是合法的>>>rdd2=sc.textFile("hdfs://satori001:9000//girl.txt",4)>>>rdd2.collect()['hellomatsuri','hellokoishi','hellomatsuri','hellomarisa']#执行map操作>>>rdd3=rdd1.map(lambdax:x.split())>>>rdd3.collect()[['hello','matsuri'],['hello','koishi'],['hello','matsuri'],['hello','marisa']]我们看到通过textFile读取外部文件的方式创建RDD也是没有问题的,文件的一行对应RDD的一个元素。但需要注意:如果是Spark集群,并且还是通过本地文件的方式,那么你要保证该文件在所有节点上都存在。
目前我们选择单节点Spark,因为对于学习来讲,单节点和多节点都是差不多的,不可能因为用的多节点,语法就变了,只是多节点在操作的时候要考虑到通信、资源等问题。比如这里读取本地的/root/girl.txt,如果你搭建的是集群,那么要保证每个节点都存在/root/girl.txt,否则节点根本获取不到这个数据。所以在学习语法的时候不建议搭建Spark集群,等基础概念了解的差不多了,介绍Spark运行模式的时候,我们再搭建。
对于Spark集群而言,上面问题的解决办法就是把文件拷贝到每一个节点上面,或者使用网络共享的文件系统。
另外textFile不光可以读取文件,还可以读取目录:textFile("/dir"),模糊读取:textFile("/dir/*.txt"),以及读取gz压缩包等等。
既然可以读取文件创建RDD,那么也可以将RDD保存为文件,通过saveAsTextFile方法。
>>>rdd=sc.parallelize(range(8),4)>>>rdd=rdd.map(lambdax:f"甜狗{x}号")#默认保存在本地,当然也可以加上file://>>>rdd.saveAsTextFile("/root/a.txt")#保存到HDFS>>>rdd.saveAsTextFile("hdfs://satori001:9000/a.txt")虽然我们保存的文件名是a.txt,但它并不是一个文本文件,而是一个目录。
里面的每一个part对应一个分区,因为RDD有4个分区,所以有4个part,每个part保存的就是对应分区的数据。所以Spark是把每个分区写到一个文件里面,而RDD在磁盘上会对应一个目录。从这里可以看出,RDD其实是一个抽象的概念,它表示一系列分区的集合,而这些分区可以分布在不同的节点上。
textFile除了本地文件、HDFS文件,还支持S3,比如textFile("S3://....")读取S3文件。
另外我们说过textFile不仅可以读取指定文件,还可以传递一个目录,会将目录里面的所有文件读取出来合并在一起。
#读取指定的单个文件>>>rdd=sc.textFile("hdfs://satori001:9000/a.txt/part-00000")>>>rdd.collect()['甜狗0号','甜狗1号']#也可以指定通配符,这里读取文件名以3结尾的文件,也就是part-00003>>>rdd=sc.textFile("hdfs://satori001:9000/a.txt/*3")>>>rdd.collect()['甜狗6号','甜狗7号']#指定一个目录,会将目录下的文件全部读取>>>rdd=sc.textFile("hdfs://satori001:9000/a.txt")>>>rdd.collect()['甜狗0号','甜狗1号','甜狗2号','甜狗3号','甜狗4号','甜狗5号','甜狗6号','甜狗7号']>>>这里我们将刚才写入的文件又读出来了,但是注意,如果你读取的是一堆的小文件,那么更推荐使用wholeTextFiles方法,该方法会对分区数量做优化,选择一个合适的分区数。因为分区并不是越多越好,分区变多,也会导致shuffle的几率变高。而shuffle是一个比较昂贵的操作,它表示节点之间发生了数据交换,显然这是比较耗时的。所以面对大量的小文件,不可能让每个小文件都对应一个分区,那么究竟多少个小文件被归为一个分区呢?可以让wholeTextFiles方法帮我们决定。
#我在files目录中生成了100个小文件(每个文件里面只有一个字符串"hello")#如果使用textFile读取,那么RDD会有100个分区>>>rdd=sc.textFile("hdfs://satori001:9000/files")>>>rdd.getNumPartitions()100#而使用wholeTextFiles读取,那么只有两个分区>>>rdd=sc.wholeTextFiles("hdfs://satori001:9000/files")>>>rdd.getNumPartitions()2因此当读取的多个文件都是大文件,那么用textFile没有问题,但若是一堆小文件,更推荐使用wholeTextFiles方法,会拥有更好地性能。
然后这两个方法返回的内容也是不一样的:
textFile会将所有文件的内容合并在一起,而wholeTextFiles会对文件作区分,返回的RDD里面的每个元素都是元组(包含文件名和文件内容)。
RDD提供了很多的算子,那么什么是算子呢?
说白了,rdd是一个类RDD的实例对象,而rdd能够调用的方法就是算子,而算子分为两类:transformation和action。
transformation类型的算子是懒加载(lazy)的,当调用该类型的算子时,相当于在构建执行计划,并没有开始执行,返回的是一个新的RDD。
>>>rdd1=sc.parallelize([1,2,3,4])>>>rdd2=rdd1.map(lambdax:x+1)>>>rdd3=rdd2.map(lambdax:x+1)>>>rdd4=rdd3.map(lambdax:x+1)>>>当调用action类型的算子时,构建的执行计划开始工作,此时返回的是具体的执行结果。
>>>rdd4.collect()[4,5,6,7]因此transformation算子会返回新的RDD,并记录RDD之间的依赖关系(专业术语叫血缘关系),可以想象成一条没有通电的流水线。而action算子返回的不是RDD,它相当于给流水线通上电,让整条流水线开始工作,返回的是执行结果。
说完了算子的基本概念,下面我们就来学习具体的常见算子。
该类型的算子在调用之后返回的依旧是RDD。
map算子
这个比较简单,就类似于Python的map。
>>>rdd=sc.parallelize([1,2,3,4])>>>rdd.map(lambdax:f"甜狗{x}号").collect()['甜狗1号','甜狗2号','甜狗3号','甜狗4号']>>>如果RDD有多个分区,那么每个分区都会执行map。
flatMap算子
该类型的算子和map类似,但它会做一些扁平化处理。
>>>rdd=sc.parallelize(["HelloPython","HelloRust"])>>>rdd.map(lambdax:x.split()).collect()[['Hello','Python'],['Hello','Rust']]>>>rdd.flatMap(lambdax:x.split()).collect()['Hello','Python','Hello','Rust']>>>当内部的元素是可迭代对象时,flatMap会将其展开,我们再举个例子。
>>>rdd=sc.parallelize(["abc","def"])>>>rdd.map(lambdax:x).collect()['abc','def']>>>rdd.flatMap(lambdax:x).collect()['a','b','c','d','e','f']reduceByKey算子
针对KV型RDD,会自动按照key进行分组,然后分别对组内数据(value)执行reduce操作。
#内部元素是二元元组的RDD,我们称之为KV型RDD>>>rdd=sc.parallelize([("a",1),("b",1),("a",2),("b",2),("c",4)])>>>rdd.reduceByKey(lambdax,y:x+y).collect()[('b',3),('c',4),('a',3)]比较简单,然后基于以上几个算子,我们来做一个词频统计的案例吧,假设有一个words.txt,内容如下:
hellopythonhellogolanghellorust我们来统计每个单词出现的次数。
>>>rdd=sc.textFile("hdfs://satori001:9000//words.txt")['hellopython','hellogolang','hellorust']#按照空格分隔>>>rdd=rdd.flatMap(lambdax:x.split())>>>rdd.collect()['hello','python','hello','golang','hello','rust']>>>rdd=rdd.map(lambdax:(x,1))>>>rdd.collect()[('hello',1),('python',1),('hello',1),('golang',1),('hello',1),('rust',1)]#分组,然后组内元素相加即可>>>rdd=rdd.reduceByKey(lambdax,y:x+y)>>>rdd.collect()[('python',1),('rust',1),('hello',3),('golang',1)]以上就是一个简单的词频统计,还是比较简单的,我们继续介绍算子。
mapValues算子
针对KV型RDD,但只对value做处理,key保持不变。
>>>rdd=sc.parallelize([("a",1),("b",1),("a",2),("b",2),("c",3)])>>>rdd.mapValues(lambdax:x+1).collect()[('a',2),('b',2),('a',3),('b',3),('c',4)]#该算子可以使用map替代>>>rdd.map(lambdax:(x[0],x[1]+1)).collect()[('a',2),('b',2),('a',3),('b',3),('c',4)]>>>groupBy算子
对RDD的数据进行分组。
>>>rdd=sc.parallelize(range(10))#相当于给每个元素打上了一个标签,然后相同标签的元素归为一组>>>rdd=rdd.groupBy(lambdax:"even"ifx%2==0else"odd")>>>rdd.collect()[('even',
>>>rdd=sc.textFile("hdfs://satori001:9000//words.txt")#分隔之后,进行分组>>>rdd=rdd.flatMap(lambdax:x.split()).groupBy(lambdax:x)>>>rdd.collect()[('python',
filter算子
类似Python的filter,用于对数据进行过滤。
>>>rdd=sc.parallelize(range(10))>>>rdd.collect()[0,1,2,3,4,5,6,7,8,9]#保留值为偶数的元素>>>rdd.filter(lambdax:x%2==0).collect()[0,2,4,6,8]distinct算子
对RDD进行去重,使内部的元素保持唯一。
>>>rdd=sc.parallelize([1,2,3,1,1,2,3,4])>>>rdd.distinct().collect()[2,4,1,3]去重之后不保证顺序。
join算子
对两个KV型RDD执行JOIN操作(类似于SQL)。
>>>rdd1=sc.parallelize([(1001,"a1"),(1002,"b1"),(1003,"c1")])>>>rdd2=sc.parallelize([(1001,"a2"),(1002,"b2"),(1004,"d1")])#key相同的value会被合并在一起>>>rdd1.join(rdd2).collect()[(1001,('a1','a2')),(1002,('b1','b2'))]#类似于SQL的LEFTJOIN>>>rdd1.leftOuterJoin(rdd2).collect()[(1001,('a1','a2')),(1002,('b1','b2')),(1003,('c1',None))]#类似于SQL的RIGHTJOIN>>>rdd1.rightOuterJoin(rdd2).collect()[(1004,(None,'d1')),(1001,('a1','a2')),(1002,('b1','b2'))]intersection算子
对两个RDD取交集。
>>>rdd1=sc.parallelize([1,2,3,4])>>>rdd2=sc.parallelize([3,4,5,6])>>>rdd1.intersection(rdd2).collect()[4,3]union算子
将两个RDD合并为一个RDD,相当于对两个RDD取并集。
>>>rdd1=sc.parallelize([1,2,3])>>>rdd2=sc.parallelize([11,22,33])>>>rdd3=rdd1.union(rdd2)>>>rdd3.collect()[1,2,3,11,22,33]不同类型的RDD也是可以合并的。
>>>rdd1=sc.parallelize([1,2,3])>>>rdd2=sc.parallelize(["a","b","c"])>>>rdd3=rdd1.union(rdd2)>>>rdd3.collect()[1,2,3,'a','b','c']glom算子
以分区为单位,对RDD的数据进行嵌套。
#默认是两个分区>>>rdd=sc.parallelize(range(1,10))>>>rdd.glom().collect()[[1,2,3,4],[5,6,7,8,9]]#改成三个分区>>>rdd=sc.parallelize(range(1,10),3)>>>rdd.glom().collect()[[1,2,3],[4,5,6],[7,8,9]]groupByKey算子
前面介绍过groupBy算子,它接收一个函数,来给内部的元素打上一个标签,然后相同标签的元素归为一组。
>>>rdd=sc.parallelize(range(10))>>>rdd=rdd.groupBy(lambdax:"even"ifx%2==0else"odd")>>>rdd.collect()[('even',
>>>rdd=sc.parallelize([("a",1),("a",2),("a",3),("b",1),("b",2)])>>>rdd=rdd.groupByKey()>>>rdd.collect()[('b',
>>>rdd=sc.parallelize(range(10))>>>rdd=rdd.map(lambdax:("even"ifx%2==0else"odd",x))>>>rdd.collect()[('even',0),('odd',1),('even',2),('odd',3),('even',4),('odd',5),('even',6),('odd',7),('even',8),('odd',9)]>>>rdd.groupByKey().mapValues(lambdax:list(x)).collect()[('even',[0,2,4,6,8]),('odd',[1,3,5,7,9])]当然用groupBy实现groupByKey也是没问题的。
>>>rdd=sc.parallelize([("a",1),("a",2),("a",3),("b",1),("b",2)])>>>rdd=rdd.groupBy(lambdax:x[0])>>>rdd.collect()[('b',
对RDD内的数据进行排序。
>>>rdd=sc.parallelize([-2,-3,-4,-1,3,4,2,1])#传入一个排序函数,这里先按照正负排序,如果符号相同则按照绝对值大小排序>>>rdd.sortBy(lambdax:(x>0,abs(x))).collect()[-1,-2,-3,-4,1,2,3,4]sortBy函数还可以接收一个ascending参数,表示是否升序(默认True),以及一个numPartitions参数,表示用多少个分区排序。如果你希望全局排序,那么分区数显然应该设置为1。
sortByKey算子
针对KV型RDD,按照key进行排序。
>>>rdd=sc.parallelize([(1,"a"),(3,"b"),(2,"c")])>>>rdd.sortByKey().collect()[(1,'a'),(2,'c'),(3,'b')]>>>rdd.sortByKey(ascending=False,numPartitions=2).collect()[(3,'b'),(2,'c'),(1,'a')]显然该算子完全可以用sortBy替代。
mapPartitions算子
它和map有点相似,但map作用于RDD的每个元素,而mapPartitions是作用于RDD的每个分区。也就是说,RDD每次会给map传递一个数据,而给mapPartitions则是以迭代器的形式传递一个分区。
>>>rdd=sc.parallelize([1,2,3,4,5,6],3)>>>rdd.map(lambdax:x+10).collect()[11,12,13,14,15,16]>>>rdd.mapPartitions(lambdax:[_+10for_inx]).collect()[11,12,13,14,15,16]所以对于上面的代码,map里的函数需要调用6次,因为有6个元素。但mapPartitions里的函数则需要调用3次,因为有3个分区。虽然从CPU执行层面来看没有什么差别,但当发生数据传输时,mapPartitions可以有效减少IO。
partitionBy算子
我们说RDD有5大特性,其中第四个特性是针对于KV型RDD可以自定义分区规则,而方式便是使用partitionBy。该算子接收两个参数:
注意分区函数必须返回一个整数,假设分区数为3,那么分区函数可以返回的值便是0、1、2,也就是返回分区号(从0开始)。同一分区号的元素会落在同一个分区当中,我们举例说明。
>>>data=[("a",1),("b",1),("c",1),("d",1),("e",1)]#按照哈希值%3分组的话,"a"、"c"、"e"是一组,"b"是一组,"d"是一组>>>[hash(item[0])%3foritemindata][2,1,2,0,2]>>>rdd=sc.parallelize(data)>>>rdd=rdd.partitionBy(3,lambdax:hash(x)%3)>>>rdd.glom().collect()[[('d',1)],[('b',1)],[('a',1),('c',1),('e',1)]]RDD的分区结果和预想的是一样的。
repartition算子
改变RDD的分区数,可以增加也可以减少,取决于传递的整数。和partitionBy不同的是,repartition算子只改变分区数,分区规则是不变的。但是需要注意:在操作分区数量时一定要慎重,因为改变分区会带来以下两个影响。
以上就是常见的transformation算子,下面介绍action算子。
该类型的算子在调用之后会返回具体的值,不再返回RDD。
countByKey算子
针对KV型RDD,统计key出现的次数。
>>>rdd=sc.parallelize([("a",1),("a",2),("a",3),("b",1),("b",2)])>>>rdd.countByKey()defaultdict(
这个算子我们一直都在用,它负责将RDD各个分区的数据收集起来,合并为一个列表。如果RDD内部的数据量很大,那么可能会撑爆内存。
reduce算子
类似Python的reduce。
>>>rdd=sc.parallelize([1,2,3,4,5])>>>rdd.reduce(lambdax,y:x+y)15>>>rdd=sc.parallelize(["a","b","c","d","e"])>>>rdd.reduce(lambdax,y:x+y)'abcde'fold算子
和reduce一样,对数据进行聚合,但可以接收一个初始值。首先在聚合的时候,每个分区内部会进行聚合,然后再对每个分区聚合的结果进行聚合,而初始值会同时作用在这两个步骤上。什么意思呢?我们举个例子。
>>>rdd=sc.parallelize([1,2,3,4,5,6],3)>>>rdd.fold(10,lambdax,y:x+y)61首先RDD有3个分区,每个分区内部会进行聚合,那么结果是3、7、11,但还要加上初始值,所以是13、17、21。然后每个分区聚合的结果再进行聚合,同时还要加上初始值,因此结果是10+13+17+21,等于61。
first算子
获取RDD的第一个元素。
>>>rdd=sc.parallelize([1,2,3,4,5,6],3)>>>rdd.first()1take算子
获取RDD的前N个元素。
>>>rdd=sc.parallelize([1,2,3,4,5,6])>>>rdd.take(4)[1,2,3,4]top算子
对RDD降序排序,选择前N个。
>>>rdd=sc.parallelize([1,2,3,4,5,6])>>>rdd.top(3)[6,5,4]count算子
返回RDD内部的元素个数。
>>>rdd=sc.parallelize([1,2,3,4,5,6])>>>rdd.count()6takeSample算子
对RDD的数据进行抽样,接收三个参数:
#前两个参数必传,随机种子一般不传,Spark会自动设置一个随机种子>>>rdd=sc.parallelize(range(10))>>>rdd.takeSample(False,5)[4,9,6,8,3]takeOrdered算子
对RDD升序排序,选择前N个。
>>>rdd=sc.parallelize([3,1,5,4,2])>>>rdd.takeOrdered(3)[1,2,3]如果想降序排呢?很简单,使用top即可。或者还可以使用sortBy+take。
>>>rdd=sc.parallelize([3,1,5,4,2])#升序排,选择前3个>>>rdd.sortBy(lambdax:x).take(3)[1,2,3]#降序排,选择前3个>>>rdd.sortBy(lambdax:-x).take(3)[5,4,3]foreach算子
对RDD的每一个元素都执行相同的操作(类似于map),但该函数没有返回值。
>>>rdd=sc.parallelize([1,2,3])>>>rdd.foreach(print)123foreachPartition算子
和foreach类似,但每次处理的是一整个分区的数据,相当于没有返回值的mapPartitions。
>>>rdd=sc.parallelize([1,2,3,4,5,6],2)>>>rdd.foreach(print)123456>>>rdd.foreachPartition(print)
将RDD数据写入到文件系统中,我们在介绍RDD的创建时说过。但需要注意:写入时文件不能存在,否则报错。
以上就是RDD的一些常见算子,还是比较多的。总之这些算子不建议去记,用的时候再去查就行了,而且事实上我们也很少会使用RDD编程,只不过它是上层结构的根基,所以还是有必要了解的。
RDD之间是具有血缘关系的,比如基于RDD1得到RDD2,基于RDD2得到RDD3,基于RDD3得到RDD4,那么当对RDD4执行collect算子的时候,整个RDD链条便会开始计算。那么问题来了,如果再基于RDD3得到RDD5,然后对RDD5执行collect算子时,会发生什么呢?
我们说过调用RDD的transformation算子,相当于在构建执行计划,调用action算子时才会开始执行。因此不管是RDD4还是RDD5,它们在调用collect方法时,都会从RDD1开始逐步执行。
如果执行的逻辑非常复杂,那么每次都从头执行是不是很耗时呢?所以便有了RDD的持久化。对于上面这个例子来说,我们完全可以将RDD3给持久化,这样RDD5调用collect方法时,就不用从头开始执行了。
所以Spark一个最重要的能力就是它可以通过一些操作来持久化(或者缓存)内存中的数据,当你持久化一个RDD,节点就会存储这个RDD的所有分区,以后可以直接在内存中计算、或者在其它的action操作时能够重用。这一特性使得之后的action操作能够变得更快(通常是10个数量级),因此缓存对于迭代式算法或者快速的交互式使用是一个非常有效的工具。
我们可以通过调用persist()或者cache()方法来持久化一个RDD,当第一次action操作触发时,所有分区数据就会被保存到其他节点的内存当中。并且SparkCache具有容错性:如果RDD的某个分区数据丢失了,那么会根据原来创建它的transformation操作重新计算。那么persist和cache有什么区别呢?我们看一下源码。
persist接收一个storageLevel参数,表示缓存级别,默认是MEMORY_ONLY,表示缓存在内存中。而cache则是直接调用了persist,所以如果不传参数,两者是一样的。而如果想缓存到磁盘,那么就需要调用persist方法,并且指定缓存级别。那么缓存级别都有哪些呢?
frompysparkimportStorageLevel#缓存在内存中StorageLevel.MEMORY_ONLY#缓存在内存中,2副本StorageLevel.MEMORY_ONLY_2#缓存在磁盘中StorageLevel.DISK_ONLY#缓存在磁盘中,2副本StorageLevel.DISK_ONLY_2#缓存在磁盘中,3副本StorageLevel.DISK_ONLY_3#先缓存在内存中,如果内存不够,再缓存在磁盘中StorageLevel.MEMORY_AND_DISK#先缓存在内存中,如果内存不够,再缓存在磁盘中,2副本StorageLevel.MEMORY_AND_DISK_2#缓存在堆外内存中(系统内存)StorageLevel.OFF_HEAP注意:persist是惰性的,只有在遇到一次action操作的时候,才会缓存RDD的分区数据。如果不想缓存了,可以调用unpersist,而unpersist是立刻执行的。我们举例说明。
#对于本地文件,也可以不指定file://,直接写成/root/1.txt>>>rdd=sc.textFile("file:///root/1.txt")#持久化RDD,默认保存在内存中,此时等价于rdd.cache()>>>rdd.persist()PythonRDD[24]atRDDatPythonRDD.scala:53#执行action操作>>>rdd.count()10000#我们查看一下文件的大小>>>importos>>>os.stat("/root/1.txt").st_size/102477.044921875然后我们访问webUI(端口是4040),点击Storage,可以查看缓存信息。
RDDName,这个就是我们的文件名;StorageLevel表示缓存级别,默认是基于内存的;CachedPartitions表示缓存的分区数,RDD默认有两个分区;SizeinMemory表示缓存在内存中的大小,比77K的源文件小很多,这是Spark基于缓存做的策略;SizeonDisk表示缓存在磁盘中的大小,因为没有缓存到磁盘,所以是0。
persist表示持久化,还可以调用unpersist取消持久化。
#取消RDD持久化>>>rdd.unpersist()file:///root/1.txtMapPartitionsRDD[11]attextFileatNativeMethodAccessorImpl.java:0>>>如果取消了,那么刷新页面,就没有内容了。
那么问题来了,persist支持这么多的缓存级别,我们应该使用哪一种呢?官方给出了如下建议。
最后保存RDD数据还可以使用checkpoint方法,但它只能保存在硬盘上(一般是HDFS),并且在设计上(被认为)是安全的。那么checkpoint和persist有什么区别呢?
>>>rdd=sc.textFile("file:///root/1.txt")#设置checkpoint的保存路径,如果是local模式,那么可以用本地文件系统#但如果是集群模式,一定要用HDFS>>>sc.setCheckpointDir("hdfs://satori001:9000/checkpoint")#保存起来>>>rdd.checkpoint()我们来看一下HDFS上面有没有数据。
显然成功保存了,这里再来对比一下checkpoint和persist的差异。
目前我们的代码都是通过pysparkshell执行的,但生产上肯定要编写相应的py文件,然后上传到Spark集群中运行。下面我们来看py文件的编写方式。
这个应该是最熟悉的模式了,在学习Spark或者本地开发的时候最为方便,直接搭建一个Spark环境就可以跑了。前面的pysparkshell,便是以local模式启动的。当然,提交作业也可以指定local运行模式。
spark-submit--masterlocal[*]--name万明珠main.py
--master负责指定运行模式,local[*]表示本地模式,并且使用全部的核,--name负责指定应用程序的名称。我们提交一下看看:
输出的内容非常多,程序的结果就隐藏在中间。
那么问题来了,如果代码逻辑比较复杂,涉及多个文件(目录)该怎么办呢?标准库里面的模块可以直接导入,但我们自己写的依赖怎么提交呢?首先多个文件(目录)里面一定存在一个启动文件,用来启动整个程序。假设这个启动文件叫main.py(当然启动文件应该在项目的最外层,如果在项目的包里面,那么它就不可能成为启动文件),那么把除了main.py的其它文件打包成一个zip包或者egg,假设叫做dependency.egg,然后执行的时候就可以这么做:
spark-submit--masterxxx--namexxx--py-filesdependency.eggmain.py
如果我们写的程序需要从命令行中传递参数,那么直接跟在main.py(启动文件)后面就行。
关于通过--py-files提交依赖,一会儿还会单独说。
然后spark-submit还支持很多其它参数,具体可以通过--help查看,不过很多都用不到。因为spark-submit不仅可以提交Python程序,还可以提交Java、Scala程序,所以里面有很多参数是给其它语言准备的,Python用不到。
当然啦,不管是local运行模式还是其它运行模式,提交方式都是一样的,所以程序中我们没有使用SparkConf指定运行模式,因为官方推荐在提交任务的时候通过--master指定。
standalone运行模式则是经典的master&slave模式,它是Spark自带的。所以搭建standalone要保证有多个节点,其中一个是master,剩下的属于worker(slave),下面我们就来演示如何搭建。
首先在$SPARK_HOME的conf目录下有一个spark_env.sh,我们之前还配置了Python环境,将这个文件打开。
#指定启动pysparkshell或提交Python任务时的Python解释器exportPYSPARK_DRIVER_PYTHON=/usr/bin/python3#Spark工作节点使用的Python解释器exportPYSPARK_PYTHON=/usr/bin/python3目前我们只配置了以上两个变量,PYSPARK_DRIVER_PYTHON负责指定启动pysparkshell或提交Python任务时的Python解释器,PYSPARK_PYTHON负责指定Spark工作节点使用的Python解释器。对于standalone模式来说,最好保证所有节点的解释器版本是相同的。
然后我们在此基础上继续进行配置,搭建standalone。
exportPYSPARK_DRIVER_PYTHON=/usr/bin/python3exportPYSPARK_PYTHON=/usr/bin/python3#配置JAVA_HOMEexportJAVA_HOME=/opt/jdk1.8.0_221/#配置master,让当前的satori001节点作为master#然后端口是默认值7077exportSPARK_MASTER_HOST=satori001exportSPARK_MASTER_PORT=7077然后配置workers,首先cpworkers.templateworkers,然后打开workers,里面只有一个默认的localhost。所以即便一个节点,我们依旧可以搭建standalone集群,只不过意义不太大。打开workers之后,在里面写上worker节点。
satori002satori003satori004satori005这里让satori001作为master,其它节点作为worker,当前的satori001节点已经配置完毕,然后通过scp命令将Spark目录拷贝到其它节点中。只要保证框架的版本相同,那么所有节点配置就都是一样的。
如果查看worker节点,会发现多出了Worker进程。
毫无疑问,Spark集群启动成功。
补充:虽然satori001是master,但它同时也可以做worker。如果在配置文件workers中将satori001也加进去,那么satori001节点也会启动Worker进程。只不过那样的话,satori001节点的压力就会比较大。
显示集群状态为ALIVE,并且有4个worker。
然后我们再来说一下,关于Spark端口的问题。目前已经出现了三个端口,分别是:4040、7077、8080,我们来解释一下这些端口的区别。
下面我们就走一个,启动pyspark,连接到Spark集群当中。
pyspark--masterspark://satori001:7077
此时我们就连接到了Spark集群中,再来看看集群的webUI。
注意看里面的ApplicationID和pysparkshell中的appid是一样的,好了,以standalone运行模式启动Spark集群、并使用pyspark去连接,我们已经知道了,那么如何提交任务呢?
答案很简单,把使用local模式提交作业的命令copy下来,然后将local[*]改成我们的Spark集群地址:spark://127.0.0.1:7077就能提交到standalone模式的Spark集群了。非常简单,我们测试一下。
任务显然是执行成功了的,但是日志信息太多了,程序的输出在下面,可以自己测试一下。
在使用Spark集群的时候,有一个很重要的地方需要注意,由于集群有多个节点,所以在读取文件的时候不要使用本地文件。因为在提交任务时,每个节点执行的代码肯定是相同的,那么在读取本地文件时就会从所在的节点中读取,这就需要我们保证每个节点都有相同的文件。所以在读取文件时,更推荐使用HDFS。
再来看一下yarn模式,它是使用Spark的公司采用最多的一个模式。使用yarn模式的时候,Spark充当一个客户端,它需要做的事情就是提交作业到yarn上,然后执行。这里的yarn指的就是Hadoop框架的YARN组件,因此为了更好地理解该运行模式,我们先来介绍一下YARN。在介绍Hadoop的时候我们已经说过YARN了,这里再来说一遍。
里面有很多角色,可以从两个层面进行分类。
资源管理层面
所以NM负责管理单个节点,而每个节点上都有一个NM,那么多个NM就能将所有节点的资源都管理起来,然后这些NM统一去向RM进行汇报。所以整个资源管理,就是RM配合一堆NM完成的。
但是光有资源还不够,因为最终的目的还是要完成计算的,所以必须要有干活的。
任务计算层面
所以整个执行流程如下:
我们再来总结一下这几个角色的作用。
ResourceManager
NodeManager
ApplicationMaster
Task
所以整个YARN的流程应该不复杂,RM管理全局资源,NM管理单个节点资源,AM管理单个任务,Task负责执行任务,Container代表了资源的抽象,AM和Task都运行在Container中。
说完了YARN的原理之后,我们再来看看SparkonYARN模式。前面我们介绍了SparkonStandalone,它需要有多个节点,但多个节点就意味着要多消耗资源,并且为了防止master单点故障,我们还要配置高可用(基于Zookeeper实现)。而服务器的资源总是紧张的,但对于大数据公司来说,基本上都会有Hadoop集群(YARN集群),在这种情况下如果再单独准备Standalone集群,那么对资源的利用率就不高了。
所以大部分公司都会将Spark运行在YARN集群中,因为YARN本身就是一个资源调度框架,负责对运行在内部的计算框架进行资源调度管理。而作为典型的计算框架,Spark本身也可以直接运行在YARN中,并接受YARN调度。因此,对于SparkonYARN来说,无需部署Spark集群,只需要有一个节点,充当Spark客户端,即可提交任务到YARN集群中运行。
下面我们启动pyspark,并连接到YARN集群。
pyspark--masteryarn
但如果你直接启动的话,是会报错的,因为不知道YARN集群的地址。根据错误提示,我们需要在spark-env.sh中配置HADOOP_CONF_DIR或这YARN_CONF_DIR。
#两个参数配置一个即可,当然两个都配置也没关系,值是一样的,都是配置文件所在路径exportHADOOP_CONF_DIR=/opt/hadoop-3.3.6/etc/hadoop/exportYARN_CONF_DIR=/opt/hadoop-3.3.6/etc/hadoop/话说为什么配置了这两个参数就没问题了呢?很简单,Spark会根据配置文件所在目录去找core-site.xml,里面有YARN的集群地址,同理也可以找到HDFS的集群地址。
启动成功,这个过程会有一些慢,而提交作业也很简单,只需要将--master指定为yarn即可。然后我们看一下YARN的webUI,端口是8088。
我们看到客户端应用程序已经正常地运行到YARN当中了,YARN已经做了记录,名称为PySparkShell,类型为SPARK。我们可以在pyspark里面编写代码,整个pysparkshell相当于提交到YARN的应用程序,里面编写的代码就是一个个子任务。
>>>sc.parallelize([1,2,3]).map(lambdax:f"甜狗{x}号").collect()['甜狗1号','甜狗2号','甜狗3号']这些任务可以通过4040端口查看,我们前面说过的。
有一个已完成的任务,就是我们上面刚执行的。另外我们也可以不用手动输入4040端口,在YARN的webUI页面中,可以直接点击跳转。
点击此处可以直接跳转到任务的详情界面,但是端口没有发生变化,这是YARN的跳转服务webproxyserver帮我们完成的,了解一下就好。
然后是提交作业,它和local、standalone没啥区别:
spark-submit--masteryarn--name万明珠--py-filesdependecy.eggmain.py但是在提交到YARN的时候,还可以通过--deploy-mode指定部署模式,可选值为client或cluster,比如:
那么问题来了,这两种部署模式有什么区别呢?
这里面出现了一些概念,我们马上就说,还有Spark的架构等等。目前只需要知道,运行模式不同对代码没有影响,我们的代码只用写一份,然后需要什么模式,直接--master指定即可。
在介绍运行模式的时候,我们提到了部署模式、Driver、Worker,它们都代表什么呢?下面就来剖析一下Spark的内部细节。
首先是整个Spark的功能结构,它包含SparkCore、SparkStreaming、SparkSQL、SparkGraphX、SparkMLlib,而后面四个部分都是建立在SparkCore之上。
SparkCore
Spark的核心,是Spark运行的基础。SparkCore以RDD为数据抽象,提供多种编程语言的API,可以通过编程进行海量离线数据的计算。
SparkStreaming
以SparkCore为基础,提供数据的流式计算。
SparkSQL
基础SparkCore,提供结构化数据的处理模块。SparkSQL支持以SQL语言对数据进行处理,本身针对离线场景,但基于SparkSQL又提供了StructedStreaming模块,能够以SparkSQL为基础,进行数据的流式计算。
SparkMLlib
以SparkCore为基础,进行机器学习计算,内置了大量的机器学习库和API算法,方便用户以分布式的模式进行机器学习计算。
SparkGraphX
以SparkCore为基础,进行图计算,提供了大量的图计算API,方便用户以分布式的模式进行图计算。
前面我们介绍了YARN的架构,下面再来看看Spark的架构。
Application
用户基于Spark开发的应用程序,比如我们前面编写的main.py。一个应用程序由多个作业(Job)组成,这些Job是通过对RDD的操作实现的。
Job
作业,当RDD调用action算子时,就会提交一个作业。作业由多个阶段(Stage)组成,这些阶段会基于数据的shuffle操作来划分,每个作业的目标是生成一个结果集。
Stage
阶段,阶段是作业的物理执行计划的一部分,一个阶段由一组并行执行的任务组成,这些任务在各自的分区上执行预定义好的一系列transformation操作。阶段的划分基于shuffle边界,即每次数据的重新分组和分布都会导致一个新的阶段的开始。
任务,Spark执行的最小单位。每个阶段包含多个任务,每个任务会对分布式数据集的一个分区执行相同的计算。所有任务并行执行,以实现高效的数据处理。
总结一下就是:一个应用程序(Application)由多个作业(Job)组成,每个作业通过action算子的调用产生。作业被分解为多个阶段(Stage),阶段的划分基于shuffle操作。然后每个阶段包含多个任务(Task),任务是并行执行的基本单位。
光用文字描述的话,不好理解,我们用代码来描述一下这个过程:
frompysparkimportSparkContextsc=SparkContext()rdd=sc.parallelize(range(30),3)rdd=rdd.filter(lambdax:x>5)rdd=rdd.map(lambdax:f"甜狗{x}号")rdd.collect()整个py文件就是一个Application,它的生命周期从提交开始,到运行结束。然后当调用rdd.collect()时,会产生一个Job,而Job由一个或多个Stage组成。但因为上面的例子中没有涉及到shuffle操作(比如reduceByKey、groupBy等),或者说没有产生数据交换,只是执行自己当前的分区数据,所以当前只有一个Stage。
因此上面的rdd.filter、rdd.map都位于一个Stage中,然后一个Stage由多个任务组成,每个任务对指定的分区执行计算。因为当前RDD有3个分区,所以会产生3个任务,每个任务在各自的分区中执行定义好的一系列transformation操作(这里是filter、map)。
因此关于Application、Job、Stage、Task这几个概念我们就说清楚了,我们再用一个生活中的例子对比一下。
把Application当成家庭大扫除,打扫房间、打扫厨房、打扫客厅就可以看成是Job。每个Job由多个Stage组成,比如打扫厨房包含:清理垃圾、擦除污渍、物品摆放整齐。然后一个Stage对应多个Task,因此家里的爸爸、妈妈、孩子就是Task,它们并行执行,换句话说就是各自负责一块区域。
好了,我们继续看Spark的其它角色。
DriverProgram
Driver是Spark应用程序的执行入口点,当使用spark-submit提交一个应用程序时,这个过程实际是在Driver中执行的。它会执行应用程序的main函数,并创建SparkContext对象。此外Driver还会将应用程序分解成多个Stage并调度给集群上的Executor执行,基于RDD的DAG计划执行的顺序,并根据shuffle操作将Job分解成多个Stage,每个Stage包含多个并行执行的Task。
Driver还负责监控和管理Executor的生命周期,包括向ClusterManager请求资源,以及在任务执行完毕或失败时释放资源。当然它还负责监控任务的执行情况,并在需要时重试失败的任务或Stage。
ClusterManager
集群管理器,一个用于在集群上分配资源的外部服务,比如要用多少CPU、多少内存等等。我们使用spark-submit的时候可以通过:--driver-memory、--driver-cores、--executor-memory、--executor-cores等等,指定要使用的资源。
Spark可以在多种集群管理器上运行,使其能够有效地利用分布式资源来执行并行计算任务。集群管理器的主要职责包括维护集群资源信息、分配资源给Spark应用程序以及监控和管理执行器(Executor)的生命周期。
Spark支持以下几种类型的集群管理器:
如果使用Standalone模式,那么这里的ClusterManager就可以认为是Master节点,它会启动一个Master进程(之前输入jps看到过),管理整个集群的资源。如果使用YARN模式,那么这里的ClusterManager可以认为是ResourceManager。
不同的集群管理器可能有不同的术语和架构,但在Spark集群中,ClusterManager(无论是SparkStandalone的Master进程,还是YARN的ResourceManager进程)都负责集群资源管理、作业调度和集群监控。
WorkerNode
工作节点,如果是Standalone模式,那么WorkerNode就是当前的工作节点,在工作节点上会启动一个Worker进程,负责管理节点资源,并启动Executor进程。如果是YARN模式,那么就是NodeManager。
每个工作节点上运行一个或多个Executor进程,这些Executor是由Driver动态启动的,用于执行Task。
Executor
一个进程,用于在WorkerNode上运行你的应用程序。它可以执行任务、将数据保存到内存或者磁盘上,每一个应用程序都有自己独立的多个Eexecutor。也就是说,一个Application可以对应多个Executor,但一个Executor只会对应一个Application。
在Standalone模式下,工作节点有Spark自己的集群管理器管理。但在使用YARN、Mesos或Kubernetes等集群管理器时,工作节点的资源(如CPU、内存)被这些外部管理器管理,而Executor则运行在这些资源管理器提供的容器(Container)中。
DeployMode
部署模式,我们说Spark使用Driver进程解析应用程序,将Job切分成Stage并调度Task到Executor执行。但Driver进程的运行地点有两种:客户端和集群内部,对应的部署模式为client和cluster。
默认情况下,Spark的会采用client部署模式运行作业,允许本地机器监控作业执行情况和查看日志输出。但在生产上,我们一定要用cluster部署模式。
关于Spark的角色就说到这里,内容还是比较多的,我们再来梳理一下。
假设用户有一个应用程序,包含了执行的数据处理逻辑,现在要提交到ClusterManager上运行。
由于提交是通过Driver实现的,因此必须要创建Driver,如果部署模式是client,那么Driver进程会在客户端启动,如果部署模式是cluster,那么ClusterManager会指定一个WorkerNode启动Driver。
Driver创建之后,会启动应用程序、创建SparkContext对象,并与ClusterManager建立通信。Driver会向ClusterManager请求资源来启动Executor,请求的资源包括CPU核心数、内存大小等,可以通过spark-submit命令参数或者通过SparkConf进行设置。
ClusterManager收到Driver的资源请求后,就会根据集群的资源情况和调度策略,在满足资源请求的工作节点上分配资源来启动Executor。一旦Executor进程在工作节点上被成功启动,它们就会和Driver建立通信,准备接收和执行任务。
Executor启动之后,会受到ClusterManager和Driver的监控。
随后Driver会解析Spark应用,生成相应的Stage,并将这些Stage包含的任务分配给Executor,在Executor内部启动线程池执行Task。同时Driver会密切监控Executor,如果发现某个Executor执行效率低,会将任务分配给其它的Executor。当执行计划中所有的Stage都执行完毕,各个Worker会向Driver汇报,并释放资源。Driver如果确定都做完了,会再向ClusterManager汇报作业完成,随后中止SparkContext并退出。
怎么样,现在是不是都串起来了呢?每个角色单独介绍的话确实不好理解,但如果将整个过程串起来应该就好理解多了。另外在应用执行的时候,这些信息都可以通过4040端口查看。
所以我们再回顾一下Spark运行模式,会发现运行模式不同,本质上就是角色运行的位置不同。
然后再看一下上面的架构图,从里面还可以得出一些信息。
最后我们将刚才的应用程序main.py,以cluster部署模式提交到YARN上面,并且提交的时候可以指定很多参数。
#--driver-memory
一切正常,但需要注意:在申请资源时,要保证资源充足,否则报错。另外这里将部署模式指定为cluster,那么程序中print打印的输出就看不到了,需要写到日志文件中。
基于以上学习的内容,我们提出两个问题。
先来解释第一个问题,对于SparkOnStandalone来说,需要有多个节点,其中一个节点是Master,剩余节点是Worker。然后在Master节点上会启动一个Master进程,管理整个集群;在Worker节点上启动一个Worker进程,管理当前节点。Driver和Executor都是单独的进程。
如果是SparkOnYARN,那么Master则由ResourceManager代替,Worker由NodeManager代替。Driver可以运行在Container中(cluster部署模式)或客户端进程中(client部署模式),Executor则全部运行在Container中。
那么为什么要选择YARN呢?其实原因前面说过了,为了提高资源利用率,在已有YARN的场景下,让Spark受到YARN的调度可以更好地管理资源并提高利用率。因为生产上不一定只有Spark作业,还可以有MapReduce作业,这在大数据公司是存在的。而这两种作业都可以跑在YARN上,因此就没有必要搞两套集群了,让它们统一接收YARN的调度即可。
Spark比MapReduce的效率要高很多,那么它们之间的差异主要体现在什么地方呢?
MapReduce
Spark
接下来我们说一说PySpark,这里PySpark是一个Python类库,和前面用的$SPARK_HOME/bin/pyspark不是一个东西。这里先来解释一下类库和框架的区别。
多说一句,Pandas在处理中小型数据集的时候非常方便,但如果是大型数据集,则需要使用Spark。并且Pandas和Spark在概念是存在交集的,我们后续会说。
所以bin/pyspark是一个客户端应用程序,提供交互式的Python客户端,用于编写Spark程序。而接下来要说的PySpark则是一个类库,我们需要再py代码里面导入它,使用它里面的功能。
PySpark类库已经内置了Spark框架所有的API,可以通过PySpark类库编写Spark应用程序,并提交到Spark集群运行。当然这里的集群可以是SparkOnStandalone集群,也可以SparkOnYARN集群。
frompysparkimportSparkContextsc=SparkContext(master="yarn",appName="万明珠")我们编写一段代码:
frompysparkimportSparkContextsc=SparkContext()#读取文件rdd=sc.textFile("hdfs://satori001:9000/words.txt")#对单词进行分割,然后扁平化rdd=rdd.flatMap(lambdax:x.split(""))#将单词包装成元组,其中key是单词,value是1rdd=rdd.map(lambdax:(x,1))#按照key来分组,计算所有value的和rdd=rdd.reduceByKey(lambdax,y:x+y)#将RDD的数据打印输出出来print(rdd.collect())sc.stop()以上我们就完成了词频统计,需要注意的是:
Driver是Spark应用程序的执行入口点,它会创建SparkContext对象。然后具体的任务是由Executor执行的,但我们看到读取文件调用的是SparkContext对象的方法,所以Executor必须要能拿到SC对象,于是Driver会对SC对象进行序列化然后发送给各个Executor。
那么问题来了,如果有多个Executor,那么在读文件数据时,多个Executor都会全量读取吗?显然不是的,它们会各自读取一部分。然后每个Executor对自己读取的部分数据进行计算,当执行reduceByKey时(涉及shuffle操作),多个Executor会传输彼此的数据,然后进行整体汇总。当reduceByKey执行完毕,数据还在Executor中,然后执行collect(),再将每个Executor的数据收集到Driver中。
所以整个Spark程序运行,可以分为以下几步:
整个过程从Driver开始,从Driver结束。所以我们不难发现,这和单机执行不同,虽然代码只有一份,但是执行会涉及多个Executor(可以位于多个节点中)。Spark会将代码进行分解,不同的部分交由不同的组件执行,而并行计算便是由Executor负责完成的。
Spark是一个典型的JVM框架,它底层用的是Scala语言,那为什么可以跑Python代码呢?肯定是Spark官方在背后做了很多工作,我们解释一下。
首先不管跑什么任务,都必须要创建Driver和Executor,前者负责执行环境入口的创建以及任务管理,后者负责执行任务。无论是跑Java、Scala还是跑Python,这两个角色都肯定是存在的。
图中的JVMDriver和JVMExecutor通讯,Driver要求Executor干活,Executor向Driver汇报,大家彼此合作。这一套对于Java任务没有任何问题,因为Spark的开发语言Scala本身就构建在JVM之上,但如果Python要混进来,是没办法直接工作的。Python虚拟机和JVM无法混在一起,不可能让JVM的Driver和Executor去执行Python代码。
因为Spark在诞生的时候,Driver和Executor就是基于JVM的,为Java平台而设计。现在要让它跑Python,该怎么办呢?总不能将Spark用Python再重新实现一遍吧。其实完全没有必要,在创建PythonDriver之后,将它翻译成JVMDriver不就行了。PythonDriver无法指挥JVMExecutor干活,因此它必须要翻译成JVMDriver才可以,而翻译的过程便是由Py4J完成的。
那么问题来了,PythonDriver可以翻译成JVMDriver,但PythonExecutor却无法翻译成JVMExecutor,该怎么办呢?于是在JVMExecutor所在的工作节点上会再启动一个pyspark守护进程,作为中转站,将JVMExecutor的工作指令发送给执行的Python进程。
相信整个过程应该很清晰了,整个过程如下:
因此在Driver端,Python代码会全部翻译好,跑的是JVMDriver。在Executor端,则没有完成翻译,而是通过pyspark中转,将JVMDriver调度JVMExecutor的指定转发给Python进程。
不难发现,在PythonOnSpark中,有两套语言在运行。Driver由JVM执行,Executor由Python虚拟机执行。
最后我们再来看看官方给的架构图和描述。
可以看到官方给的架构图要简单很多,并且描述也比较精炼,PySpark的本质就是在Spark架构的外层再包装一层API。因为Python很火,用Python开发Spark应用程序也是一个主流,但Spark官方不可能把底层源码用Python再实现一遍,所以选择了在Spark的外层再包装一层API。这种方式不仅成本低,也比较稳定,并且基于这种方式可以支持非常多的语言。
总结一下就是:如果用Java或Scala开发应用程序(本身就是为它们设计的),那么两个JVM直接玩。如果带上Python(还可以是其它语言,比如R),那么要在JVM的外面再包一层,前面的Driver走翻译,后面的Executor走Socket(中转调度)。
代码在集群上运行,并且代码只有一份,但运行是分布式的。在Spark中,非任务处理部分由Driver执行(非RDD代码),任务处理部分由Executor执行(RDD代码)。因为Driver只有一个,所以它是单机运行,而Executor可以有很多个,处理RDD要并行处理,所以Executor是分布式运行。
然后Driver端由JVM执行,Executor端由JVM做命令转发,底层还是由Python解释器进行工作。
所以一份代码,会被拆分成不同的部分,交给不同的组件执行。
然后再补充一个关键的地方,我们之前提交文件的时候,提交的都是单个文件,但如果提交的文件有依赖怎么办?关于这一点前面提到过,通过--py-files参数指定,但是里面有一些细节需要补充。
#文件名:data.pyDATA=["2020-11-12","2021-08-05","2022-11-28"]#文件名:logic.pyimportredefreformat_date(s):returnre.sub(r"(\d+)-(\d+)-(\d+)",r"\1年\2月\2日",s)#文件名:main.pyfrompysparkimportSparkContextfromdataimportDATAfromlogicimportreformat_datesc=SparkContext()rdd=sc.parallelize(DATA)rdd=rdd.map(reformat_date)print(rdd.collect())sc.stop()假设我们要提交main.py,但它依赖data.py和logic.py,那么在提交的时候如何指定呢?
spark-submit--masteryarn--name万明珠--py-filesdata.py,logic.pymain.py
通过--py-files指定依赖,多个文件之间用逗号分隔,注意:逗号和文件名之间不能有空格。
运行正常,通过--py-files指定的文件会在运行时被添加到sys.path中,从而让解释器在导入的时候能够找得到。当然了,由于我们是在main.py所在的目录中提交的,而依赖的data.py和logic.py也位于当前目录,所以此时不指定--py-files也是可以的。但如果当前目录找不到指定的依赖,那么就必须显式指定了。
指定依赖我们知道了,可要是依赖的文件非常多怎么办?难道要挨个指定吗?这种情况可以使用一种更简洁的做法,将依赖整体打包成一个zip文件或egg文件。
执行没有问题,这种做法在有大量依赖文件的时候会很方便。当然啦,.py和.zip也是可以共存的,比如我们只将logic.py打包。
[root@satori001~]#ziplogic.ziplogic.pyadding:logic.py(deflated14%)[root@satori001~]#spark-submit--masteryarn--name万明珠--py-fileslogic.zip,data.pymain.py将logic.py打包成logic.zip,然后和data.py一起指定,此时也是没有问题的。
在程序运行的过程中,可以多看看webUI。
在分布式计算中,任务会被分配到集群的不同节点上执行,而每个节点上的任务可能需要访问某些公共数据。如果这些数据在被任务访问之前都需要先独立传输,那么将会导致大量的数据冗余和网络通信开销。于是PySpark提供了两种类型的共享变量,来对数据共享和通信进行优化:
共享变量应用于分布式计算环境,负责在多个节点之间高效共享数据,下面就来看看具体细节。
广播允许程序将一个只读变量缓存在每个节点上,而不是在每个任务中重复发送该数据。广播变量使用的是一种高效的广播算法来分发数据,确保每个节点只接收一份数据副本,因此通过广播变量可以显著减少网络通信量和计算节点之间的数据传输,从而提高分布式计算的效率。
我们举个例子,来说明这一点。
frompprintimportpprintfrompysparkimportSparkContextsc=SparkContext()#每个元组对应学生的ID和姓名student_info=[(1,"古明地觉"),(2,"芙兰朵露"),(3,"琪露诺"),]#每个元组对应学生的ID、科目和成绩student_score=[(1,"语文",90),(1,"数学",92),(1,"英语",95),(2,"语文",85),(2,"数学",97),(2,"英语",91),(3,"语文",100),(3,"数学",9),(3,"英语",100),]#现在如果将student_score里面的ID换成学生的姓名该怎么办呢?rdd=sc.parallelize(student_score)rdd=rdd.map(lambdax:(dict(student_info)[x[0]],x[1],x[2]))pprint(rdd.collect())sc.stop()文件名为main.py,我们提交一下,看看是否正常运行。
运行是没问题的,然后我们观察一下上面的代码,如果是本地运行那没什么好说的,但如果是提交到Spark上分布式运行就不一样了。我们说非RDD代码是交由Driver运行的,所以student_info是Driver创建的一个本地列表,构建在Driver内部。但问题来了,Executor在运行RDD代码的时候依赖student_info,而它内部显然是没有的,因为student_info在Driver里面,它和Executor不在同一个地方,该怎么办呢?
对于Spark而言这个问题很简单,如果Executor需要,那么Driver将对象序列化之后再通过网络发过去就行了。
需要注意:Executor内部会启动线程池,多个Task可以并行运行在Executor中。如果分区数量大,导致Task非常多,那么也会分布在多个Executor中。我们就假设有4个Task,分别位于两个Executor中。然后Driver发送student_info,它会以Task为单位,所以student_info会被发送4次。
但这就有问题了,我们知道进程内的数据是可以被多个线程共享的,也就是说Executor进程只要收到一份数据,它内部的多个线程就都能拿到,完全没必要给每个线程都发一份。所以student_info只需要发送两次就可以了,而没必要发送4次,从而导致内存浪费以及额外的网络IO。
怎么解决呢?答案就是广播变量,在创建的时候告诉Spark一声,这是一个被Executor内部的多个线程共享的广播变量。当Executor内部的线程找Driver要变量时,Driver会先看自己之前有没有发给该Executor内部的其它线程,如果发过,那么就不发了。然后告诉该线程,自己已经发过了,你去找别的线程要吧。
就好比给班级发锦旗,这个锦旗是班级所有同学共享的。A班的同学做了好事,那么给A班发个锦旗,但如果A班的同学又做了好事,那么锦旗就不会再发了。同样的道理,当广播变量被发送之后,就会被Executor缓存起来,线程用的话直接用就行,Driver不会二次发送。
然后我们看看广播变量如何创建:
frompprintimportpprintfrompysparkimportSparkContextsc=SparkContext()#创建广播变量student_info=sc.broadcast([(1,"古明地觉"),(2,"芙兰朵露"),(3,"琪露诺"),])student_score=[(1,"语文",90),(1,"数学",92),(1,"英语",95),(2,"语文",85),(2,"数学",97),(2,"英语",91),(3,"语文",100),(3,"数学",9),(3,"英语",100),]#调用广播变量的value属性,可以将值取出来rdd=sc.parallelize(student_score)rdd=rdd.map(lambdax:(dict(student_info.value)[x[0]],x[1],x[2]))pprint(rdd.collect())sc.stop()我们说过,一份代码会被拆分成不同的部分,非RDD部分交给Driver执行,RDD部分交给Executor分布式执行。现在Driver创建了一个广播变量,它会将广播变量发给多个Executor,但一个Executor则只发送一次。当线程需要使用时,直接获取广播变量的value属性即可。
因为普通变量不管谁来获取,Driver都会直接发一份。但如果是广播变量,它会生成一个标记,保证同一个Executor只发送一次。
到这里估计有人会产生一个疑问,既然本地变量无法共享,那用RDD不也行吗。比如上面的student_info,将它包装成一个RDD,这样不就没问题了吗,不仅没有内存浪费,而且还是分布式运行的。表面上看起来是这样,但其实这么做反而会带来执行性能的降低。因为当student_info作为RDD分散在不同节点上,每个节点只存储了部分数据,为避免找不到key,需要使用join算子将两个分布式的RDD关联起来,而这必然会带来shuffle。
我们一直提到shuffle,目前只需要知道它是一个比较昂贵的操作,涉及到数据的传输,一会儿单独介绍它。
我们再举个例子解释一下:
假设两个RDD的分区数都是三,数据分布如图所示,在对student_score做map的时候,需要将student_info的每个分区的数据都发送过去,这里面涉及了9次数据交换(网络传输)。当然我们这里只是举例,数据量还很小,如果是上T的数据量,那么shuffle的影响就会立即凸显出来。
总结:当数据量不大(比如几百条、几千条、几万条)时,建议使用广播变量。当数据量很大时,建议使用分布式RDD。
说完了广播变量,再来看看累加器。累加器提供了一种跨任务累加数据的方式,主要用于计数或求和等操作。累加器的更新只在各节点的任务中进行,只有Driver可以读取累加器的值。
我们举例说明:
frompysparkimportSparkContextsc=SparkContext()#两个分区rdd=sc.parallelize(range(1,11),2)#定义一个全局变量count=0#定义一个map函数defmap_func(x):globalcountcount+=1print(f"count={count}")returnf"甜狗{x}号"rdd=rdd.map(map_func)print(rdd.collect())print(f"finalcount={count}")你觉得执行这段代码会输出什么结果呢?首先说一句,这段代码是可以直接在本地执行的(python3xxx.py),只要本地有Java环境。这种做法和启动pysparkshell的原理是类似的,一般用于学习或者做测试用,生产上我们肯定还是提交到Spark集群上面的。
我们发现一些有意思的地方,函数里面的print(count)并没有依次输出1~10,而是将1~5输出了两次。因为map_func是在Executor里面执行的,而count变量是定义在Driver里面的。由于RDD有两个分区,那么会有两个线程在执行,Driver会分别把count发给两个线程。每个线程map五次,最终两次打印1~5,因为这两个count是独立的。
不仅如此,发送给Executor内部线程的count和Driver里的count也是独立的。不管Executor内部对count做了什么操作,都不影响外层Driver的count,因此最终打印了0。
可能有人觉得这是因为整数是不可变对象导致的,如果换成列表呢?答案是结论不变。不管是可变对象还是不可变对象,表现都是一致的。因为多个Task可能会位于不同的进程当中,所以Driver会将整个对象序列化之后发送过去,而不是简简单单增加一个引用计数。
因此当我们想累加数据的时候,普通的写法是行不通的,而是要使用累加器。
frompysparkimportSparkContextsc=SparkContext()rdd=sc.parallelize(range(1,11),2)#这里不再使用普通变量,而是使用累加器变量count=sc.accumulator(0)#定义一个map函数defmap_func(x):globalcountcount+=1print(f"count={count}")returnf"甜狗{x}号"rdd=rdd.map(map_func)print(rdd.collect())print(f"finalcount={count}")在任务里面给累加器变量加1的时候,Driver里的累加器变量会同步进行修改。
这就是累加器的魔力,可以在分布式的场景下完成数据的累加。如果你想拿到累加器里面的值,那么获取它的value属性即可。
但还存在一个问题,为啥Driver里面的累加器的值是10,而两个任务还是输出的1~5呢?难道整体不应该输出1~10吗?其实这和累加器的设计有关,累加器只能在Executor上执行累加操作,在Driver上执行读取操作。这种设计是为了优化分布式计算的性能,减少网络通信的开销。
当Driver将累加器传递给两个不同的任务,每个任务分别执行5次自增操作时,这些自增操作是并行执行的。每个任务对累加器的修改是局部的,直到任务完成后,这些修改才会被合并到Driver的累加器中。因此,虽然每个任务都会将累加器从0增加到5,但这是在它们各自的执行上下文中发生的,而这些局部的累加值会在任务结束时合并到Driver的累加器中,最终得到总和为10。
所以我们可以得出结论:累加器的更新(在Executor上)和读取(在Driver上)是分离的,任务中看到的累加器值不是全局累加器值,而是该任务在执行过程中对累加器进行操作的局部视图。每个任务的累加器操作是独立的,它们不会看到其它任务对累加器的修改。所有任务完成后,各自对累加器的修改会被汇总到Driver的累加器变量里。
但是在使用累加器的时候,有一个陷阱需要注意。
frompysparkimportSparkContextsc=SparkContext()rdd1=sc.parallelize(range(1,11),2)count=sc.accumulator(0)defmap_func(x):globalcountcount+=1rdd2=rdd1.map(map_func)rdd2.collect()print(count)#10rdd3=rdd2.map(lambdax:x)print(count)#10rdd3.collect()print(count)#20我们看到调用rdd3.collect()之后,count的值变成了20,这是怎么回事?很简单,当调用rdd2.collect()之后数据就已经被收集起来了,那么在RDD链条上,rdd2以及它之前的RDD数据就不存在了,因为RDD是过程数据。
随后调用rdd3.collect()表示又要收集数据,但之前的rdd2数据已经不存在了。于是Spark会根据血缘关系,从头开始计算,rdd1=>rdd2=>rdd3,而在这个过程中,显然又执行了一次rdd1.map(map_func),所以累加器的值变成了20。
因此要理解RDD是过程数据这一概念,假设有如下RDD。
rdd1=sc.parallelize(...)rdd2=rdd1.map(...)rdd3=rdd2.map(...)rdd4=rdd3.map(...)#调用rdd4.collect()的时候,会真正开始计算#rdd1=>rdd2=>rdd3=>rdd4rdd4.collect()#但在调用rdd4.collect()之后,整个RDD链条上的数据就没了rdd5=rdd3.map(...)#Spark会找到依赖关系,得出rdd1=>rdd2=>rdd3=>rdd5#于是会从rdd1生成rdd2开始再执行一遍rdd5.collect()所以RDD是过程数据,transformation算子也只是在构建执行计划,遇见action算子之后才开始执行。在执行的过程中生成的RDD不会单独保存数据,如果要调用action算子收集或统计数据,那么只能根据血缘关系从头再来一遍。
frompysparkimportSparkContextsc=SparkContext()rdd1=sc.parallelize([1])rdd2=rdd1.map(lambdax:print("a"))rdd3=rdd2.map(lambdax:print("b"))rdd4=rdd3.map(lambdax:print("c"))#rdd1=>rdd2=>rdd3=>rdd4rdd4.collect()"""abc"""#rdd1=>rdd2=>rdd3=>rdd5rdd5=rdd3.map(lambdax:print("d"))rdd5.collect()"""abd"""但如果不想重头再来怎么办呢?还记得RDD的缓存吗,我们可以cache或persist一下。比如调用rdd3.cache(),但要在rdd4.collect()之前调用,那么后续在调用rdd5.collect()的时候,就只会打印出d,而不是abd。
接下来聊一聊关于Spark内核的问题,这一部分有助于你对Spark运行原理的理解,而且在面试中也是经常会被问到的。
下面我们来逐一介绍。
Spark的核心是基于RDD实现的,而SparkScheduler便是其核心实现的重要一环,作用是任务调度。Scheduler会根据RDD之间的依赖关系构建DAG,基于DAG划分Stage,然后将Stage中的任务发送到指定节点运行。基于Spark的任务调度原理,可以合理规划资源利用,做到尽可能用最少的资源去高效地完成任务计算。
我们说每调用一次transformation算子都会生成一个新的RDD,整个过程相当于在构建执行计划,而这个执行计划便叫做DAG(有向无环图)。
DAG(有向无环图)从字面意思上来理解,就是有方向并且没有形成闭环的执行流程图。
以上便是一个DAG流程图,它标记了执行方向,并且没有形成闭环。
DAG的作用就是标识代码逻辑的执行流程,指示程序每一步应该做什么。另外通过4040端口可以进入webUI,查看任务执行的详细信息,而页面中便包含了相应的DAG。我们举个例子:
importtimefrompysparkimportSparkContextsc=SparkContext()rdd1=sc.parallelize(range(10))rdd2=rdd1.map(lambdax:x+1)rdd3=rdd2.filter(lambdax:x%2==0)rdd4=rdd3.map(lambdax:x*5)rdd4.collect()time.sleep(60*5)这里需要单独说明一下,为什么程序的结尾要多一个time.sleep。首先SparkContext对象在创建时,会启动一个webUI,端口默认是4040。如果在同一节点上创建了多个SparkContext对象,那么相应的webUI端口会加1,比如第二个SparkContetx对象的webUI端口是4041,第三个则是4042,依次类推。当然同一个应用程序,应该保证只有一个SparkContext对象。
那么问题的关键来了,既然SparkContext对象在创建时会启动webUI,那如果它被销毁了,这个webUI是不是就关闭了。答案确实如此,因此这里使用time.sleep让程序不结束,保证SparkContext对象在内存中存活。
显然这种做法是比较low的,而且使用4040只能查看当前执行的任务,之前已完成的任务就看不到了。因此Spark还支持我们配置历史服务器,所有执行完毕的任务都会被单独保存起来。这样我们就不用使用time.sleep了,等任务执行完毕后直接去历史服务器查看即可,至于历史服务器如何配置,我们稍后再说。
执行程序之后,访问webUI。
以上就是Spark为我们提供的可视化DAG流程图,但我们好像并没有看到map和filter这些算子。原因很简单,因为生成RDD之后的map和filter操作都是窄依赖(一会详细解释),这意味着没有shuffle发生,所以它们没有显示。
SparkwebUI给出的DAG没有体现出分区,但我们要知道DAG中的操作都是在每个分区上并行执行的。
然后再来看看Job和action算子之间的关系,前面说了,一个应用程序可以被划分为多个Job,那么这个划分是基于什么呢?我们知道调用transformation算子相当于在构建执行计划,而action算子则是开关,调用action算子时,整个RDD链条会开始执行,此时就会产生一个Job。
总结:一次action算子调用,会产生一个Job,而每个Job会对应各自的DAG。
importtimefrompysparkimportSparkContextsc=SparkContext()rdd1=sc.parallelize([1,2,3,4,5])rdd2=rdd1.map(lambdax:x+1)#对应一个Jobrdd2.collect()rdd3=rdd2.map(lambdax:x+1)#对应一个Jobrdd3.collect()rdd4=rdd3.map(lambdax:x+1)#对应一个Jobrdd4.collect()time.sleep(60*5)代码中出现了3个action算子,那么应该会划分三个Job,我们提交到集群中,然后查看webUI看看是不是这样。
我们看到有三个Job,每个Job由action算子触发,webUI中也标识了每个action算子位于哪一行,点击之后便可查看对应的DAG。所以DAG是绑定在Job上面的,它是RDD的逻辑执行图,为了构建物理上的Spark详细执行计划而生。
RDD五大特性之一便是具有依赖关系(血缘关系),但依赖关系也分两种:窄依赖和宽依赖。
先来看看窄依赖:
像map、filter、flatMap、union等等,它们都是窄依赖。窄依赖的特点是父RDD的一个分区,最多发给子RDD的一个分区,所以窄依赖是可以像流水线一样,一直往下走,过程非常简单。并且窄依赖始终在内存中进行,如果是MapReduce,那么每一步都需要先落盘。
而对于宽依赖,父RDD的一个分区,会发送给子RDD的多个分区,也就是父RDD的一个分区会被子RDD的多个分区所使用。
不难看出,如果是窄依赖,那么子RDD在分区数据丢失之后,直接根据父RDD对应的分区进行计算即可。如果是宽依赖,那么子RDD在分区数据丢失之后,再根据父RDD重新计算则是一件比较麻烦的事情,因为它来自父RDD的多个分区,会涉及到shuffle操作。那什么是shuffle呢?
shuffle是Spark用于重新分配数据的一种机制,以便对不同partition里面的数据进行分组。比如reduceByKey,它会将所有key相同的value都组合在一起,但由于RDD有多个分区,那么在组合的过程中肯定要发生数据交换,这会涉及到数据序列化、网络IO等等,因此shuffle是一个比较昂贵的操作。
我们最后再用一张图,来展示一下窄依赖、宽依赖、以及shuffle操作。
reduceByKey在汇总数据时,会先在每个分区内部汇总,然后整体再全局汇总。
整个过程还是比较好理解的,然后再来说说Stage的划分。对于Spark来说,会根据DAG中的宽依赖,划分出不同的阶段(Stage)。方式是从后向前,遇到一个宽依赖便划分出一个阶段,称为Stage。从上图中可以看到,基于宽依赖(或者说shuffle),将DAG(Job)划分成了两个阶段,而在每个阶段内部一定都是窄依赖。
因此一个Stage的边界往往是从某个地方取数据开始,到shuffle结束。
我们实际测试一下:
importtimefrompysparkimportSparkContextsc=SparkContext()#补充:如果是local模式执行,那么读取本地文件和HDFS文件都行#但如果是提交到集群(Standalone或YARN),那么一定要使用HDFSrdd1=sc.textFile("hdfs://satori001:9000/text.txt")rdd2=rdd1.flatMap(lambdax:x.split(""))rdd3=rdd2.map(lambdax:(x,1))rdd4=rdd3.reduceByKey(lambdax,y:x+y)rdd4.saveAsTextFile("hdfs://satori001:9000/result")time.sleep(60*5)我们提交到集群,看一下DAG:
因为存在一个宽依赖(shuffle),所以Job被划分成了两个Stage。
在介绍内存迭代计算之前,先来补充一下前面提到的历史服务器。SparkContext对象在创建时会启动一个webUI,上面展示了很多有用的信息,其中包括:
但当程序结束,SparkContext对象被销毁,这个webUI就看不到了,所以我们需要配置历史服务器。首先在Spark配置文件目录中有一个spark-defaults.conf.template,我们拷贝一份。
cpspark-defaults.conf.templatespark-defaults.conf然后打开文件、设置参数。
#启用事件日志记录功能spark.eventLog.enabledtrue#日志的存储路径,但该路径不会自动创建#如果HDFS上没有spark_log,那么会报错,因此需要事先创建spark.eventLog.dirhdfs://satori001:9000/spark_log#表示历史服务器从哪个目录中获取已完成任务的事件日志spark.history.fs.logDirectoryhdfs://satori001:9000/spark_log#历史服务器的端口号,默认是18080spark.history.ui.port18080#设置日志清除周期spark.history.fs.cleaner.enabledtruespark.history.fs.cleaner.interval1dspark.history.fs.cleaner.maxAge7d然后别忘记在HDFS上创建相应的目录:hdfsdfs-mkdir/spark_log,创建之后启动历史服务器:start-history-server.sh(位于$SPARK_HOME的sbin目录中)。
历史服务器启动之后,会监听18080端口,不过在查看之前,先随便跑两个任务。
上面显示日志记录在HDFS文件系统的/spark_log目录中,如果你的作业是凌晨跑的,那么第二天上班的时候也能看。我们注意到在左下角还有showincompleteapplications,显示未完成的任务,因此即便任务挂掉了也是可以看到信息的,这就很方便了。
如果停止历史服务器,可以使用stop-history-server.sh。
好了,回归正题,我们来说内存迭代计算。
我们一直说Executor内部会启动线程池并行执行,而这个并行便体现在分区上。比如RDD有三个分区,每个分区对应一个Task线程,那么在单个Stage中,一个分区的所有transformation算子都由一个线程来完成(保证每个分区的迭代计算都是基于内存)。比如:
假设RDD的分区数为N,那么在一个Stage里面会有N条流水线,每一条都是纯内存计算。比如上图,三个Task线程便形成了三个并行的计算管道。但是当出现shuffle时,会产生数据传输,这是不可避免的。
Spark默认受到全局并行度(一会儿说)的限制,除非个别算子有特殊分区的情况,大部分的算子都会遵循全局并行度的要求,来规划分区数。比如全局并行度是3,那么大部分算子的分区就是3。总之在Spark中我们只推荐设置全局并行度,不要在算子上再设置并行度,否则很容易产生shuffle。而一旦产生shuffle,那么就相当于在基于内存的计算管道上面切了一刀,导致部分计算不能走纯内存,从而影响性能。当然一些排序算子除外,计算算子让它默认选择分区数就行了。
现在如果问你为什么Spark比MapReduce快,相信你一定知道原因,可以从以下两个方面回答:
我们举例说明,首先是在配置文件中设置。
#修改spark-defaults.confspark.default.parallelism100在客户端参数中:
spark-submit--masteryarn--conf"spark.default.parallelism=100"在代码中设置:
conf=SparkConf()conf.set("spark.default.parallelism",100)当然啦,我们前面在通过sc.parallelize()和sc.textFile()创建RDD的时候,也可以指定并行度,或者说分区数(优先级最高)。如果没有指定,那么会读取spark.default.parallelism参数,因此该参数指定的并行度是全局的,也叫全局并行度。
如果都没有指定,那么Spark默认会设置一个合适的并行度。
Spark推荐设置全局并行度,假设并行度为3,那么创建的RDD就会被划分为3个分区。但是我们不要在程序运行过程中擅自修改RDD的分区(比如调用repartition算子),因为会影响内存迭代管道的构建,以及产生额外的shuffle。
那么在集群中,应该如何规划并行度呢?结论:并行度应该设置为CPU总核心数的2~10倍,比如集群的可用CPU核心是100个,那么并行度应该设置为200~1000,下面解释一下原因。
Spark的任务由Driver进行调度,流程如下:
前4步都是Driver的工作,最后一步是Worker的工作。然后在里面出现了DAGScheduler和TaskScheduler,它们是做什么的呢?
画张图解释一下:
图中的DAG基于shuffle被划分为两个阶段,保证每个阶段内部的操作都是窄依赖的,不会再出现shuffle。然后RDD是有分区的,针对每个分区会在Executor的线程池中启动一个子线程,然后在同一个Stage内部,每个分区上的所有transformation算子都会由同一个线程执行,保证每个分区的迭代计算都是基于内存的,即内存迭代计算。至于不同分区,则会交给不同的线程,这些线程组成了并行计算管道,每个管道负责各自分区的数据迭代与转换。
所以从图中可以看到,在一个Stage内部,一个分区上的所有转换操作(内存迭代计算)便构成了一个Task。由于图中的RDD是三分区,DAG被划分为两个Stage,因此总共会有6个Task,每个Task由单独的线程执行。
而以上我们说的这些便是DAGScheduler要做的事情,它要基于DAG以及宽窄依赖,计算出需要分配几个Task,每个Task都负责哪些工作,以及Task之间如何交互。
DAGScheduler在划分Task的时候,不会考虑Executor。换句话说,不管你启动时通过--num-executors参数指定多少个Executor,对于同一份DAG,划分的结果不变。
等到DAGScheduler将Task划分好,接力棒便会交给TaskScheduler,它会根据已有的Executor数量、可用资源、负载等,将Task调度到合适的Executor上,并对其进行监控。注意:这个调度并不是随随便便调度,我们知道当出现shuffle的时候,那么Task之间是需要数据交互的。显然对于需要数据交互的Task,它们应该被调度到同一个Executor中,这样便可以走内存,而不是网络IO。
一般来说,Executor的数量应该和WorkerNode的数量保持一致,也就是一个工作节点上部署一个Executor即可。
所以DAG调度器就类似于老板,负责根据计划指定任务。Task调度器类似于总监,负责将任务分配给具体的人,并定期询问进度。
接下来聊一聊SparkSQL,从名字上来看显然是让我们像写SQL一样去编写Spark应用程序。但Saprk并不仅仅是SQL,SQL只是Spark提供的功能之一。
想想Hive,它们存在的意义都是类似的。如果使用MapReduce编程的话,需要会Java;使用Spark编程的话,虽然简单,但也需要你会Scala、Java、Python等编程语言中的一种。而SQL则是真的"老少咸宜",并且它已经成为了事实上的一个标准,如果一款框架能让你像写SQL一样编写程序的话,那么它一定是非常受欢迎的,就类似于Hive一样。
目前我们已经知道为什么要有SQL了,而在大数据领域可以基于SQL的框架还有其它的,这些框架我们也称之为SQLonHadoop,因为数据存储在Hadoop的HDFS之上,并且支持SQL。
ApacheHive
那么Hive都有哪些功能呢?
ClouheraImpala
这个是Cloudera公司开发的,很多公司都是采购他们的CDH。对于impala而言,也是使用SQL,只不过它不是把SQL运行在MapReduce之上,而是使用了自己的守护进程。一般情况下,这些进程显然要和DataNode安装在同一节点上,因为要读数据。
Impala特点如下:
Spark的一个子模块,让SQL跑在Spark引擎上,这也是我们即将介绍的。
Presto
一个基于SQL的交互式查询引擎,可以和Hive共享元数据信息,但它主要是提供了一些连接器,通过这些链接器,可以查询Hive、Cassandra等框架里面的数据。
Phoenix
HBase的数据主要基于API来查询,这个过程还是比较费劲的,而Phoenix支持使用SQL来查询HBase的数据。
Drill
支持HDFS、Hive、SparkSQL等多种后端存储,并进行数据处理。
关于SparkSQL,有很多人认为它就是一个单纯的SQL处理框架,这是一个典型的误区。SparkSQL的官方定义是,一个用于处理海量结构化数据的Spark子模块,特点如下:
SparkSQL的应用不局限于SQL,还支持Hive、JSON、Parquet文件的直接读取以及操作,SQL仅仅是SparkSQL的功能之一而已。
Hive和SparkSQL都属于分布式SQL计算引擎,是构建大规模结构化数据计算的绝佳利器,那么这两者的差异体现在哪呢?
SparkSQL的使用率一直处在领先的位置,但因为历史原因,Hive仍占有一席之地。
我们之前处理数据是通过RDD实现的,而数据抽象除了RDD还有DataFrame,当执行SparkSQL时就会返回一个DataFrame。
SparkSQL执行时返回的DataFrame和Pandas的DataFrame在结构上是一致的,区别就是SparkDataFrame是分布式的,而PandasDataFrame是单机的。当然,Spark当中其实有三种数据抽象:
DataFrame和DataSet,最终还是要被翻译成RDD,所以RDD是Spark的核心。实际上,RDD支持的操作已经不少了,但生产中我们还是很少直接使用RDD进行编程,而是使用DataFrame和DataSet。
因此用Python开发Spark程序,我们会用到两种结构,分别是RDD和DataFrame。这两种结构的底层逻辑是相似的,因为DataFrame本身就基于RDD实现,只是DataFrame更专注于二维结构化数据。
所以如果你的数据是二维表结构,那么推荐使用DataFrame,当然RDD也是可以的,只是肯定没有使用DataFrame方便。那么问题来了,对于同一份数据,使用RDD存储和使用DataFrame存储有什么差异呢?
DataFrame只能按照二维表的格式存储数据,而RDD则是存储对象本身(格式可以有多种,一般是列表)。
那么问题来了,DataFrame和RDD,哪一种结构更适合SQL呢?其实答案前面已经给出了,因为SparkSQL执行之后返回的是DataFrame,那么肯定是DataFrame更适合SQL。至于原因也很简单,SQL操作的数据是二维表结构,和DataFrame是一致的,毕竟SQL的含义就是结构化查询语言。而RDD存储的是对象,除了列表,还可以是字符串,甚至是自定义类的实例,这些SQL是无法处理的。
在介绍RDD时,我们说程序的入口对象是SparkContext,但Spark在2.0版本推出了SparkSession,作为Spark编码的统一入口对象。基于SparkSession,可以做到以下两点:
所以后续代码的执行环境入口对象,统一使用SparkSession。
frompyspark.sqlimportSparkSessionsession=SparkSession.builder.\master("yarn").\appName("万明珠").\config("spark.sql.shuffle.partitions",4)\.getOrCreate()以上我们就创建了SparkSession对象,其中master方法用于指定运行模式,appName方法用于指定应用程序名称,它们都调用了config方法。
事实上我们提交任务时也是如此:
#以下两种方式是等价的spark-submit--masteryarn--name万明珠main.pyspark-submit--confspark.master=yarn--confspark.app.name=万明珠main.py所有的配置都通过--conf指定,但为了简便,Spark也提供了单独的参数,比如--master和--name。关于提交任务这里多补充了一下,并且记得之前说过,运行模式和应用名称不建议在代码中硬编码。
frompyspark.sqlimportSparkSession#所以直接调用getOrCreate创建即可session=SparkSession.builder.getOrCreate()print(session)"""
显然是有的,名字叫spark,并且通过sparkContext属性可以拿到SparkContext对象。然后SparkSession作为SparkSQL编程的入口对象,显然SQL语句就是由它来执行的,但是SparkSession对象不仅可以执行SQL,还可以读取各类文件,返回DataFrame。
所以SparkSQL不仅仅是SQL,官方将它定义为Spark的一个子模块,除了SQL还可以处理各种文件。
下面就来用SparkSession对象读取数据源,创建DataFrame。关于数据源,支持的种类非常多,比如本地、HDFS、亚马逊S3、阿里OSS、腾讯COS、RDBMS等,而数据的载体可以是文本、JSON、Parquet、JDBC等。当数据被读取进来之后,会得到DataFrame,然后对它进行操作即可。
我们举例说明,目前在HDFS上有一个文本文件data.csv,内容如下:
17,female,古明地觉400,female,四方茉莉18,female,椎名真白我们将它读取出来,得到DataFrame。
frompyspark.sqlimportSparkSessionsession=SparkSession.builder.getOrCreate()df=session.read.csv("hdfs://satori001:9000//data.csv",sep=",",#单元格分隔符lineSep="\n",#行分隔符header=False,#CSV是否存在表头)#然后给DataFrame指定一个列名df=df.toDF("年龄","性别","姓名")#打印df的结构df.printSchema()#打印df的内容df.show()提交到YARN上运行。
结果没有问题,并且DataFrame还可以注册成一个临时的视图,从而支持SQL。
为了方便查看结果,我们使用pysparkshell。
>>>df=spark.read.csv("hdfs://satori001:9000//data.csv",sep=",",header=False)>>>df=df.toDF("年龄","性别","姓名")#将结果集注册成一个临时视图>>>df.createTempView("girls")#通过SQL进行查询>>>spark.sql("SELECT*FROMgirlsWHERE`年龄`>=18").show()+----+------+--------+|年龄|性别|姓名|+----+------+--------+|400|female|四方茉莉||18|female|椎名真白|+----+------+--------+是不是很方便呢?如果你熟悉SQL的话,那么将结果集注册成一个临时视图,然后编写SQL查询即可。当然如果不喜欢SQL风格,那么还有DSL风格,直接对DataFrame本身做操作。
非常方便,至于选择哪种方式看你自身喜好。然后关于DataFrame更详细的操作,一会儿介绍DataFrame的时候再说,目前我们只需要知道SparkSession对象可以读取很多的数据源,这些数据在读取进来之后会得到DataFrame。
数据处理分为三步:读取数据,处理数据,导出数据,而对于SparkSQL来说,也是如此。
下面我们就围绕着这三步,来好好地聊一聊。
DataFrame是一个二维表结构,任何一个二维表都有绕不开的三个属性:行、列、表结构描述。正如MySQL的表一样,表由许多行组成、表具有多个列、表也有相应的结构信息(列、列名、类型、约束等)。基于这个前提,DataFrame的组成如下:
在结构层面
在数据层面
在表结构层面,DataFrame由StructType描述:
frompyspark.sql.typesimportStructType,StructField,IntegerType,StringTypestruct_type=StructType([#可以传递三个参数:列名、列的类型,是否允许为空(默认允许)StructField("id",IntegerType(),False),StructField("name",StringType(),True),StructField("age",IntegerType(),True),])#也可以单独添加,由于add方法会返回self,因此可以链式调用#比如struct_type=StructType().add(...).add(...)struct_type.add("address",StringType())#查看已有字段print(struct_type.names)"""['id','name','age','address']"""一个StructField负责描述一个列,描述信息包含:列名、列的类型、列是否允许为空,由于二维表可以有多个列,那么就会有多个StructField,而多个StructField便组成了StructType。然后一行数据在代码中被表示为一个Row对象,一列数据被表示为一个Column对象(同时也包含列的信息)。
说完了DataFrame的结构,我们来创建DataFrame,它的创建方式还是比较多的。除了用SparkSession读取数据源的方式创建之外,还有其它方式,我们逐一介绍。
DataFrame对象可以从RDD转化而来,因为都是分布式数据集,只需要将结构转换一下即可。
RDD的每个元素都是一个列表,对应DataFrame的一行,然后手动指定列的名称,至于类型会基于RDD进行推断,这里显然都是字符串。事实上,DataFrame也可以直接基于列表来创建,和Pandas是类似的。
frompyspark.sqlimportSparkSessionimportpandasaspddata=[[1,2,3],[4,5,6],[7,8,9]]session=SparkSession.builder.getOrCreate()df1=session.createDataFrame(data,schema=["a","b","c"])df2=pd.DataFrame(data,columns=["a","b","c"])df1.show()"""+---+---+---+|a|b|c|+---+---+---+|1|2|3||4|5|6||7|8|9|+---+---+---+"""print(df2)"""abc012314562789"""然后注意里面的schema参数,除了传一个列表,还可以传StructType对象。
frompyspark.sqlimportSparkSessionfrompyspark.sql.typesimportStructType,StructField,IntegerType,StringTypesession=SparkSession.builder.getOrCreate()sc=session.sparkContextrdd=sc.textFile("hdfs://satori001:9000//data.csv")rdd=rdd.map(lambdax:x.split(","))struct_type=StructType([StructField("年龄",StringType()),StructField("性别",StringType()),StructField("姓名",StringType()),])df=session.createDataFrame(rdd,schema=struct_type)df.show()这样也是可以的,但是要保证手动指定的类型和RDD内部数据的类型是一致的。比如年龄,RDD内部是以字符串的形式存储的,所以这里也必须要指定为StringType。
最后RDD还有一个toDF方法,用于生成DataFrame。
frompyspark.sqlimportSparkSessionsession=SparkSession.builder.getOrCreate()sc=session.sparkContextrdd=sc.textFile("hdfs://satori001:9000//data.csv")rdd=rdd.map(lambdax:x.split(","))#也可以传递StructType对象,但要保证类型是一致的df=rdd.toDF(["年龄","性别","姓名"])df.show()"""+----+------+--------+|年龄|性别|姓名|+----+------+--------+|17|female|古明地觉||400|female|四方茉莉||18|female|椎名真白|+----+------+--------+"""以上就是基于RDD创建DataFrame的几种方式。
我们还可以将Pandas的DataFrame转成Spark的DataFrame。
frompyspark.sqlimportSparkSessionimportpandasaspdpandas_df=pd.DataFrame({"name":["satori","koishi","marisa"],"age":[17,16,18]})session=SparkSession.builder.getOrCreate()spark_df=session.createDataFrame(pandas_df)spark_df.printSchema()"""root|--name:string(nullable=true)|--age:long(nullable=true)"""spark_df.show()"""+------+---+|name|age|+------+---+|satori|17||koishi|16||marisa|18|+------+---+"""比较简单,也是通过createDataFrame方法。
SparkSession还可以读取数据源,支持不同模式,比如text。
frompyspark.sqlimportSparkSessionsession=SparkSession.builder.getOrCreate()#.txt和.csv本质上都是纯文本#如果以text模式读取,那么得到的DataFrame只会有一列df=session.read.text("hdfs://satori001:9000//data.csv")df=df.toDF("data")df.show()"""+-------------------+|data|+-------------------+|17,female,古明地觉||400,female,四方茉莉||18,female,椎名真白|+-------------------+"""因此我们很少会用text模式读取。
再来看看csv模式。
frompyspark.sqlimportSparkSessionsession=SparkSession.builder.getOrCreate()df=session.read.csv("hdfs://satori001:9000//data.csv",sep=",",#单元格分隔符lineSep="\n",#行分隔符header=False,#没有表头)df.show()"""+---+------+--------+|_c0|_c1|_c2|+---+------+--------+|17|female|古明地觉||400|female|四方茉莉||18|female|椎名真白|+---+------+--------+"""header参数默认为None,如果不指定或者指定为False,那么会自动生成表头,然后可以调用toDF方法将表头替换掉。如果header参数指定为True,那么会将第一行作为表头。当然csv方法还有很多其它参数,可以点进源码中查看。
再来看看json模式,在HDFS上有一个data.json,内容如下。
{"name":"satori","age":17}{"name":"koishi","age":16}{"name":"marisa","age":18}然后我们以json模式读取它。
frompyspark.sqlimportSparkSessionsession=SparkSession.builder.getOrCreate()df=session.read.json("hdfs://satori001:9000//data.json")df.show()"""+---+------+|age|name|+---+------+|17|satori||16|koishi||18|marisa|+---+------+"""以上就是json模式。
关于读取文件就说到这里,当然还有很多其它方法,可以通过源码查看。
前面说了,DataFrame支持两种风格进行编程,分别是SQL风格和DSL风格。
再来回顾一下使用SQL编程。
frompyspark.sqlimportSparkSessionsession=SparkSession.builder.getOrCreate()df=session.createDataFrame([["satori",17],["koishi",16],["marisa",19],["scarlet",400]],schema=["name","age"])#将df注册成一个临时视图#注意:createTempView方法要求视图不能存在,否则报错#如果你希望视图不存在则创建,存在则替换,那么可以使用createOrReplaceTempViewdf.createTempView("girl")#除了临时视图之外,还可以注册成全局视图,通过createGlobalTempView和createOrReplaceGlobalTempView方法#全局视图可以跨SparkSession存在,而临时视图只能作用于当前的SparkSession,不过一般情况下我们只会创建一个SparkSession#使用SQL编程session.sql("SELECTUPPER(name)ASupper_nameFROMgirlWHEREage>18").show()"""+----------+|upper_name|+----------+|MARISA||SCARLET|+----------+"""如果你很熟悉SQL,那么基于SQL编程是完全没问题的,即使是像窗口函数这种复杂的语法也是完全支持的。如果不喜欢SQL,也可以通过DSL编程。
frompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimportuppersession=SparkSession.builder.getOrCreate()df=session.createDataFrame([["satori",17],["koishi",16],["marisa",19],["scarlet",400]],schema=["name","age"])#df[df["age"]>18]等价于df.where(df["age"]>18)或者df.filter(df["age"]>18)df[df["age"]>18].select(upper(df["name"]).alias("upper_name")).show()"""+----------+|upper_name|+----------+|MARISA||SCARLET|+----------+"""#选择1条df.where(df["age"]>18).limit(1).show()"""+------+---+|name|age|+------+---+|marisa|19|+------+---+"""还是很简单的,整体操作和SQL比较相似。
df.where(...).select(...).groupby(...).sort(...).limit(...).offset(...)这些方法可以链式调用,每一步都会返回一个DataFrame。然后pyspark.sql.functions里面还提供了大量的函数,支持我们对DataFrame的一整列进行操作,并返回Column对象。总之如果你熟悉Pandas的话,那么用PySparkDataFrame会非常简单,因此关于它的API就不赘述了,可以参考官网或者询问ChatGPT。
这里我们举几个复杂的例子(工作中也会经常遇到),来验证一下PySparkDataFrame是不是和PandasDataFrame一样强大。
假设有如下DataFrame,我们希望创建一个C列,它的值来自于A列,如果A列不为空,否则来自于B列。
AB0011None20033None40055那么应如何处理呢?
frompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimportwhensession=SparkSession.builder.getOrCreate()df=session.createDataFrame([['001','1'],[None,'2'],['003','3'],[None,'4'],['005','5']],schema=["A","B"])df.show()"""+----+---+|A|B|+----+---+|001|1||null|2||003|3||null|4||005|5|+----+---+"""#可以通过withColumn方法创建一个新列,并指定列名和列值#注意:PySpark的DataFrame不支持本地修改,因为它是基于RDD实现的,而RDD是不可变的#所以df.withColumn方法会返回一个新的DataFrame,并在原有的DataFrame的基础上增加一列df=df.withColumn("C",#当A列不为空时,使用A列的值,否则使用B列的值when(df["A"].isNotNull(),df["A"]).otherwise(df["B"]))df.show()"""+----+---+---+|A|B|C|+----+---+---+|001|1|001||null|2|2||003|3|003||null|4|4||005|5|005|+----+---+---+"""还是很简单的,要是我们希望C列的值来自于A列和B列中较大的那一个,该怎么做呢?
frompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimportgreatestsession=SparkSession.builder.getOrCreate()df=session.createDataFrame([[1,11],[22,2],[33,3],[4,44],[55,5]],schema=["A","B"])df.show()"""+---+---+|A|B|+---+---+|1|11||22|2||33|3||4|44||55|5|+---+---+"""df=df.withColumn("C",greatest(df["A"],df["B"]))df.show()"""+---+---+---+|A|B|C|+---+---+---+|1|11|11||22|2|22||33|3|33||4|44|44||55|5|55|+---+---+---+"""虽然和PandasDataFrame的操作有些不一样,但不难理解,多用用就熟悉了。
假设有如下DataFrame:
+-------+-------+-----+|name|subject|score|+-------+-------+-----+|satori|chinese|90||satori|math|95||satori|english|96||scarlet|chinese|87||scarlet|math|92||scarlet|english|98||cirno|chinese|100||cirno|math|9||cirno|english|91|+-------+-------+-----+我希望变成如下结构,要怎么做呢?
+-------+-------+-------+----+|name|chinese|english|math|+-------+-------+-------+----+|satori|90|96|95||scarlet|87|98|92||cirno|100|91|9|+-------+-------+-------+----+这是一个典型的列转行,下面看看如何实现。
frompyspark.sqlimportSparkSessionsession=SparkSession.builder.getOrCreate()df=session.createDataFrame([["satori","chinese",90],["satori","math",95],["satori","english",96],["scarlet","chinese",87],["scarlet","math",92],["scarlet","english",98],["cirno","chinese",100],["cirno","math",9],["cirno","english",91]],schema=["name","subject","score"])df.show()"""+-------+-------+-----+|name|subject|score|+-------+-------+-----+|satori|chinese|90||satori|math|95||satori|english|96||scarlet|chinese|87||scarlet|math|92||scarlet|english|98||cirno|chinese|100||cirno|math|9||cirno|english|91|+-------+-------+-----+"""df=df.groupBy("name").pivot("subject").agg({"score":"max"})df.show()"""+-------+-------+-------+----+|name|chinese|english|math|+-------+-------+-------+----+|satori|90|96|95||scarlet|87|98|92||cirno|100|91|9|+-------+-------+-------+----+"""非常简单,一行就搞定了。
假设有如下数据:
姓名生日声优0琪亚娜·卡斯兰娜12月7日陶典,钉宫理惠1布洛妮娅·扎伊切克8月18日TetraCalyx,Hanser,阿澄佳奈,花泽香菜2德丽莎·阿波卡利斯3月28日花玲,田村由香里我希望变成如下格式:
姓名生日声优0琪亚娜·卡斯兰娜12月7日陶典1琪亚娜·卡斯兰娜12月7日钉宫理惠2布洛妮娅·扎伊切克8月18日TetraCalyx3布洛妮娅·扎伊切克8月18日Hanser4布洛妮娅·扎伊切克8月18日阿澄佳奈5布洛妮娅·扎伊切克8月18日花泽香菜6德丽莎·阿波卡利斯3月28日花玲7德丽莎·阿波卡利斯3月28日田村由香里该怎么做呢?
列转行可以简单地认为是将数据库中的宽表变成一张高表,而之前介绍的行转列则是把一张高表变成一张宽表。假设有如下数据:
姓名水果星期一星期二星期三古明地觉草莓70斤72斤60斤雾雨魔理沙樱桃61斤60斤81斤琪露诺西瓜103斤116斤153斤我们希望变成下面这种形式:
姓名水果日期销量古明地觉草莓星期一70斤雾雨魔理沙樱桃星期一61斤琪露诺西瓜星期一103斤古明地觉草莓星期二72斤雾雨魔理沙樱桃星期二60斤琪露诺西瓜星期二116斤古明地觉草莓星期三60斤雾雨魔理沙樱桃星期三81斤琪露诺西瓜星期三153斤当然我们这里字段比较少,如果把星期一到星期日全部都写上去有点太长了。不过从这里也能看出前者对应的是一张宽表,就是字段非常多,我们要将其转换成一张高表。
DataFrame基于RDD,所以DataFrame也会产生shuffle,默认的分区数(spark.sql.shuffle.partitions)为200。这个在项目中要合理的设置,一般是集群的CPU总核心数的2到4倍。设置方式有三种:
最后来看一下DataFrame的导出,将DataFrame处理完毕后,我们肯定要导出到某个具体的位置。
frompyspark.sql.readwriterimportDataFrameReader,DataFrameWriter#读取数据源,可以使用session.read.xxxsession.read.xxx#导出DataFrame,则是df.write.xxxdf.write.xxxsession.read返回的是DataFrameReader对象,df.write返回的是DataFrameWriter对象,具体想导出到哪个位置,直接调用相应的方法即可。我们以读取和写入数据库为例,演示一下。
frompyspark.sqlimportSparkSessionsession=SparkSession.builder.getOrCreate()df=session.read.jdbc(url="jdbc:mysql://username:password@host:3306/database",table="user")df.show()我们测试一下,看看有没有问题。
读取正常,再来看看导出。
df.write.jdbc(url="jdbc:mysql://username:password@host:3306/database",table="user",mode="append")这里导出也是没有问题的,需要注意里面的参数mode,它有如下选项。
如果表不存在,那么会自动创建表并写入数据。
无论是Hive还是SparkSQL,在分析处理数据时,都需要使用函数。SparkSQL已经自带了非常多的函数,在pyspark.sql.functions中,这些函数基本能满足绝大部分工作要求。但如果你的场景比较特殊,找不到满足条件的函数,那么Spark也支持自定义。
首先在Hive中,自定义函数有以下三种类型:
而SparkSQL目前仅仅支持UDF和UDAF,其中Python只支持UDF,下面我们来看看如何自定义UDF。
frompyspark.sqlimportSparkSessionfrompyspark.sql.typesimportIntegerTypefrompyspark.sql.functionsimportudfsession=SparkSession.builder.getOrCreate()df=session.createDataFrame([[17],[18],[19]],schema=["age"])df.show()"""+---+|age|+---+|17||18||19|+---+"""defincr_10(x):returnx+10incr_10=udf(incr_10,IntegerType())df.select(incr_10(df["age"]).alias("age")).show()"""+---+|age|+---+|27||28||29|+---+"""因为只支持DSL,所以相比第一种方式就是少了个参数而已。
然后再来看看返回数组的UDF。
UDF返回值类型还可以是map,有了map我们便可以生成多列。
让我们先来回顾一下RDD执行流程:
如果简单来看的话,有四个步骤:
尽管相比MapReduce来说,RDD已经足够方便了,但它还是比较底层的。RDD的运行完全按照开发者的代码执行,如果开发者水平有限,那么RDD的执行效率也会受到影响。而SparkSQL会对代码进行自动优化,以提升代码运行效率,降低开发者水平带来的影响。那么问题来了,为什么SparkSQL可以被优化,而RDD不行。
很简答,因为RDD内部的数据类型和格式是不受限制的,很难进行优化。但SparkSQL不同,我们用该模块操作的数据结构是DataFrame,这是一个固定的二维表格式,可以被针对,而负责执行优化的便是Catalyst优化器。
为了解决过多依赖Hive的问题,SparkSQL使用了一个新的SQL优化器Catalyst,用于替代Hive中的优化器,整个SparkSQL的架构如下:
而我们的重点则是Catalyst如何对SQL进行优化,并生成RDD代码的。
Catalyst优化器的具体流程如下:
下面我们一步一步分析。
步骤一:解析SQL,生成AST(抽象语法树)
第一步生成的AST完全是按照SQL语句本身翻译过来的,没有经过任何优化,两者体现的含义是一致的。我们看一下AST,需要从下往上看。首先是扫描两张表,然后Join,之后按照指定条件过滤,再筛选。
步骤二:在AST中加入元数据信息,该步骤主要是为了方便后续优化的。
会在原始的AST上打一些标记,这些标记是做什么的不用理会,它是Spark内部使用的。
步骤三:对已经加入元数据的AST进行优化。
优化的途径非常多,我们介绍两个常见的。
SparkSQL内置了一两百种优化规则,感兴趣的话可以通过源码查看:org.apache.spark.sql.catalyst.optimizer.Optimizer。
步骤四:上面生成的AST叫做逻辑执行计划(优化过后的),还需要再生成物理执行计划。
可以使用queryExecution方法查看逻辑执行计划,使用explain方法查看物理执行计划。
步骤五:基于物理执行计划,生成RDD代码。
总结一下就是:
Hive我们之前说过,对于Hive来说就两个组件:
因此对于Hive来说,执行引擎和MetaStore都是必不可少的。好了,简单回顾了一下Hive,再来看看Spark。
Hive是将SQL翻译成MR,Spark是将SQL翻译成RDD,两者本质是类似的。只是Spark没有元数据管理,如果想写SQL,那么必须要先读数据得到DataFrame,然后将它注册为一个视图。而SparkonHive便是将Hive的MetaStore借给Spark,让Spark也拥有元数据管理。
因为MetaStore本质上就是一个元数据管理服务,它能和Hive的执行引擎搭配,那么自然也能和Spark的执行引擎搭配。
所以将Spark的SQL执行引擎(SparkSQL)和Hive的MetaStore结合起来,就得到了SparkonHive。说白了就是将Hive的SQL执行引擎给替换掉了,换成了更有效率的SparkSQL,以后SQL会被翻译成RDD,而不是MapReduce。
那么Spark要如何连接Hive的MetaStore呢?在$SPARK_HOME/conf目录中新建一个hive-site.xml,添加如下内容。
session=SparkSession.builder.\config("spark.sql.shuffle.partitions",50).\config("spark.sql.warehouse.dir","hdfs://satori001:9000/user/hive/warehouse").\config("hive.metastore.uris","thrift://satori001:9083").\enableHiveSupport().\getOrCreate()另外Hive的MetaStore会依赖关系型数据库,一般是MySQL,所以Spark的jars目录要包含能操作MySQL的驱动,这里我们之前已经安装过了。
然后不要忘记在Hive中开启MetaStore服务,修改Hive的配置文件hive-site.xml,添加如下内容:
注意:如果你不使用Hive的MetaStore服务,那么SparkSQL在创建表时,会将元数据写入本地的spark-warehouse目录,然后实体数据也会存在该目录中。
在TPC-DS基准测试中,Spark3.0的性能达到了Spark2.4的两倍。
那么相比2.x,Spark在3.x中都引入了哪些功能呢?
由于数据统计信息(元数据)不足或不准确,以及对成本的错误估算(执行计划调度),导致生成的初始执行计划不理想。于是Spark在3.x版本提供了AdaptiveQueryExecution(简称AQE),即自适应查询技术,通过在运行时对查询执行计划进行优化,允许Planner在运行时执行可选计划。这些可选计划将会基于运行时数据统计进行动态优化,从而提高性能。
开启AQE的方式:将spark.sql.adaptive.enabled参数指定为true,可以在代码文件和spark-defaults.conf配置文件中指定,或者提交任务时指定。
AQE提供了三个方面的优化:
动态调整shuffle分区的数量,用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的多个小分区合并为较大大分区。
尽管我们规定了分区数,但如果相邻的多个分区的数据量很小的话,那么其实没必要使用多个分区。因此在开启AQE之后,会将多个相邻的小分区进行合并。
此优化可以在一定程度上避免因缺少统计信息或错误估计大小(两种情况也可能同时存在),而导致执行计划性能不佳的情况。具体做法是在运行时将SORTMERGEJOIN转换成BROADCASTHASHJOIN,从而进一步提升性能。
对于右边的数据集,统计的结果是25MB,但实际只有8MB,这种差异只会在运行时产生。当开启AQE之后,就会识别到这种差异,然后进行优化。默认情况下的JOIN是采用SORT+MERGE的形式,但开启AQE识别到这种差异之后,会发现右边的数据集其实很小,完全可以广播出去。这样每个Executor都包含右边数据集的一个副本,将SORTMERGEJOIN转成基于广播的哈希JOIN,从而避免大量的网络传输,提升性能。
倾斜JOIN(SkewJoin)可能导致负载的极度不平衡,并严重降低性能。AQE从shuffle文件统计信息中检测到任何倾斜后,可以将倾斜的分区分割成更小的分区,并将它们与另一侧相应的分区连接起来。这种优化可以并行化倾斜处理,获得更好的整体性能。
比如AJOINB,但A表中分区A0的长度明显大于其它分区,因此A0便是一个倾斜的分区,在和B0进行JOIN时,对应的Task就变成了长尾Task。
如果开启了AQE,那么在执行AJOINB之前,会通过上游Stage的统计信息,判断是否存在数据倾斜。由于当前partitionA0明显超过平均值的数倍,于是会判断AJOINB发生了数据倾斜,且倾斜分区为partitionA0。而为了解决这个问题,SparkAQE会将A0的数据拆成N份,使用N个Task去处理该partition,每个Task只读取若干个Map的shuffle输出文件。
N个Task各自读取A0的部分数据,然后和B0做JOIN,这N个Task执行的结果和A0、B0直接做JOIN的结果是等价的。不难看出,在这样的处理中,B表的partition0会被读取N次,虽然增加了一定的额外成本,但通过N个任务处理倾斜数据带来的收益是高于成本的。
总结:在Stage提交之前,根据上游Stage的所有MapTask的统计信息,计算得到下游每个ReduceTask的shuffle输入,因此SparkAQE能够自动发现产生数据倾斜的JOIN,并且做出优化处理,该功能就是SparkAQESkewJoin(动态优化倾斜Join)。
当然啦,对分区进行拆分也是有条件的。
只有两个条件都满足时,AQE才会进行优化。
以上便是Spark在3.0提供的AQE,核心思想就是利用执行结束的上游Stage的统计信息(主要是数据量和记录数),来优化下游Stage的物理执行计划。而在使用上,我们只需记住三点:
前面介绍SparkSQL原理时说过,在生成执行计划之前会进行列值裁剪,将不用的字段过滤掉,而这一步是在运行之前发生的,属于静态裁剪。但除了字段可以过滤之外,分区也可以过滤,只是分区过滤在编译阶段不一定会被优化器识别出来。而当优化器在编译阶段无法识别可跳过的分区时,便可使用动态分区裁剪,即基于运行时推断的信息来对分区进行裁剪。
动态分区裁剪在星型模型中很常见,星型模型是由一个或多个引用了任意个维度表的事实表组成。在连接操作中,我们可以通过识别维度表过滤之后的分区,来裁剪从事实表中读取的分区,从而极大地减少数据量(shuffle带来的影响),提升性能。在TPC-DS基准测试中,102个查询有60个获得了2~18倍的性能提升。
该特性是自动开启的,我们正常编写SQL,会自动优化。
因此Spark官方开发了Koalas,它是基于Spark的PandasAPI实现,目的是让数据科学家能够在分布式环境中更高效地处理大型数据集。Koalas在使用上和Pandas是一样的,但底层跑的是分布式RDD。经过一年多的开发,Koalas已经对PandasAPI实现了80%,每月的PyPI下载量也迅速增长到了85万,并以两周一次的发布节奏快速演进。
安装Koalas:pipinstallkoalas。
importpandasaspdimportdatabricks.koalasaskspandas_df=pd.DataFrame({"a":[1,2,3],"b":[4,5,6]})koalas_df=ks.from_pandas(pandas_df)KoalasDataFrame的数据结构和PySparkDataFrame是一致的,都是基于RDD的分布式数据集,在操作的时候依旧会产生DAG、宽窄依赖、阶段划分、内存迭代等等。但是KoalasDataFrame的API和PandasDataFrame是一致的,这种设计既可以让熟悉Pandas的开发者不需要额外的学习成本,又能利用Spark的分布式计算。
如果你是Pandas的重度使用者,那么可以非常方便地将代码切换到Koalas,只需把Pandas的DataFrame换成Koalas的DataFrame,然后便可以提交到Spark集群之上运行。但对于那些不是特别依赖Pandas的开发者来说,他们会更倾向于使用SparkSQL,因为里面提供的DataFrame同样易于操作并且功能强大。
目前来讲,Koalas和SparkSQL用的人都不少,具体使用哪种取决于自身喜好。
关于Spark就说到这里,然后我们对学习的内容做一个总结。
但Spark不仅仅支持离线处理,还可以通过SparkStreaming和StructedStreaming对数据进行实时的流处理,后续我们来介绍SparkStreaming和StructedStreaming。