Spark入门:RDD编程

通过前面几章的介绍,我们已经了解了Spark的运行架构和RDD设计与运行原理,并介绍了RDD操作的两种类型:转换操作和行动操作。同时,我们前面通过一个简单的WordCount实例,也大概介绍了RDD的几种简单操作。现在我们介绍更多关于RDD编程的内容。Spark中针对RDD的操作包括创建RDD、RDD转换操作和RDD行动操作。

RDD可以通过两种方式创建:*第一种:读取一个外部数据集。比如,从本地文件加载数据集,或者从HDFS文件系统、HBase、Cassandra、AmazonS3等外部数据源中加载数据集。Spark可以支持文本文件、SequenceFile文件(Hadoop提供的SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件)和其他符合HadoopInputFormat格式的文件。*第二种:调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。

cd/usr/local/hadoop./sbin/start-dfs.sh然后,我们按照下面命令启动spark-shell:

cdusr/local/spark/mycode/mkdirrdd然后,使用vim编辑器,在rdd目录下新建一个word.txt文件,你可以在文件里面随便输入几行英文语句用来测试。

经过上面的准备工作以后,我们就可以开始创建RDD了。

Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是本地文件系统的地址,或者是分布式文件系统HDFS的地址,或者是AmazonS3的地址等等。下面请切换回spark-shell窗口,看一下如何从本地文件系统中加载数据:

scala>vallines=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")scala>vallines=sc.textFile("/user/hadoop/word.txt")scala>vallines=sc.textFile("word.txt")注意,上面三条命令是完全等价的命令,只不过使用了不同的目录形式,你可以使用其中任意一条命令完成数据加载操作。

可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。下面请在spark-shell中操作:

scala>valarray=Array(1,2,3,4,5)array:Array[Int]=Array(1,2,3,4,5)scala>valrdd=sc.parallelize(array)rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[13]atparallelizeat:29从执行结果信息可以看出,rdd是一个Int类型的RDD。上面使用数组来创建,或者,也可以从列表中创建:

scala>vallist=List(1,2,3,4,5)list:List[Int]=List(1,2,3,4,5)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[14]atparallelizeat:29从执行结果信息可以看出,rdd是一个Int类型的RDD。

RDD被创建好以后,在后续使用过程中一般会发生两种操作:*转换(Transformation):基于现有的数据集创建一个新的数据集。*行动(Action):在数据集上进行运算,返回计算值。

对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。下面列出一些常见的转换操作(TransformationAPI):*filter(func):筛选出满足函数func的元素,并返回一个新的数据集*map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集*flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果*groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集*reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。下面列出一些常见的行动操作(ActionAPI):*count()返回数据集中的元素个数*collect()以数组的形式返回数据集中的所有元素*first()返回数据集中的第一个元素*take(n)以数组的形式返回数据集中的前n个元素*reduce(func)通过函数func(输入两个参数并返回一个值)聚合数据集中的元素*foreach(func)将数据集中的每个元素传递到函数func中运行*

这里给出一段简单的代码来解释Spark的惰性机制。

scala>vallines=sc.textFile("data.txt")scala>vallineLengths=lines.map(s=>s.length)scala>valtotalLength=lineLengths.reduce((a,b)=>a+b)上面第一行首先从外部文件data.txt中构建得到一个RDD,名称为lines,但是,由于textFile()方法只是一个转换操作,因此,这行代码执行后,不会立即把data.txt文件加载到内存中,这时的lines只是一个指向这个文件的指针。第二行代码用来计算每行的长度(即每行包含多少个单词),同样,由于map()方法只是一个转换操作,这行代码执行后,不会立即计算每行的长度。第三行代码的reduce()方法是一个“动作”类型的操作,这时,就会触发真正的计算。这时,Spark会把计算分解成多个任务在不同的机器上执行,每台机器运行位于属于它自己的map和reduce,最后把结果返回给DriverProgram。

下面我们举几个实例加深了解。请在spark-shell下执行下面操作。下面是一个关于filter()操作的实例。

scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/rdd/word.txtMapPartitionsRDD[16]attextFileat:27scala>lines.filter(line=>line.contains("Spark")).count()res1:Long=2//这是执行返回的结果上面的代码中,lines就是一个RDD。lines.filter()会遍历lines中的每行文本,并对每行文本执行括号中的匿名函数,也就是执行Lamda表达式:line=>line.contains("Spark"),在执行Lamda表达式时,会把当前遍历到的这行文本内容赋值给参数line,然后,执行处理逻辑line.contains("Spark"),也就是只有当改行文本包含“Spark”才满足条件,才会被放入到结果集中。最后,等到lines集合遍历结束后,就会得到一个结果集,这个结果集中包含了所有包含“Spark”的行。最后,对这个结果集调用count(),这是一个行动操作,会计算出结果集中的元素个数。

这里再给出另外一个实例,我们要找出文本文件中单行文本所包含的单词数量的最大值,代码如下:

实际上,如果我们把上面的lines.map(line=>line.split("").size).reduce((a,b)=>if(a>b)aelseb)分开逐步执行,你就可以更加清晰地发现每个步骤生成的RDD的类型。

scala>vallist=List("Hadoop","Spark","Hive")list:List[String]=List(Hadoop,Spark,Hive)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[22]atparallelizeat:29scala>rdd.cache()//会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成scala>println(rdd.count())//第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中3scala>println(rdd.collect().mkString(","))//第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rddHadoop,Spark,Hive最后,可以使用unpersist()方法手动地把持久化的RDD从缓存中移除。

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:*本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;*ApacheMesos:默认的分区数为8;*Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值;

因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism,比如:

scala>valarray=Array(1,2,3,4,5)array:Array[Int]=Array(1,2,3,4,5)scala>valrdd=sc.parallelize(array,2)#设置两个分区rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[13]atparallelizeat:29对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)。

在实际编程中,我们经常需要把RDD中的元素打印输出到屏幕上(标准输出stdout),一般会采用语句rdd.foreach(println)或者rdd.map(println)。当采用本地模式(local)在单机上执行时,这些语句会打印出一个RDD中的所有元素。但是,当采用集群模式执行时,在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点DriverProgram中,因此,任务控制节点DriverProgram中的stdout是不会显示打印语句的这些输出内容的。为了能够把所有worker节点上的打印输出信息也显示到DriverProgram中,可以使用collect()方法,比如,rdd.collect().foreach(println),但是,由于collect()方法会把各个worker节点上的所有RDD元素都抓取到DriverProgram中,因此,这可能会导致内存溢出。因此,当你只需要打印RDD的部分元素时,可以采用语句rdd.take(100).foreach(println)。

THE END
1.Android数据的五种持久化方式android数据持久化存储方式数据持久化就是指将那些内存中的瞬时数据保存到存储设备中,保证即时在手机或电脑关机的情况下,这些数据仍然不会丢失。 Android系统中主要提供了3种方式用于简单地实现数据持久化技术,即文件存储、SharedPreference以及数据库存储。另外,将数据保存在SD卡中也算是一种数据持久化技术,但是这种存储方式没有前三种方式安全。https://blog.51cto.com/u_12929/6534710
2.数据持久化的四种方式所谓的持久化,就是将数据保存到硬盘中,使得在应用程序或机器重启后可以继续访问之前保存的数据。在iOS开发中,有很多数据持久化的方案,接下来我将尝试着介绍一下5种方案: plist文件(属性列表) preference(偏好设置) NSKeyedArchiver(归档) SQLite 3 CoreData https://www.jianshu.com/p/17faa1eae676
3.Redis的持久化详解RedisRedis提供了2种不同形式的持久化方式: RDB(Redis DataBase):简而言之,就是在指定的时间间隔内,定时的将 redis 存储的数据生成Snapshot快照并存储到磁盘等介质上; AOF(Append Of File):将 redis 执行过的所有写指令记录下来,在下次 redis 重新启动时,只要把这些写指令从前到后再重复执行一遍,就可以实现数据恢复https://www.jb51.net/database/287421mez.htm
4.阿里面试官:刷4遍以下面试题,你也能从外包成功跳到大厂12.Redis客户端命令执行的方式? 13.如何停止redis服务? 14.如何查看当前键是否存在? 15.如何删除数据? 16.redis为什么快?单线程? 17.字符串最大不能超过多少? 18.redis默认分多少个数据库? 19.redis持久化的几种方式? 20.RDB持久化? 21.RDB的持久化是如何触发的? https://maimai.cn/article/detail?fid=1650337220&efid=EBt1w2lyYVHJKvwIFwMu0w
5.从入门到高阶,一文搞懂Redis- 最终一致性(BASE):基本可用、软状态/柔性事务、最终一致性1.2.3.4.5. 二、Redis是什么? Redis = Remote Dictionary Server,即远程字典服务。 是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。 https://www.easemob.com/news/8157
6.Redis调优大揭秘:掌握这几十种技巧,让你的Redis更快更稳定Redis作为一个高性能的内存数据库,也面临着这个问题,例如在Redis中,主线程处理客户端请求、子进程进行数据持久化、子线程处理RDB/AOF rewrite、后台线程处理异步lazy-free和异步释放fd等,这些线程会在多个逻辑处理器之间切换,从而导致上下文切换的性能损耗。https://developer.aliyun.com/article/1408665
7.iOS数据持久化之一——plist文件原腾讯云开发者社区plist是一种文件格式,其内容规则是xml文件,后缀为.plist,因此,我们更习惯于成它问plist文件,在iOS开发中,这种文件常用来保存一些简单的配置数据,例如项目中的info.plist。 通过plist文件编辑器,我们可以很方便的查看和编辑层次清晰的plist文件。 二、通过操作plist文件进行数据持久化的几种方式 https://cloud.tencent.com/developer/article/1187369
8.数据持久化的几种方式数据持久话方式数据持久化的方式有很多,最近只学了几种,在此一一列出来,记录下自己的成长,以下几种方式都是在编辑器模式下直接运行的,不需要游戏运行。 1.首先是Json格式的写入和读取 需要存储的数据类一定要加上[Serializable]序列化标签,Data类里面有名字和一个SubData的集合,SubData类里面则包括常用的几个数据类型的字段,最https://blog.csdn.net/qq_42010059/article/details/84963658
9.IntellijIDEA插件开发京东云技术团队很多idea插件文档更多的是介绍如何创建一个简单的idea插件,本篇文章从开发环境、demo、生态组件、添加依赖包、源码解读、网络请求、渲染数据、页面交互等方面介绍,是一篇能够满足基本的插件开发工程要求的文章。 如有疏漏欢迎指正,如想深入了解欢迎探讨。 一、简介 https://www.tulingxueyuan.cn/tlzx/jsp/3896.html
10.美国国安局网络攻击中国上万次,窃取超140GB数据2、持久化控制类武器 TAO依托此类武器对西北工业大学网络进行隐蔽持久控制,TAO行动队可通过加密通道发送控制指令操作此类武器实施对西北工业大学网络的渗透、控制、窃密等行为。此类武器共有6种: ①“二次约会” 此武器长期驻留在网关服务器、边界路由器等网络边界设备及服务器上,可针对海量数据流量进行精准过滤与自动化http://www.jccq.gov.cn/cqzx/sjdt_5/202209/t20220906_1661620.shtml