大数据项目实战之在线教育(04实时实现)十一vs十一

第1章项目需求及架构设计1.1项目需求分析一、数据采集平台搭建二、Kafka、Zookeeper中间件准备三、下游SparkStreaming对接Kafka接收数据,实现vip个数统计、栏目打标签功能、做题正确率与掌握度的实时计算功能。

1.2项目框架1.2.1技术选型一、数据存储:Kafka、MySql二、数据处理:Spark三、其他组件:Zookeeper1.2.2流程设计

第2章需求2.0原始数据格式及对应topic2.0.1实时统计注册人数kafka对应topic:register_topic数据格式:

数据格式

{"app_id":"1","device_id":"102","distinct_id":"5fa401c8-dd45-4425-b8c6-700f9f74c532","event_name":"-","ip":"121.76.152.135","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"245494"}

uid:用户idapp_id:平台iddeviceid:平台iddisinct_id:唯一标识Ip:用户ip地址last_page_id:上一页面idpage_id:当前页面id0:首页1:商品课程页2:订单页面3:支付页面next_page_id:下一页面id2.0.4实时统计学员播放视频各时长Kafka对应topic:course_learn数据格式:

{"biz":"bdfb58e5-d14c-45d2-91bc-1d9409800ac3","chapterid":"1","cwareid":"3","edutypeid":"3","pe":"55","ps":"41","sourceType":"APP","speed":"2","subjectid":"2","te":"1563352166417","ts":"1563352159417","uid":"235","videoid":"2"}

2.2模拟数据采集模拟数据采集将准备好的log文件使用kafka生产者代码发送信息到topic

注册日志数据register.log日志文件对应topic:register_topic

做题数据qz_log日志文件对应topic:qz_log

商品页面数据page_log日志文件对应topic:page_topic

视频播放时长数据course_learn.log日志文件对应topic:course_learn

如果windows下没有安装hadoop环境先配置环境

Ip解析工具Ip解析本地库:

2.3.实时统计注册人员信息用户使用网站或APP进行注册,后台实时收集数据传输Kafka,SparkStreaming进行对接统计,实时统计注册人数。需求1:实时统计注册人数,批次为3秒一批,使用updateStateBykey算子计算历史数据和当前批次的数据总数,仅此需求使用updateStateBykey,后续需求不使用updateStateBykey。需求2:每6秒统统计一次1分钟内的注册数据,不需要历史数据提示:reduceByKeyAndWindow算子需求3:观察对接数据,尝试进行调优。2.4实时计算学员做题算正确率与知识点掌握度mysql建表语句:

创建相应的包

配置pom.xml文件

com_atguigu_warehouseorg.apache.sparkspark-core_2.11${spark.version}org.apache.sparkspark-sql_2.11${spark.version}org.apache.sparkspark-hive_2.11${spark.version}org.scala-langscala-library

com.alibabafastjson1.2.47

org.scala-toolsmaven-scala-plugin2.15.1compile-scalaadd-sourcecompiletest-compile-scalaadd-sourcetestCompileorg.apache.maven.pluginsmaven-assembly-pluginjar-with-dependencies

第5章需求实现5.1创建MySql配置文件在resource源码包下创建comerce.peoperties

jdbc.url=jdbc:mysql://hadoop102:3306/course_learnuseUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=falsejdbc.user=rootjdbc.password=000000

5.2创建读取配置文件的工具类在com.atguigu.qzpoint.util创建ConfigurationManager类packagecom.atguigu.qzpoint.util;

importjava.io.InputStream;importjava.util.Properties;

/****读取配置文件工具类*/publicclassConfigurationManager{

privatestaticPropertiesprop=newProperties();

static{try{InputStreaminputStream=ConfigurationManager.class.getClassLoader().getResourceAsStream("comerce.properties");prop.load(inputStream);}catch(Exceptione){e.printStackTrace();}}

//获取配置项publicstaticStringgetProperty(Stringkey){returnprop.getProperty(key);}

//获取布尔类型的配置项publicstaticbooleangetBoolean(Stringkey){Stringvalue=prop.getProperty(key);try{returnBoolean.valueOf(value);}catch(Exceptione){e.printStackTrace();}returnfalse;}

}5.3创建Json解析工具类在com.atguigu.qz.point.util创建ParseJsonData类packagecom.atguigu.qzpoint.util;

importcom.alibaba.fastjson.JSONObject;

publicclassParseJsonData{

publicstaticJSONObjectgetJsonData(Stringdata){try{returnJSONObject.parseObject(data);}catch(Exceptione){returnnull;}}}

5.4创建Druid连接池在com.atgugiu.qzpoint.util创建DataSourceUtil类packagecom.atguigu.qzpoint.util;

importcom.alibaba.druid.pool.DruidDataSourceFactory;

importjavax.sql.DataSource;importjava.io.Serializable;importjava.sql.*;importjava.util.Properties;

/***德鲁伊连接池*/publicclassDataSourceUtilimplementsSerializable{publicstaticDataSourcedataSource=null;

//提供获取连接的方法publicstaticConnectiongetConnection()throwsSQLException{returndataSource.getConnection();}

//提供关闭资源的方法【connection是归还到连接池】//提供关闭资源的方法【方法重载】3dqlpublicstaticvoidcloseResource(ResultSetresultSet,PreparedStatementpreparedStatement,Connectionconnection){//关闭结果集//ctrl+alt+m将java语句抽取成方法closeResultSet(resultSet);//关闭语句执行者closePrepareStatement(preparedStatement);//关闭连接closeConnection(connection);}

privatestaticvoidcloseConnection(Connectionconnection){if(connection!=null){try{connection.close();}catch(SQLExceptione){e.printStackTrace();}}}

privatestaticvoidclosePrepareStatement(PreparedStatementpreparedStatement){if(preparedStatement!=null){try{preparedStatement.close();}catch(SQLExceptione){e.printStackTrace();}}}

privatestaticvoidcloseResultSet(ResultSetresultSet){if(resultSet!=null){try{resultSet.close();}catch(SQLExceptione){e.printStackTrace();}}}}

5.5创建操作MySql的代理类在com.atguigu.qzpoint.util创建SqlProxy类packagecom.atguigu.qzpoint.util

importjava.sql.{Connection,PreparedStatement,ResultSet}

traitQueryCallback{defprocess(rs:ResultSet)}

classSqlProxy{privatevarrs:ResultSet=_privatevarpsmt:PreparedStatement=_

/***执行修改语句**@paramconn*@paramsql*@paramparams*@return*/defexecuteUpdate(conn:Connection,sql:String,params:Array[Any]):Int={varrtn=0try{psmt=conn.prepareStatement(sql)if(params!=null&¶ms.length>0){for(i<-0untilparams.length){psmt.setObject(i+1,params(i))}}rtn=psmt.executeUpdate()}catch{casee:Exception=>e.printStackTrace()}rtn}

/***执行查询语句*执行查询语句**@paramconn*@paramsql*@paramparams*@return*/defexecuteQuery(conn:Connection,sql:String,params:Array[Any],queryCallback:QueryCallback)={rs=nulltry{psmt=conn.prepareStatement(sql)if(params!=null&¶ms.length>0){for(i<-0untilparams.length){psmt.setObject(i+1,params(i))}}rs=psmt.executeQuery()queryCallback.process(rs)}catch{casee:Exception=>e.printStackTrace()}}

defshutdown(conn:Connection):Unit=DataSourceUtil.closeResource(rs,psmt,conn)}

5.6实时统计注册人数代码实现packagecom.atguigu.qzpoint.streaming

importjava.langimportjava.sql.ResultSetimportjava.util.Random

importcom.atguigu.qzpoint.util.{DataSourceUtil,QueryCallback,SqlProxy}importorg.apache.kafka.clients.consumer.ConsumerRecordimportorg.apache.kafka.common.TopicPartitionimportorg.apache.kafka.common.serialization.StringDeserializerimportorg.apache.spark.SparkConfimportorg.apache.spark.streaming.dstream.InputDStreamimportorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.{Seconds,StreamingContext}

importscala.collection.mutable

objectRegisterStreaming{privatevalgroupid="register_group_test"

//valdsStream=stream.filter(item=>item.value().split("\t").length==3)//.mapPartitions(partitions=>//partitions.map(item=>{//valrand=newRandom()//valline=item.value()//valarr=line.split("\t")//valapp_id=arr(1)//(rand.nextInt(3)+"_"+app_id,1)//}))//dsStream.print()//vala=dsStream.reduceByKey(_+_)//a.print()//a.map(item=>{//valappid=item._1.split("_")(1)//(appid,item._2)//}).reduceByKey(_+_).print()

//处理完业务逻辑后手动提交offset维护到本地mysql中stream.foreachRDD(rdd=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{valoffsetRanges:Array[OffsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor(or<-offsetRanges){sqlProxy.executeUpdate(client,"replaceinto`offset_manager`(groupid,topic,`partition`,untilOffset)values(,,,)",Array(groupid,or.topic,or.partition.toString,or.untilOffset))}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})ssc.start()ssc.awaitTermination()}

}

5.7实时统计学员做题正确率与知识点掌握度packagecom.atguigu.qzpoint.streaming

importjava.langimportjava.sql.{Connection,ResultSet}importjava.time.LocalDateTimeimportjava.time.format.DateTimeFormatter

/***知识点掌握度实时统计*/objectQzPointStreaming{

privatevalgroupid="qz_point_group"

}}

5.8实时统计商品页到订单页,订单页到支付页转换率packagecom.atguigu.qzpoint.streaming

importjava.langimportjava.sql.{Connection,ResultSet}importjava.text.NumberFormat

importcom.atguigu.qzpoint.util.{DataSourceUtil,ParseJsonData,QueryCallback,SqlProxy}importorg.apache.kafka.clients.consumer.ConsumerRecordimportorg.apache.kafka.common.TopicPartitionimportorg.apache.kafka.common.serialization.StringDeserializerimportorg.apache.spark.streaming.dstream.InputDStreamimportorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.{SparkConf,SparkFiles}importorg.lionsoul.ip2region.{DbConfig,DbSearcher}

importscala.collection.mutableimportscala.collection.mutable.ArrayBuffer

/***页面转换率实时统计*/objectPageStreaming{privatevalgroupid="vip_count_groupid"

ssc.sparkContext.addFile(this.getClass.getResource("/ip2region.db").getPath)//广播文件valipDStream=dsStream.mapPartitions(patitions=>{valdbFile=SparkFiles.get("ip2region.db")valipsearch=newDbSearcher(newDbConfig(),dbFile)patitions.map{item=>valip=item._4valprovince=ipsearch.memorySearch(ip).getRegion().split("\\|")(2)//获取ip详情中国|0|上海|上海市|有线通(province,1l)//根据省份统计点击个数}}).reduceByKey(_+_)

//计算转换率//处理完业务逻辑后手动提交offset维护到本地mysql中stream.foreachRDD(rdd=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{calcJumRate(sqlProxy,client)//计算转换率valoffsetRanges:Array[OffsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor(or<-offsetRanges){sqlProxy.executeUpdate(client,"replaceinto`offset_manager`(groupid,topic,`partition`,untilOffset)values(,,,)",Array(groupid,or.topic,or.partition.toString,or.untilOffset))}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})ssc.start()ssc.awaitTermination()}

/***计算页面跳转个数**@paramsqlProxy*@paramitem*@paramclient*/defcalcPageJumpCount(sqlProxy:SqlProxy,item:(String,Int),client:Connection):Unit={valkeys=item._1.split("_")varnum:Long=item._2valpage_id=keys(1).toInt//获取当前page_idvallast_page_id=keys(0).toInt//获取上一page_idvalnext_page_id=keys(2).toInt//获取下页面page_id//查询当前page_id的历史num个数sqlProxy.executeQuery(client,"selectnumfrompage_jump_ratewherepage_id=",Array(page_id),newQueryCallback{overridedefprocess(rs:ResultSet):Unit={while(rs.next()){num+=rs.getLong(1)}rs.close()}

//对num进行修改并且判断当前page_id是否为首页if(page_id==1){sqlProxy.executeUpdate(client,"insertintopage_jump_rate(last_page_id,page_id,next_page_id,num,jump_rate)"+"values(,,,,)onduplicatekeyupdatenum=num+",Array(last_page_id,page_id,next_page_id,num,"100%",num))}else{sqlProxy.executeUpdate(client,"insertintopage_jump_rate(last_page_id,page_id,next_page_id,num)"+"values(,,,)onduplicatekeyupdatenum=num+",Array(last_page_id,page_id,next_page_id,num,num))}})}

5.9实时统计学员播放视频各时长packagecom.atguigu.qzpoint.streaming

importjava.langimportjava.sql.{Connection,ResultSet}

importcom.atguigu.qzpoint.bean.LearnModelimportcom.atguigu.qzpoint.util.{DataSourceUtil,ParseJsonData,QueryCallback,SqlProxy}importorg.apache.kafka.clients.consumer.ConsumerRecordimportorg.apache.kafka.common.TopicPartitionimportorg.apache.kafka.common.serialization.StringDeserializerimportorg.apache.spark.SparkConfimportorg.apache.spark.streaming.dstream.InputDStreamimportorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.{Seconds,StreamingContext}

objectCourseLearnStreaming{privatevalgroupid="course_learn_test1"

defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName(this.getClass.getSimpleName).set("spark.streaming.kafka.maxRatePerPartition","30").set("spark.streaming.stopGracefullyOnShutdown","true")

//解析json数据valdsStream=stream.mapPartitions(partitions=>{partitions.map(item=>{valjson=item.value()valjsonObject=ParseJsonData.getJsonData(json)valuserId=jsonObject.getIntValue("uid")valcwareid=jsonObject.getIntValue("cwareid")valvideoId=jsonObject.getIntValue("videoid")valchapterId=jsonObject.getIntValue("chapterid")valedutypeId=jsonObject.getIntValue("edutypeid")valsubjectId=jsonObject.getIntValue("subjectid")valsourceType=jsonObject.getString("sourceType")valspeed=jsonObject.getIntValue("speed")valts=jsonObject.getLong("ts")valte=jsonObject.getLong("te")valps=jsonObject.getIntValue("ps")valpe=jsonObject.getIntValue("pe")LearnModel(userId,cwareid,videoId,chapterId,edutypeId,subjectId,sourceType,speed,ts,te,ps,pe)})})

dsStream.foreachRDD(rdd=>{rdd.cache()//统计播放视频有效时长完成时长总时长rdd.groupBy(item=>item.userId+"_"+item.cwareId+"_"+item.videoId).foreachPartition(partitoins=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitoins.foreach{case(key,iters)=>calcVideoTime(key,iters,sqlProxy,client)//计算视频时长}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})//统计章节下视频播放总时长rdd.mapPartitions(partitions=>{partitions.map(item=>{valtotaltime=Math.ceil((item.te-item.ts)/1000).toLongvalkey=item.chapterId(key,totaltime)})}).reduceByKey(_+_).foreachPartition(partitoins=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitoins.foreach(item=>{sqlProxy.executeUpdate(client,"insertintochapter_learn_detail(chapterid,totaltime)values(,)onduplicatekey"+"updatetotaltime=totaltime+",Array(item._1,item._2,item._2))})}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})

//统计课件下的总播放时长rdd.mapPartitions(partitions=>{partitions.map(item=>{valtotaltime=Math.ceil((item.te-item.ts)/1000).toLongvalkey=item.cwareId(key,totaltime)})}).reduceByKey(_+_).foreachPartition(partitions=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitions.foreach(item=>{sqlProxy.executeUpdate(client,"insertintocwareid_learn_detail(cwareid,totaltime)values(,)onduplicatekey"+"updatetotaltime=totaltime+",Array(item._1,item._2,item._2))})}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})

//统计辅导下的总播放时长rdd.mapPartitions(partitions=>{partitions.map(item=>{valtotaltime=Math.ceil((item.te-item.ts)/1000).toLongvalkey=item.edutypeId(key,totaltime)})}).reduceByKey(_+_).foreachPartition(partitions=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitions.foreach(item=>{sqlProxy.executeUpdate(client,"insertintoedutype_learn_detail(edutypeid,totaltime)values(,)onduplicatekey"+"updatetotaltime=totaltime+",Array(item._1,item._2,item._2))})}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})

})//计算转换率//处理完业务逻辑后手动提交offset维护到本地mysql中stream.foreachRDD(rdd=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{valoffsetRanges:Array[OffsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor(or<-offsetRanges){sqlProxy.executeUpdate(client,"replaceinto`offset_manager`(groupid,topic,`partition`,untilOffset)values(,,,)",Array(groupid,or.topic,or.partition.toString,or.untilOffset))}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})ssc.start()ssc.awaitTermination()}

第6章总结与调优6.1保证SparkStreaming第一次启动不丢数据在kafka的参数auto.offset.rest设定为earlist,保证SparkStreaming第一次启动从kafka最早偏移量开始拉取数据

6.2SparkStreaming手动维护偏移量

在SparkStreaming下有三种消费模式的定义最多一次、至少一次、恰好一次那么最好是无限接近恰好一次。要实现恰好一次偏移量必须手动维护,因为自动提交会在SparkStreaming刚运行时就立马提交offset,如果这个时候SparkStreaming消费信息失败了,那么offset也就错误提交了。所以必须保证:1.手动维护偏移量2.处理完业务数据后再提交offset手动维护偏移量需设置kafka参数enable.auto.commit改为false

手动维护提交offset有两种选择:1.处理完业务数据后手动提交到Kafka2.处理完业务数据后手动提交到本地库如MySql、HBase

1、先来看如何提交到kafka官网所示:

stream.foreachRdd后根据每个rdd先转换成HashOffsetRanges对象通过.offsetRanges方法获取到偏移量对象,再通过commitAsync方法将偏移量提交。

2、维护到本地MySql如项目所示:Driver端需先去判断Msql库中是否存在偏移量,如果存在偏移量则从MySql中获取到当前topic对应的最新offset大小,如果MySql不存在则从kafka中获取

消费到数据后,进行业务处理处理完后需将offset最新值保存到MySql

那么如果有面试官提问如何保证数据恰好一次性消费回答到这两点一般就可以了,手动维护便宜量和先处理完业务数据再提交offset。但是处理业务数据和提交offset并非同一事物,在极端情况下如提交offset时断网断电还是会导致offset没有提交并且业务数据已处理完的情况。

那么保证事物就需要将并行度调成1或者将数据collect到driver端,再进行数据业务处理和提交offset,但这样还会导致并行度变成1很可能导致处理速度跟不上,所以大数据情况下一般不考虑事物。

6.3updateStateByKey算子与checkpoint

updateStateBykey算子根据官网描述,是返回一个新的“状态”的DStream的算子,其通过在键的先前状态和键的新值上应用给定函数更新每个键的状态。

具体写法:根据历史状态值,和当前批次的数据状态值的累加操作得出一个最新的结果。如项目中代码:

那么使用updateStateByBykey算子,必须使用SparkStreaming的checkpoint来维护历史状态数据

那么看下Hdfs上路径下的文件

存在小文件且小文件个数不可控,所以在真实企业生产环境上并不会使用checkpoint操作,也不会使用基于checkpoint的算子如updateStateBykey算子

那么如何代替updateStateBykey这种基于历史数据状态的操作的算子呢:在进行相应操作时,可以去库中查询出历史数据,再与当前数据进行操作得出最新结果集,将结果集再刷新到本地库中。

6.4计算SparkStreaming一秒钟拉取多少条数据在企业中往往会根据业务的实时性来定制一秒钟消费数据量的条数,来达到实时性,那么通过什么参数来设置SparkStreaming从kafka的拉取的条数呢。

根据官网描述,可以设置spark.streaming.kafka.maxRatePerPartition参数来设置SparkStreaming从kafka分区每秒拉取的条数

那么在项目中如实时统计学员做题正确率与知识点掌握度需求中,需要每秒100处理速度,针对此需求topic为qz_log分区为10,那么通过此参数设定10即可,每个分区没秒10条数据。一秒处理100条数据,当前批次为3秒一次,一批处理300条数据.

6.5SparkStreaming背压机制

6.6一个stage的耗时由什么决定

由上图可以看出一个stage由最慢的task耗时决定。6.7SparkStreaming优雅关闭提交SparkStreaming任务到yarn后,当需要停止程序时使用yarnapplication-killapplication_id命令来关闭SparkStreaming,那么操作此命令时需要保证数据不丢失,需要设置spark.streaming.stopGracefullOnShutdown参数为ture

当设置此参数后,SparkStreaming程序在接收到kill命令时,不会立马结束程序,Spark会在JVM关闭时正常关闭SparkStreaming,而不是是立马关闭,即保证当前数据处理完后再关闭。

6.8SparkStreaming默认分区数SparkStreaming默认并行度与所对应kafkatopic创建时的分区数所对应,比如项目中topic的分区都是10,SparkStreaming的默认分区就为10,且在真实开发环境中SparkStreaming一般不会去使用repartition增大分区操作,因为会进行shuffle耗时。第7章打包、spark-submit命令

spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.CourseLearnStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.PageStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.QzPointStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.RegisterStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar

第8章常见问题8.1jar包冲突问题

根据官网描述spark-streaming-kafka-0-10_2.11jar包中包含kafka-clients客户端jar包不需要再次添加kafka客户端jar包,如果再次添加kafka客户端jar包可能会引起版本冲突8.2无法序列化问题和正确操作数据库连接

对于开发者人员刚开始接手SparkStreaming时往往会错误的使用数据库连接,如上述官网描述对于connection获取,代码写在了foreachRDD内rdd.foreach上,那么这样操作实际是在driver端创建到connection,然后rdd.foreacher操作为分布式节点操作,connection.send方法实际发生在了各个executor节点上,这个时候就涉及到了driver端对象到executor端的一个网络传输问题,这个时候spark会发生错误,会报一个org.apache.spark.SparkException:Tasknotserializable这样一个任务无法序列化的错,在Spark中遇到此错误一般都是错误的将driver端对象在executor端使用造成的。那么创建connection操作必须在executor端执行

如官网描述,在rdd.foreach里创建connection,这样虽然不会发生错误,但是这样循环的粒度是针对每条数据,每循环一条数据都会创建一个连接,这样会造成资源浪费。图1和图2都是错误展示最后,正确的使用数据库连接,循环粒度是分区,在每个分区下创建一个数据库连接,循环分区下的数据每条数据使用当前分区下的数据库连接,当使用完毕后归还的连接池中。所以在SparkStreaming开发中需养成良好习惯:dstream.foreachRdd{rdd=>{rdd.foreachPartition{partitions=>{//循环分区//创建connectionpartitions.foreach(record=>{//业务处理使用当前connection}}//归还连接}}循环粒度foreachRdd=>foreachPartition=>foreach8.3SparkStreaming操作数据库时线程安全问题在SparkStreaming中,采用查询本地库的历史数据和当前批次数据的计算来代替需要基于hdfs的算子updatestatebykey,那么在查询和重新刷入本地库的时候处理不当会造成线程安全问题,数据不准的问题。

那么在查询本地库时需要进行一次预聚合操作,将相同key的数据落到一个分区,保证同一个key的数据指挥操作数据库一次,预聚合操作有reducebykey、groupbykey、groupby等算子。如项目所写:

题库更新操作时需要查询MySql本地库的历史数据,在查询本地库钱先进行了groupby操作将相同符合条件的业务数据聚合到了一个分区,保证相同用户下同一课程同一知识点的数据在当前批次下只会去查询一次MySql数据库并且一次覆盖。8.4数据倾斜问题数据倾斜为在shuffle过程中,必须将各个节点上相同的key的数据拉取到某节点的一个task来进行,此时如果某个key对应的数据量特别大的话,就会发生数据倾,某个task耗时非常大,那么一个stage的耗时由最慢的task决定,从而导致整个SparkStreaming任务运行非常缓慢。以reducebykey为例:

这张图就是发生了数据倾斜,那么解决方案最有效的为两阶段聚合,先打散key聚合一次,再还原key聚合一次。

具体代码展示:

对DStream进行map操作对原始key前加上随机值,map完后进行第一次reducebykey操作,此结果为打散key后的reducebykey结果,再次进行map操作根据分隔符,去掉随机数保留原有key,map后再进行reducebykey,保证相同key的数据准确累加。

8.5SparkStreaming消费多topic在真实环境中往往会有许多业务场景非常类似,比如打标签、监控某指标,可能代码逻辑都一样只有某个取值不一样,这个时候一个SparkStreaming就可以监控多个topic,然后根据topic的名称来进行不同的业务处理,就不需要开发多个SparkStreaming程序了。查看kafkaUtils.createDirectStream方法

可以发现topic参数可以是个多个值,也就是createDirectStream方法支持多个topic。

通过kafkaUtils.createDirectStream方法获取到DStream,这个DStream流的类型为InputDStream[ConsumerRecord[String,String]],那么在可以通过调用map方法,ConsumerRecord的topic方法来获取对应的topic名称

获取到topic名称后value数据后,就可以在后续操作里根据判断topic名称来处理不同的业务。

点击下载内存泄露分析工具下载下来是一个jar包

那么需要编写bat批处理来运行创建run.bat

编辑titleibm-heap-analyzer

path=%PATH%;%C:\JAVA\jdk1.8.0_51\bin

E:

cdE:\IBMheapAnalyzer\IBM_DUMP_wjfx

java.exe-Xms1048M-Xmx4096M-jarha456.jar

路径需要改成自己当前路径点击run.bat运行

运行成功8.6.2模拟内存泄露场景内存泄露的原因往往是因为对象无法释放或被回收造成,那么在本项目中就模拟此场景。

如上图所示,在计算学员知识点正确率与掌握度代码中,在最后提交offset提交偏移量后,循环往map里添加LearnMode对象,使每处理一批数据就往map里添加100000个LearnMode对象,使堆内存撑满。

8.6.3查找driver进程在集群上提交sparkstreaming任务ps-ef|grepcom.atguigu.qzpoint.streaming.QzPointStreaming通过此命令查找到driver进程号

进程号为6860

8.6.4JMAP命令使用jmap-heappid命令查看6860进程,内存使用情况。jmap-heap6860

发现新生代和老年代内存占满,有对象无法被销毁或回收。再通过jmap-histopid命令查看对象的内存情况。

jmap-histo6860>a.log

jmap-dump:live,format=b,file=dump.log6860

将dump从集群下载下来,打开IBMHeapAnalyzer进行分析

从饼状图可以看出,堆内存中存在大量HashEntry类点击Analysis分析查看各个对象内存的泄露大小和总大小。

选中最大的分析对象双击,或者右键点击Findobjectinatreeview查看树状图。

可以看出HashEntry的父类为HashMap,并且点击HashEntry查看内部,

链表里的next对象为LearnModel

可以定位SparkStreaming在操作map添加LearnModel时发生了内存泄露

THE END
1.在线教育系统设计与实现需求分析功能需求分析 系统功能模块概述 在线教育系统的设计与实现以用户之间的“教-学”关系为基础展开设计。系统的学习者那端主要涉及到的功能有:用户的登录与注册,个人信息的修改,课程的展示,在线选课,在线观看视频以及对课程发表评论和问答。教学者那端主要有登录注册,个人信息的修改,课程信息的发布,查看课程评价以及用户的https://www.jianshu.com/p/b107c2565026
2.在线教育系统业务,在线教育用户的需求是什么?线上学习的普遍需求在线教育涉及的主要人群包括:学生、家长、老师、学校这四个方面,而各方的需求有相同的地方,那就是提高学生的学习成绩,但也有一些其他不同的需求。接下来我们就一起看看在线教育用户的需求。 一、老师、家长、学校、学生四方面对在线教育产品的要求 老师对在线教育的需求是减负即减少工作量,减少备课、批改作业、寻找https://blog.csdn.net/uudsjhd/article/details/136207445
3.在线教育系统需求分析文档因为知识付费的火爆,现在越来越多的人开始在在线教育平台上做付费课程。因为原创不容易,导致很多付费课程的内容重复。但是想要使付费课程被更多人关注,必然要进行原创才行。 参考图片 一、明确课程受众 首先针对自己研究的领域、专业、实践,去分析哪些人群需要这些知识。比如人像摄影技巧的受众是喜欢摄影的人,喜欢自拍的https://www.bilibili.com/read/cv38958394
4.2024年在线教育市场规模及发展趋势深度剖析个性化教学:大数据与人工智能技术的融合,将推动在线教育向个性化教学迈进。通过精准分析学习者数据,平台可提供定制化的课程内容,实现因材施教。 线上线下融合:结合线上教育的便捷性与线下教育的个性化辅导优势,线上线下融合将成为未来教育的新趋势,满足学习者多样化的学习需求。 https://www.qyresearch.com.cn/news/18714/online-education
5.在线教育平台开发需求分析51CTO博客在线教育平台开发需求分析 随着互联网的不断发展,现代信息技术向教育行业迅速扩展,教育的方式也不再是传统面对面的书本传授了,线下模式正面临着前所未有的严峻挑战,同时也面临着前良好的发展机遇—在线教育平台开发需求旺盛。它打破了传统教育的局限,在高等教育、继续教育、在职教育乃至终身教育中发挥前所未有的作用。https://blog.51cto.com/u_16074861/6262033
6.2025年在线游戏教育娱乐行业全球市场需求分析及前景预测报告在线游戏教育娱乐行业报告首先梳理了行业市场特征、宏观环境对市场整体和上下游产业的影响、市场环境变化,还对行业SWOT(优势、劣势、机遇、挑战)进行分析,随后从整体市场和细分市场(类型、应用、地区)出发,分析了市场规模、相关影响因素、主要潜力市场、竞争格局及其演变方向、重点企业发展现状和发展趋势。区域层面,报告将全https://www.gelonghui.com/p/1487636
7.线上教育用户行为及其需求调查分析历史项目免费模板线上教育用户行为及其需求调查分析感谢您能抽出几分钟时间来参加本次答题,现在我们就马上开始吧! 您所在的年级是 本科生 硕士研究生 博士研究生 您在线上教育中主要学习哪些内容?(多选题) 自己学校开展的专业相关课程 其他高校组织的专业领域课程 专业相关的学术交流会议、报告 各教育平台提供的专业课程 https://www.wenjuan.com/m/slp/mYZBzur/
8.我国在线教育行业的需求及发展趋势分析.docx教育我国在线教育行业的需求及发展趋势分析.docx 18页内容提供方:黄橙文化 大小:1.1 MB 字数:约1.86万字 发布时间:2022-08-07发布于广东 浏览人气:66 下载次数:仅上传者可见 收藏次数:0 需要金币:*** 金币 (10金币=人民币1元)我国在线教育行业的需求及发展趋势分析.docx 关闭预览 想预览更多内容https://max.book118.com/html/2022/0802/5010320313004314.shtm
9.在线教育平台必修课——用户人群解析爱上学习无论是投资在线教育还是在线教育领域创业,都需要对在线教育领域进行调研和分析。用户是在线教育最大的金主,让用户对你的产品熟知并接纳,让用户成为实体广告为你在线教育平台手口相传?这就要对在线教育的用户进行分析,对用户知己知彼才能百战共赢,既满足用户学习的需求,也能实现在线教育的盈利变现。 http://www.ishangstudy.com/news/detail/id/63.html
10.深圳在线教育行业发展现状和前景(工资待遇人才需求发展趋势硕士人才需求分析 ? 2024年较2023年 2.463% 占深圳在线教育行业 历年招聘职位量占比 说明:深圳在线教育行业硕士发展前景怎么样?有前途吗?好找工作吗?2024年硕士招聘职位183个,占深圳在线教育行业2.463%,曲线越向上代表市场需求量越大,就业情况相对较好。数据由各地招聘网站统计而来,仅检索职位名称。 https://www.jobui.com/salary/shenzhen-all/ind-zaixianjiaoyu/
11.8000字干货!在线启蒙教育的产品设计公式(附竞品分析模型工具)娱乐是一个兴奋性需求,作为互联网产品的深度习惯用户,如果能让孩子们一边学习一边娱乐,通过劳逸结合的形式帮助小朋友成长学习,这会非常棒。 4. 小结 从用户动机、痛点和需求分析不难发现,互联网的高速发展是催生在线英语启蒙教育的关键,其次 80、90 后这个核心用户群是互联网当下的主力军,他们消费力强,决策力果敢,https://www.uisdc.com/online-primer-design/
12.K12在线教育行业发展SWOT分析本文采用SWOT分析方法针对K12在线教育行业当前内部的优势与劣势、外部的机会与威胁进行综合性系统分析,并据此预测行业未来发展趋势,提出相关发展策略。 在线教育是一种以网络为知识传播媒介的教育模式,区别于传统教育的面对面教学形式。中国的在线教育于20世纪90年代起步,近几年伴随着互联网的发展突然开始了快速崛起。 https://xinjiang.sqqingxi.com/18984.html
13.教育APP开发:针对教育的app产品需求分析—上海艾艺教育APP开发、教育小程序开发、教育网站开发、教育管理系统开发,欢迎随时咨询上海软件开发公司「艾艺」,艾艺拥有专业的“数字教育事业部”,凭借15年行业经验,为客户量身定制教育平台,提供在线教育平台搭建+课程开发等一站式解决方案和服务,咨询热线:17702199087(同微信),发送您的需求,可获取您专属的解决方案和报价。https://www.adinnet.cn/bloginfo/2021_05/blog_5665.html
14.知识付费在线教育小程序源码搭建知识付费在线教育模式的开发是一个涉及教育内容、技术平台、用户体验和市场运营等多个方面的综合性项目。以下是一个详细的开发指南: 一、市场调研与需求分析 明确目标用户: 确定对在线教育有需求的用户群体,如学生、职场人士、兴趣爱好者等。 分析用户的学习习惯、偏好以及支付意愿。 了解用户需求: 通过问卷调查、用户https://m.11467.com/blog/d9189690.htm
15.在线教育平台教培机构应该如何营销?随着互联网技术的迅猛发展,在线教育平台日益成为教培机构的重要渠道。为了在激烈的市场竞争中脱颖而出,教培机构需要制定科学有效的营销策略。 一、精准定位与品牌塑造 1.市场调研与需求分析 在制定营销策略前,教培机构需要进行详细的市场调研,了解目标市场的需求和痛点。这包括: https://m.grazy.cn/article/21597.htm
16.产品分析洋葱数学,有趣有内涵的在线教育内容产品人人都是商业模式:主要通过售卖教材课程及专题课程内容;另有一些教辅书籍在线下售卖。 1.3 用户及需求分析 1.3.1 地域及人群分布 根据百度指数2019年4月的数据显示:洋葱数学使用者中女性稍多于男性;注册用户年龄在30~39间的家长及教师人群比较多;在传统的教育大省中,用户人数多。 https://www.woshipm.com/evaluating/2218007.html
17.系统架构师入门指南:从零开始学习系统架构设计在线教育平台的系统架构设计需要考虑到用户管理、课程管理、学习记录、考试评测等多个方面。这里我们以一个简单的在线教育平台为例,展示系统架构设计的过程。 需求分析: 在线教育平台需要支持用户注册、登录、课程浏览、学习记录、考试评测等功能。 架构设计: 我们可以采用事件驱动架构来实现这个在线教育平台,通过事件的传播https://www.imooc.com/article/361175
18.产品分析:为何斑马英语成吸金怪兽?二、幼儿启蒙在线教育行业应用现状 2.1在线幼儿教育产品类型 随着市场得火热,赛道不断用各种新姿势步入新玩家,有些是利用原有母婴优势入场,有些是面对线下幼儿园B端用户等等,本次重点会对2C-内容类产品重点分析。下面将会根据目前市场上的用户下载量,融资程度等筛选出几款产品进行对比分析。 https://maimai.cn/article/detail?fid=1412879648&efid=zblQwE8w7plsQQLXOcx_Nw