第1章项目需求及架构设计1.1项目需求分析一、数据采集平台搭建二、Kafka、Zookeeper中间件准备三、下游SparkStreaming对接Kafka接收数据,实现vip个数统计、栏目打标签功能、做题正确率与掌握度的实时计算功能。
1.2项目框架1.2.1技术选型一、数据存储:Kafka、MySql二、数据处理:Spark三、其他组件:Zookeeper1.2.2流程设计
第2章需求2.0原始数据格式及对应topic2.0.1实时统计注册人数kafka对应topic:register_topic数据格式:
数据格式
{"app_id":"1","device_id":"102","distinct_id":"5fa401c8-dd45-4425-b8c6-700f9f74c532","event_name":"-","ip":"121.76.152.135","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"245494"}
uid:用户idapp_id:平台iddeviceid:平台iddisinct_id:唯一标识Ip:用户ip地址last_page_id:上一页面idpage_id:当前页面id0:首页1:商品课程页2:订单页面3:支付页面next_page_id:下一页面id2.0.4实时统计学员播放视频各时长Kafka对应topic:course_learn数据格式:
{"biz":"bdfb58e5-d14c-45d2-91bc-1d9409800ac3","chapterid":"1","cwareid":"3","edutypeid":"3","pe":"55","ps":"41","sourceType":"APP","speed":"2","subjectid":"2","te":"1563352166417","ts":"1563352159417","uid":"235","videoid":"2"}
2.2模拟数据采集模拟数据采集将准备好的log文件使用kafka生产者代码发送信息到topic
注册日志数据register.log日志文件对应topic:register_topic
做题数据qz_log日志文件对应topic:qz_log
商品页面数据page_log日志文件对应topic:page_topic
视频播放时长数据course_learn.log日志文件对应topic:course_learn
如果windows下没有安装hadoop环境先配置环境
Ip解析工具Ip解析本地库:
2.3.实时统计注册人员信息用户使用网站或APP进行注册,后台实时收集数据传输Kafka,SparkStreaming进行对接统计,实时统计注册人数。需求1:实时统计注册人数,批次为3秒一批,使用updateStateBykey算子计算历史数据和当前批次的数据总数,仅此需求使用updateStateBykey,后续需求不使用updateStateBykey。需求2:每6秒统统计一次1分钟内的注册数据,不需要历史数据提示:reduceByKeyAndWindow算子需求3:观察对接数据,尝试进行调优。2.4实时计算学员做题算正确率与知识点掌握度mysql建表语句:
创建相应的包
配置pom.xml文件
第5章需求实现5.1创建MySql配置文件在resource源码包下创建comerce.peoperties
jdbc.url=jdbc:mysql://hadoop102:3306/course_learnuseUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=falsejdbc.user=rootjdbc.password=000000
5.2创建读取配置文件的工具类在com.atguigu.qzpoint.util创建ConfigurationManager类packagecom.atguigu.qzpoint.util;
importjava.io.InputStream;importjava.util.Properties;
/****读取配置文件工具类*/publicclassConfigurationManager{
privatestaticPropertiesprop=newProperties();
static{try{InputStreaminputStream=ConfigurationManager.class.getClassLoader().getResourceAsStream("comerce.properties");prop.load(inputStream);}catch(Exceptione){e.printStackTrace();}}
//获取配置项publicstaticStringgetProperty(Stringkey){returnprop.getProperty(key);}
//获取布尔类型的配置项publicstaticbooleangetBoolean(Stringkey){Stringvalue=prop.getProperty(key);try{returnBoolean.valueOf(value);}catch(Exceptione){e.printStackTrace();}returnfalse;}
}5.3创建Json解析工具类在com.atguigu.qz.point.util创建ParseJsonData类packagecom.atguigu.qzpoint.util;
importcom.alibaba.fastjson.JSONObject;
publicclassParseJsonData{
publicstaticJSONObjectgetJsonData(Stringdata){try{returnJSONObject.parseObject(data);}catch(Exceptione){returnnull;}}}
5.4创建Druid连接池在com.atgugiu.qzpoint.util创建DataSourceUtil类packagecom.atguigu.qzpoint.util;
importcom.alibaba.druid.pool.DruidDataSourceFactory;
importjavax.sql.DataSource;importjava.io.Serializable;importjava.sql.*;importjava.util.Properties;
/***德鲁伊连接池*/publicclassDataSourceUtilimplementsSerializable{publicstaticDataSourcedataSource=null;
//提供获取连接的方法publicstaticConnectiongetConnection()throwsSQLException{returndataSource.getConnection();}
//提供关闭资源的方法【connection是归还到连接池】//提供关闭资源的方法【方法重载】3dqlpublicstaticvoidcloseResource(ResultSetresultSet,PreparedStatementpreparedStatement,Connectionconnection){//关闭结果集//ctrl+alt+m将java语句抽取成方法closeResultSet(resultSet);//关闭语句执行者closePrepareStatement(preparedStatement);//关闭连接closeConnection(connection);}
privatestaticvoidcloseConnection(Connectionconnection){if(connection!=null){try{connection.close();}catch(SQLExceptione){e.printStackTrace();}}}
privatestaticvoidclosePrepareStatement(PreparedStatementpreparedStatement){if(preparedStatement!=null){try{preparedStatement.close();}catch(SQLExceptione){e.printStackTrace();}}}
privatestaticvoidcloseResultSet(ResultSetresultSet){if(resultSet!=null){try{resultSet.close();}catch(SQLExceptione){e.printStackTrace();}}}}
5.5创建操作MySql的代理类在com.atguigu.qzpoint.util创建SqlProxy类packagecom.atguigu.qzpoint.util
importjava.sql.{Connection,PreparedStatement,ResultSet}
traitQueryCallback{defprocess(rs:ResultSet)}
classSqlProxy{privatevarrs:ResultSet=_privatevarpsmt:PreparedStatement=_
/***执行修改语句**@paramconn*@paramsql*@paramparams*@return*/defexecuteUpdate(conn:Connection,sql:String,params:Array[Any]):Int={varrtn=0try{psmt=conn.prepareStatement(sql)if(params!=null&¶ms.length>0){for(i<-0untilparams.length){psmt.setObject(i+1,params(i))}}rtn=psmt.executeUpdate()}catch{casee:Exception=>e.printStackTrace()}rtn}
/***执行查询语句*执行查询语句**@paramconn*@paramsql*@paramparams*@return*/defexecuteQuery(conn:Connection,sql:String,params:Array[Any],queryCallback:QueryCallback)={rs=nulltry{psmt=conn.prepareStatement(sql)if(params!=null&¶ms.length>0){for(i<-0untilparams.length){psmt.setObject(i+1,params(i))}}rs=psmt.executeQuery()queryCallback.process(rs)}catch{casee:Exception=>e.printStackTrace()}}
defshutdown(conn:Connection):Unit=DataSourceUtil.closeResource(rs,psmt,conn)}
5.6实时统计注册人数代码实现packagecom.atguigu.qzpoint.streaming
importjava.langimportjava.sql.ResultSetimportjava.util.Random
importcom.atguigu.qzpoint.util.{DataSourceUtil,QueryCallback,SqlProxy}importorg.apache.kafka.clients.consumer.ConsumerRecordimportorg.apache.kafka.common.TopicPartitionimportorg.apache.kafka.common.serialization.StringDeserializerimportorg.apache.spark.SparkConfimportorg.apache.spark.streaming.dstream.InputDStreamimportorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.{Seconds,StreamingContext}
importscala.collection.mutable
objectRegisterStreaming{privatevalgroupid="register_group_test"
//valdsStream=stream.filter(item=>item.value().split("\t").length==3)//.mapPartitions(partitions=>//partitions.map(item=>{//valrand=newRandom()//valline=item.value()//valarr=line.split("\t")//valapp_id=arr(1)//(rand.nextInt(3)+"_"+app_id,1)//}))//dsStream.print()//vala=dsStream.reduceByKey(_+_)//a.print()//a.map(item=>{//valappid=item._1.split("_")(1)//(appid,item._2)//}).reduceByKey(_+_).print()
//处理完业务逻辑后手动提交offset维护到本地mysql中stream.foreachRDD(rdd=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{valoffsetRanges:Array[OffsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor(or<-offsetRanges){sqlProxy.executeUpdate(client,"replaceinto`offset_manager`(groupid,topic,`partition`,untilOffset)values(,,,)",Array(groupid,or.topic,or.partition.toString,or.untilOffset))}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})ssc.start()ssc.awaitTermination()}
}
5.7实时统计学员做题正确率与知识点掌握度packagecom.atguigu.qzpoint.streaming
importjava.langimportjava.sql.{Connection,ResultSet}importjava.time.LocalDateTimeimportjava.time.format.DateTimeFormatter
/***知识点掌握度实时统计*/objectQzPointStreaming{
privatevalgroupid="qz_point_group"
}}
5.8实时统计商品页到订单页,订单页到支付页转换率packagecom.atguigu.qzpoint.streaming
importjava.langimportjava.sql.{Connection,ResultSet}importjava.text.NumberFormat
importcom.atguigu.qzpoint.util.{DataSourceUtil,ParseJsonData,QueryCallback,SqlProxy}importorg.apache.kafka.clients.consumer.ConsumerRecordimportorg.apache.kafka.common.TopicPartitionimportorg.apache.kafka.common.serialization.StringDeserializerimportorg.apache.spark.streaming.dstream.InputDStreamimportorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.{SparkConf,SparkFiles}importorg.lionsoul.ip2region.{DbConfig,DbSearcher}
importscala.collection.mutableimportscala.collection.mutable.ArrayBuffer
/***页面转换率实时统计*/objectPageStreaming{privatevalgroupid="vip_count_groupid"
ssc.sparkContext.addFile(this.getClass.getResource("/ip2region.db").getPath)//广播文件valipDStream=dsStream.mapPartitions(patitions=>{valdbFile=SparkFiles.get("ip2region.db")valipsearch=newDbSearcher(newDbConfig(),dbFile)patitions.map{item=>valip=item._4valprovince=ipsearch.memorySearch(ip).getRegion().split("\\|")(2)//获取ip详情中国|0|上海|上海市|有线通(province,1l)//根据省份统计点击个数}}).reduceByKey(_+_)
//计算转换率//处理完业务逻辑后手动提交offset维护到本地mysql中stream.foreachRDD(rdd=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{calcJumRate(sqlProxy,client)//计算转换率valoffsetRanges:Array[OffsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor(or<-offsetRanges){sqlProxy.executeUpdate(client,"replaceinto`offset_manager`(groupid,topic,`partition`,untilOffset)values(,,,)",Array(groupid,or.topic,or.partition.toString,or.untilOffset))}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})ssc.start()ssc.awaitTermination()}
/***计算页面跳转个数**@paramsqlProxy*@paramitem*@paramclient*/defcalcPageJumpCount(sqlProxy:SqlProxy,item:(String,Int),client:Connection):Unit={valkeys=item._1.split("_")varnum:Long=item._2valpage_id=keys(1).toInt//获取当前page_idvallast_page_id=keys(0).toInt//获取上一page_idvalnext_page_id=keys(2).toInt//获取下页面page_id//查询当前page_id的历史num个数sqlProxy.executeQuery(client,"selectnumfrompage_jump_ratewherepage_id=",Array(page_id),newQueryCallback{overridedefprocess(rs:ResultSet):Unit={while(rs.next()){num+=rs.getLong(1)}rs.close()}
//对num进行修改并且判断当前page_id是否为首页if(page_id==1){sqlProxy.executeUpdate(client,"insertintopage_jump_rate(last_page_id,page_id,next_page_id,num,jump_rate)"+"values(,,,,)onduplicatekeyupdatenum=num+",Array(last_page_id,page_id,next_page_id,num,"100%",num))}else{sqlProxy.executeUpdate(client,"insertintopage_jump_rate(last_page_id,page_id,next_page_id,num)"+"values(,,,)onduplicatekeyupdatenum=num+",Array(last_page_id,page_id,next_page_id,num,num))}})}
5.9实时统计学员播放视频各时长packagecom.atguigu.qzpoint.streaming
importjava.langimportjava.sql.{Connection,ResultSet}
importcom.atguigu.qzpoint.bean.LearnModelimportcom.atguigu.qzpoint.util.{DataSourceUtil,ParseJsonData,QueryCallback,SqlProxy}importorg.apache.kafka.clients.consumer.ConsumerRecordimportorg.apache.kafka.common.TopicPartitionimportorg.apache.kafka.common.serialization.StringDeserializerimportorg.apache.spark.SparkConfimportorg.apache.spark.streaming.dstream.InputDStreamimportorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.{Seconds,StreamingContext}
objectCourseLearnStreaming{privatevalgroupid="course_learn_test1"
defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName(this.getClass.getSimpleName).set("spark.streaming.kafka.maxRatePerPartition","30").set("spark.streaming.stopGracefullyOnShutdown","true")
//解析json数据valdsStream=stream.mapPartitions(partitions=>{partitions.map(item=>{valjson=item.value()valjsonObject=ParseJsonData.getJsonData(json)valuserId=jsonObject.getIntValue("uid")valcwareid=jsonObject.getIntValue("cwareid")valvideoId=jsonObject.getIntValue("videoid")valchapterId=jsonObject.getIntValue("chapterid")valedutypeId=jsonObject.getIntValue("edutypeid")valsubjectId=jsonObject.getIntValue("subjectid")valsourceType=jsonObject.getString("sourceType")valspeed=jsonObject.getIntValue("speed")valts=jsonObject.getLong("ts")valte=jsonObject.getLong("te")valps=jsonObject.getIntValue("ps")valpe=jsonObject.getIntValue("pe")LearnModel(userId,cwareid,videoId,chapterId,edutypeId,subjectId,sourceType,speed,ts,te,ps,pe)})})
dsStream.foreachRDD(rdd=>{rdd.cache()//统计播放视频有效时长完成时长总时长rdd.groupBy(item=>item.userId+"_"+item.cwareId+"_"+item.videoId).foreachPartition(partitoins=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitoins.foreach{case(key,iters)=>calcVideoTime(key,iters,sqlProxy,client)//计算视频时长}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})//统计章节下视频播放总时长rdd.mapPartitions(partitions=>{partitions.map(item=>{valtotaltime=Math.ceil((item.te-item.ts)/1000).toLongvalkey=item.chapterId(key,totaltime)})}).reduceByKey(_+_).foreachPartition(partitoins=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitoins.foreach(item=>{sqlProxy.executeUpdate(client,"insertintochapter_learn_detail(chapterid,totaltime)values(,)onduplicatekey"+"updatetotaltime=totaltime+",Array(item._1,item._2,item._2))})}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})
//统计课件下的总播放时长rdd.mapPartitions(partitions=>{partitions.map(item=>{valtotaltime=Math.ceil((item.te-item.ts)/1000).toLongvalkey=item.cwareId(key,totaltime)})}).reduceByKey(_+_).foreachPartition(partitions=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitions.foreach(item=>{sqlProxy.executeUpdate(client,"insertintocwareid_learn_detail(cwareid,totaltime)values(,)onduplicatekey"+"updatetotaltime=totaltime+",Array(item._1,item._2,item._2))})}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})
//统计辅导下的总播放时长rdd.mapPartitions(partitions=>{partitions.map(item=>{valtotaltime=Math.ceil((item.te-item.ts)/1000).toLongvalkey=item.edutypeId(key,totaltime)})}).reduceByKey(_+_).foreachPartition(partitions=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitions.foreach(item=>{sqlProxy.executeUpdate(client,"insertintoedutype_learn_detail(edutypeid,totaltime)values(,)onduplicatekey"+"updatetotaltime=totaltime+",Array(item._1,item._2,item._2))})}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})
})//计算转换率//处理完业务逻辑后手动提交offset维护到本地mysql中stream.foreachRDD(rdd=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{valoffsetRanges:Array[OffsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor(or<-offsetRanges){sqlProxy.executeUpdate(client,"replaceinto`offset_manager`(groupid,topic,`partition`,untilOffset)values(,,,)",Array(groupid,or.topic,or.partition.toString,or.untilOffset))}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})ssc.start()ssc.awaitTermination()}
第6章总结与调优6.1保证SparkStreaming第一次启动不丢数据在kafka的参数auto.offset.rest设定为earlist,保证SparkStreaming第一次启动从kafka最早偏移量开始拉取数据
6.2SparkStreaming手动维护偏移量
在SparkStreaming下有三种消费模式的定义最多一次、至少一次、恰好一次那么最好是无限接近恰好一次。要实现恰好一次偏移量必须手动维护,因为自动提交会在SparkStreaming刚运行时就立马提交offset,如果这个时候SparkStreaming消费信息失败了,那么offset也就错误提交了。所以必须保证:1.手动维护偏移量2.处理完业务数据后再提交offset手动维护偏移量需设置kafka参数enable.auto.commit改为false
手动维护提交offset有两种选择:1.处理完业务数据后手动提交到Kafka2.处理完业务数据后手动提交到本地库如MySql、HBase
1、先来看如何提交到kafka官网所示:
stream.foreachRdd后根据每个rdd先转换成HashOffsetRanges对象通过.offsetRanges方法获取到偏移量对象,再通过commitAsync方法将偏移量提交。
2、维护到本地MySql如项目所示:Driver端需先去判断Msql库中是否存在偏移量,如果存在偏移量则从MySql中获取到当前topic对应的最新offset大小,如果MySql不存在则从kafka中获取
消费到数据后,进行业务处理处理完后需将offset最新值保存到MySql
那么如果有面试官提问如何保证数据恰好一次性消费回答到这两点一般就可以了,手动维护便宜量和先处理完业务数据再提交offset。但是处理业务数据和提交offset并非同一事物,在极端情况下如提交offset时断网断电还是会导致offset没有提交并且业务数据已处理完的情况。
那么保证事物就需要将并行度调成1或者将数据collect到driver端,再进行数据业务处理和提交offset,但这样还会导致并行度变成1很可能导致处理速度跟不上,所以大数据情况下一般不考虑事物。
6.3updateStateByKey算子与checkpoint
updateStateBykey算子根据官网描述,是返回一个新的“状态”的DStream的算子,其通过在键的先前状态和键的新值上应用给定函数更新每个键的状态。
具体写法:根据历史状态值,和当前批次的数据状态值的累加操作得出一个最新的结果。如项目中代码:
那么使用updateStateByBykey算子,必须使用SparkStreaming的checkpoint来维护历史状态数据
那么看下Hdfs上路径下的文件
存在小文件且小文件个数不可控,所以在真实企业生产环境上并不会使用checkpoint操作,也不会使用基于checkpoint的算子如updateStateBykey算子
那么如何代替updateStateBykey这种基于历史数据状态的操作的算子呢:在进行相应操作时,可以去库中查询出历史数据,再与当前数据进行操作得出最新结果集,将结果集再刷新到本地库中。
6.4计算SparkStreaming一秒钟拉取多少条数据在企业中往往会根据业务的实时性来定制一秒钟消费数据量的条数,来达到实时性,那么通过什么参数来设置SparkStreaming从kafka的拉取的条数呢。
根据官网描述,可以设置spark.streaming.kafka.maxRatePerPartition参数来设置SparkStreaming从kafka分区每秒拉取的条数
那么在项目中如实时统计学员做题正确率与知识点掌握度需求中,需要每秒100处理速度,针对此需求topic为qz_log分区为10,那么通过此参数设定10即可,每个分区没秒10条数据。一秒处理100条数据,当前批次为3秒一次,一批处理300条数据.
6.5SparkStreaming背压机制
6.6一个stage的耗时由什么决定
由上图可以看出一个stage由最慢的task耗时决定。6.7SparkStreaming优雅关闭提交SparkStreaming任务到yarn后,当需要停止程序时使用yarnapplication-killapplication_id命令来关闭SparkStreaming,那么操作此命令时需要保证数据不丢失,需要设置spark.streaming.stopGracefullOnShutdown参数为ture
当设置此参数后,SparkStreaming程序在接收到kill命令时,不会立马结束程序,Spark会在JVM关闭时正常关闭SparkStreaming,而不是是立马关闭,即保证当前数据处理完后再关闭。
6.8SparkStreaming默认分区数SparkStreaming默认并行度与所对应kafkatopic创建时的分区数所对应,比如项目中topic的分区都是10,SparkStreaming的默认分区就为10,且在真实开发环境中SparkStreaming一般不会去使用repartition增大分区操作,因为会进行shuffle耗时。第7章打包、spark-submit命令
spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.CourseLearnStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar
spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.PageStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar
spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.QzPointStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar
spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.RegisterStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar
第8章常见问题8.1jar包冲突问题
根据官网描述spark-streaming-kafka-0-10_2.11jar包中包含kafka-clients客户端jar包不需要再次添加kafka客户端jar包,如果再次添加kafka客户端jar包可能会引起版本冲突8.2无法序列化问题和正确操作数据库连接
对于开发者人员刚开始接手SparkStreaming时往往会错误的使用数据库连接,如上述官网描述对于connection获取,代码写在了foreachRDD内rdd.foreach上,那么这样操作实际是在driver端创建到connection,然后rdd.foreacher操作为分布式节点操作,connection.send方法实际发生在了各个executor节点上,这个时候就涉及到了driver端对象到executor端的一个网络传输问题,这个时候spark会发生错误,会报一个org.apache.spark.SparkException:Tasknotserializable这样一个任务无法序列化的错,在Spark中遇到此错误一般都是错误的将driver端对象在executor端使用造成的。那么创建connection操作必须在executor端执行
如官网描述,在rdd.foreach里创建connection,这样虽然不会发生错误,但是这样循环的粒度是针对每条数据,每循环一条数据都会创建一个连接,这样会造成资源浪费。图1和图2都是错误展示最后,正确的使用数据库连接,循环粒度是分区,在每个分区下创建一个数据库连接,循环分区下的数据每条数据使用当前分区下的数据库连接,当使用完毕后归还的连接池中。所以在SparkStreaming开发中需养成良好习惯:dstream.foreachRdd{rdd=>{rdd.foreachPartition{partitions=>{//循环分区//创建connectionpartitions.foreach(record=>{//业务处理使用当前connection}}//归还连接}}循环粒度foreachRdd=>foreachPartition=>foreach8.3SparkStreaming操作数据库时线程安全问题在SparkStreaming中,采用查询本地库的历史数据和当前批次数据的计算来代替需要基于hdfs的算子updatestatebykey,那么在查询和重新刷入本地库的时候处理不当会造成线程安全问题,数据不准的问题。
那么在查询本地库时需要进行一次预聚合操作,将相同key的数据落到一个分区,保证同一个key的数据指挥操作数据库一次,预聚合操作有reducebykey、groupbykey、groupby等算子。如项目所写:
题库更新操作时需要查询MySql本地库的历史数据,在查询本地库钱先进行了groupby操作将相同符合条件的业务数据聚合到了一个分区,保证相同用户下同一课程同一知识点的数据在当前批次下只会去查询一次MySql数据库并且一次覆盖。8.4数据倾斜问题数据倾斜为在shuffle过程中,必须将各个节点上相同的key的数据拉取到某节点的一个task来进行,此时如果某个key对应的数据量特别大的话,就会发生数据倾,某个task耗时非常大,那么一个stage的耗时由最慢的task决定,从而导致整个SparkStreaming任务运行非常缓慢。以reducebykey为例:
这张图就是发生了数据倾斜,那么解决方案最有效的为两阶段聚合,先打散key聚合一次,再还原key聚合一次。
具体代码展示:
对DStream进行map操作对原始key前加上随机值,map完后进行第一次reducebykey操作,此结果为打散key后的reducebykey结果,再次进行map操作根据分隔符,去掉随机数保留原有key,map后再进行reducebykey,保证相同key的数据准确累加。
8.5SparkStreaming消费多topic在真实环境中往往会有许多业务场景非常类似,比如打标签、监控某指标,可能代码逻辑都一样只有某个取值不一样,这个时候一个SparkStreaming就可以监控多个topic,然后根据topic的名称来进行不同的业务处理,就不需要开发多个SparkStreaming程序了。查看kafkaUtils.createDirectStream方法
可以发现topic参数可以是个多个值,也就是createDirectStream方法支持多个topic。
通过kafkaUtils.createDirectStream方法获取到DStream,这个DStream流的类型为InputDStream[ConsumerRecord[String,String]],那么在可以通过调用map方法,ConsumerRecord的topic方法来获取对应的topic名称
获取到topic名称后value数据后,就可以在后续操作里根据判断topic名称来处理不同的业务。
点击下载内存泄露分析工具下载下来是一个jar包
那么需要编写bat批处理来运行创建run.bat
编辑titleibm-heap-analyzer
path=%PATH%;%C:\JAVA\jdk1.8.0_51\bin
E:
cdE:\IBMheapAnalyzer\IBM_DUMP_wjfx
java.exe-Xms1048M-Xmx4096M-jarha456.jar
路径需要改成自己当前路径点击run.bat运行
运行成功8.6.2模拟内存泄露场景内存泄露的原因往往是因为对象无法释放或被回收造成,那么在本项目中就模拟此场景。
如上图所示,在计算学员知识点正确率与掌握度代码中,在最后提交offset提交偏移量后,循环往map里添加LearnMode对象,使每处理一批数据就往map里添加100000个LearnMode对象,使堆内存撑满。
8.6.3查找driver进程在集群上提交sparkstreaming任务ps-ef|grepcom.atguigu.qzpoint.streaming.QzPointStreaming通过此命令查找到driver进程号
进程号为6860
8.6.4JMAP命令使用jmap-heappid命令查看6860进程,内存使用情况。jmap-heap6860
发现新生代和老年代内存占满,有对象无法被销毁或回收。再通过jmap-histopid命令查看对象的内存情况。
jmap-histo6860>a.log
jmap-dump:live,format=b,file=dump.log6860
将dump从集群下载下来,打开IBMHeapAnalyzer进行分析
从饼状图可以看出,堆内存中存在大量HashEntry类点击Analysis分析查看各个对象内存的泄露大小和总大小。
选中最大的分析对象双击,或者右键点击Findobjectinatreeview查看树状图。
可以看出HashEntry的父类为HashMap,并且点击HashEntry查看内部,
链表里的next对象为LearnModel
可以定位SparkStreaming在操作map添加LearnModel时发生了内存泄露