大数据项目实战之在线教育(04实时实现)十一vs十一

第1章项目需求及架构设计1.1项目需求分析一、数据采集平台搭建二、Kafka、Zookeeper中间件准备三、下游SparkStreaming对接Kafka接收数据,实现vip个数统计、栏目打标签功能、做题正确率与掌握度的实时计算功能。

1.2项目框架1.2.1技术选型一、数据存储:Kafka、MySql二、数据处理:Spark三、其他组件:Zookeeper1.2.2流程设计

第2章需求2.0原始数据格式及对应topic2.0.1实时统计注册人数kafka对应topic:register_topic数据格式:

数据格式

{"app_id":"1","device_id":"102","distinct_id":"5fa401c8-dd45-4425-b8c6-700f9f74c532","event_name":"-","ip":"121.76.152.135","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"245494"}

uid:用户idapp_id:平台iddeviceid:平台iddisinct_id:唯一标识Ip:用户ip地址last_page_id:上一页面idpage_id:当前页面id0:首页1:商品课程页2:订单页面3:支付页面next_page_id:下一页面id2.0.4实时统计学员播放视频各时长Kafka对应topic:course_learn数据格式:

{"biz":"bdfb58e5-d14c-45d2-91bc-1d9409800ac3","chapterid":"1","cwareid":"3","edutypeid":"3","pe":"55","ps":"41","sourceType":"APP","speed":"2","subjectid":"2","te":"1563352166417","ts":"1563352159417","uid":"235","videoid":"2"}

2.2模拟数据采集模拟数据采集将准备好的log文件使用kafka生产者代码发送信息到topic

注册日志数据register.log日志文件对应topic:register_topic

做题数据qz_log日志文件对应topic:qz_log

商品页面数据page_log日志文件对应topic:page_topic

视频播放时长数据course_learn.log日志文件对应topic:course_learn

如果windows下没有安装hadoop环境先配置环境

Ip解析工具Ip解析本地库:

2.3.实时统计注册人员信息用户使用网站或APP进行注册,后台实时收集数据传输Kafka,SparkStreaming进行对接统计,实时统计注册人数。需求1:实时统计注册人数,批次为3秒一批,使用updateStateBykey算子计算历史数据和当前批次的数据总数,仅此需求使用updateStateBykey,后续需求不使用updateStateBykey。需求2:每6秒统统计一次1分钟内的注册数据,不需要历史数据提示:reduceByKeyAndWindow算子需求3:观察对接数据,尝试进行调优。2.4实时计算学员做题算正确率与知识点掌握度mysql建表语句:

创建相应的包

配置pom.xml文件

com_atguigu_warehouseorg.apache.sparkspark-core_2.11${spark.version}org.apache.sparkspark-sql_2.11${spark.version}org.apache.sparkspark-hive_2.11${spark.version}org.scala-langscala-library

com.alibabafastjson1.2.47

org.scala-toolsmaven-scala-plugin2.15.1compile-scalaadd-sourcecompiletest-compile-scalaadd-sourcetestCompileorg.apache.maven.pluginsmaven-assembly-pluginjar-with-dependencies

第5章需求实现5.1创建MySql配置文件在resource源码包下创建comerce.peoperties

jdbc.url=jdbc:mysql://hadoop102:3306/course_learnuseUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=falsejdbc.user=rootjdbc.password=000000

5.2创建读取配置文件的工具类在com.atguigu.qzpoint.util创建ConfigurationManager类packagecom.atguigu.qzpoint.util;

importjava.io.InputStream;importjava.util.Properties;

/****读取配置文件工具类*/publicclassConfigurationManager{

privatestaticPropertiesprop=newProperties();

static{try{InputStreaminputStream=ConfigurationManager.class.getClassLoader().getResourceAsStream("comerce.properties");prop.load(inputStream);}catch(Exceptione){e.printStackTrace();}}

//获取配置项publicstaticStringgetProperty(Stringkey){returnprop.getProperty(key);}

//获取布尔类型的配置项publicstaticbooleangetBoolean(Stringkey){Stringvalue=prop.getProperty(key);try{returnBoolean.valueOf(value);}catch(Exceptione){e.printStackTrace();}returnfalse;}

}5.3创建Json解析工具类在com.atguigu.qz.point.util创建ParseJsonData类packagecom.atguigu.qzpoint.util;

importcom.alibaba.fastjson.JSONObject;

publicclassParseJsonData{

publicstaticJSONObjectgetJsonData(Stringdata){try{returnJSONObject.parseObject(data);}catch(Exceptione){returnnull;}}}

5.4创建Druid连接池在com.atgugiu.qzpoint.util创建DataSourceUtil类packagecom.atguigu.qzpoint.util;

importcom.alibaba.druid.pool.DruidDataSourceFactory;

importjavax.sql.DataSource;importjava.io.Serializable;importjava.sql.*;importjava.util.Properties;

/***德鲁伊连接池*/publicclassDataSourceUtilimplementsSerializable{publicstaticDataSourcedataSource=null;

//提供获取连接的方法publicstaticConnectiongetConnection()throwsSQLException{returndataSource.getConnection();}

//提供关闭资源的方法【connection是归还到连接池】//提供关闭资源的方法【方法重载】3dqlpublicstaticvoidcloseResource(ResultSetresultSet,PreparedStatementpreparedStatement,Connectionconnection){//关闭结果集//ctrl+alt+m将java语句抽取成方法closeResultSet(resultSet);//关闭语句执行者closePrepareStatement(preparedStatement);//关闭连接closeConnection(connection);}

privatestaticvoidcloseConnection(Connectionconnection){if(connection!=null){try{connection.close();}catch(SQLExceptione){e.printStackTrace();}}}

privatestaticvoidclosePrepareStatement(PreparedStatementpreparedStatement){if(preparedStatement!=null){try{preparedStatement.close();}catch(SQLExceptione){e.printStackTrace();}}}

privatestaticvoidcloseResultSet(ResultSetresultSet){if(resultSet!=null){try{resultSet.close();}catch(SQLExceptione){e.printStackTrace();}}}}

5.5创建操作MySql的代理类在com.atguigu.qzpoint.util创建SqlProxy类packagecom.atguigu.qzpoint.util

importjava.sql.{Connection,PreparedStatement,ResultSet}

traitQueryCallback{defprocess(rs:ResultSet)}

classSqlProxy{privatevarrs:ResultSet=_privatevarpsmt:PreparedStatement=_

/***执行修改语句**@paramconn*@paramsql*@paramparams*@return*/defexecuteUpdate(conn:Connection,sql:String,params:Array[Any]):Int={varrtn=0try{psmt=conn.prepareStatement(sql)if(params!=null&¶ms.length>0){for(i<-0untilparams.length){psmt.setObject(i+1,params(i))}}rtn=psmt.executeUpdate()}catch{casee:Exception=>e.printStackTrace()}rtn}

/***执行查询语句*执行查询语句**@paramconn*@paramsql*@paramparams*@return*/defexecuteQuery(conn:Connection,sql:String,params:Array[Any],queryCallback:QueryCallback)={rs=nulltry{psmt=conn.prepareStatement(sql)if(params!=null&¶ms.length>0){for(i<-0untilparams.length){psmt.setObject(i+1,params(i))}}rs=psmt.executeQuery()queryCallback.process(rs)}catch{casee:Exception=>e.printStackTrace()}}

defshutdown(conn:Connection):Unit=DataSourceUtil.closeResource(rs,psmt,conn)}

5.6实时统计注册人数代码实现packagecom.atguigu.qzpoint.streaming

importjava.langimportjava.sql.ResultSetimportjava.util.Random

importcom.atguigu.qzpoint.util.{DataSourceUtil,QueryCallback,SqlProxy}importorg.apache.kafka.clients.consumer.ConsumerRecordimportorg.apache.kafka.common.TopicPartitionimportorg.apache.kafka.common.serialization.StringDeserializerimportorg.apache.spark.SparkConfimportorg.apache.spark.streaming.dstream.InputDStreamimportorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.{Seconds,StreamingContext}

importscala.collection.mutable

objectRegisterStreaming{privatevalgroupid="register_group_test"

//valdsStream=stream.filter(item=>item.value().split("\t").length==3)//.mapPartitions(partitions=>//partitions.map(item=>{//valrand=newRandom()//valline=item.value()//valarr=line.split("\t")//valapp_id=arr(1)//(rand.nextInt(3)+"_"+app_id,1)//}))//dsStream.print()//vala=dsStream.reduceByKey(_+_)//a.print()//a.map(item=>{//valappid=item._1.split("_")(1)//(appid,item._2)//}).reduceByKey(_+_).print()

//处理完业务逻辑后手动提交offset维护到本地mysql中stream.foreachRDD(rdd=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{valoffsetRanges:Array[OffsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor(or<-offsetRanges){sqlProxy.executeUpdate(client,"replaceinto`offset_manager`(groupid,topic,`partition`,untilOffset)values(,,,)",Array(groupid,or.topic,or.partition.toString,or.untilOffset))}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})ssc.start()ssc.awaitTermination()}

}

5.7实时统计学员做题正确率与知识点掌握度packagecom.atguigu.qzpoint.streaming

importjava.langimportjava.sql.{Connection,ResultSet}importjava.time.LocalDateTimeimportjava.time.format.DateTimeFormatter

/***知识点掌握度实时统计*/objectQzPointStreaming{

privatevalgroupid="qz_point_group"

}}

5.8实时统计商品页到订单页,订单页到支付页转换率packagecom.atguigu.qzpoint.streaming

importjava.langimportjava.sql.{Connection,ResultSet}importjava.text.NumberFormat

importcom.atguigu.qzpoint.util.{DataSourceUtil,ParseJsonData,QueryCallback,SqlProxy}importorg.apache.kafka.clients.consumer.ConsumerRecordimportorg.apache.kafka.common.TopicPartitionimportorg.apache.kafka.common.serialization.StringDeserializerimportorg.apache.spark.streaming.dstream.InputDStreamimportorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.{SparkConf,SparkFiles}importorg.lionsoul.ip2region.{DbConfig,DbSearcher}

importscala.collection.mutableimportscala.collection.mutable.ArrayBuffer

/***页面转换率实时统计*/objectPageStreaming{privatevalgroupid="vip_count_groupid"

ssc.sparkContext.addFile(this.getClass.getResource("/ip2region.db").getPath)//广播文件valipDStream=dsStream.mapPartitions(patitions=>{valdbFile=SparkFiles.get("ip2region.db")valipsearch=newDbSearcher(newDbConfig(),dbFile)patitions.map{item=>valip=item._4valprovince=ipsearch.memorySearch(ip).getRegion().split("\\|")(2)//获取ip详情中国|0|上海|上海市|有线通(province,1l)//根据省份统计点击个数}}).reduceByKey(_+_)

//计算转换率//处理完业务逻辑后手动提交offset维护到本地mysql中stream.foreachRDD(rdd=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{calcJumRate(sqlProxy,client)//计算转换率valoffsetRanges:Array[OffsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor(or<-offsetRanges){sqlProxy.executeUpdate(client,"replaceinto`offset_manager`(groupid,topic,`partition`,untilOffset)values(,,,)",Array(groupid,or.topic,or.partition.toString,or.untilOffset))}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})ssc.start()ssc.awaitTermination()}

/***计算页面跳转个数**@paramsqlProxy*@paramitem*@paramclient*/defcalcPageJumpCount(sqlProxy:SqlProxy,item:(String,Int),client:Connection):Unit={valkeys=item._1.split("_")varnum:Long=item._2valpage_id=keys(1).toInt//获取当前page_idvallast_page_id=keys(0).toInt//获取上一page_idvalnext_page_id=keys(2).toInt//获取下页面page_id//查询当前page_id的历史num个数sqlProxy.executeQuery(client,"selectnumfrompage_jump_ratewherepage_id=",Array(page_id),newQueryCallback{overridedefprocess(rs:ResultSet):Unit={while(rs.next()){num+=rs.getLong(1)}rs.close()}

//对num进行修改并且判断当前page_id是否为首页if(page_id==1){sqlProxy.executeUpdate(client,"insertintopage_jump_rate(last_page_id,page_id,next_page_id,num,jump_rate)"+"values(,,,,)onduplicatekeyupdatenum=num+",Array(last_page_id,page_id,next_page_id,num,"100%",num))}else{sqlProxy.executeUpdate(client,"insertintopage_jump_rate(last_page_id,page_id,next_page_id,num)"+"values(,,,)onduplicatekeyupdatenum=num+",Array(last_page_id,page_id,next_page_id,num,num))}})}

5.9实时统计学员播放视频各时长packagecom.atguigu.qzpoint.streaming

importjava.langimportjava.sql.{Connection,ResultSet}

importcom.atguigu.qzpoint.bean.LearnModelimportcom.atguigu.qzpoint.util.{DataSourceUtil,ParseJsonData,QueryCallback,SqlProxy}importorg.apache.kafka.clients.consumer.ConsumerRecordimportorg.apache.kafka.common.TopicPartitionimportorg.apache.kafka.common.serialization.StringDeserializerimportorg.apache.spark.SparkConfimportorg.apache.spark.streaming.dstream.InputDStreamimportorg.apache.spark.streaming.kafka010._importorg.apache.spark.streaming.{Seconds,StreamingContext}

objectCourseLearnStreaming{privatevalgroupid="course_learn_test1"

defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName(this.getClass.getSimpleName).set("spark.streaming.kafka.maxRatePerPartition","30").set("spark.streaming.stopGracefullyOnShutdown","true")

//解析json数据valdsStream=stream.mapPartitions(partitions=>{partitions.map(item=>{valjson=item.value()valjsonObject=ParseJsonData.getJsonData(json)valuserId=jsonObject.getIntValue("uid")valcwareid=jsonObject.getIntValue("cwareid")valvideoId=jsonObject.getIntValue("videoid")valchapterId=jsonObject.getIntValue("chapterid")valedutypeId=jsonObject.getIntValue("edutypeid")valsubjectId=jsonObject.getIntValue("subjectid")valsourceType=jsonObject.getString("sourceType")valspeed=jsonObject.getIntValue("speed")valts=jsonObject.getLong("ts")valte=jsonObject.getLong("te")valps=jsonObject.getIntValue("ps")valpe=jsonObject.getIntValue("pe")LearnModel(userId,cwareid,videoId,chapterId,edutypeId,subjectId,sourceType,speed,ts,te,ps,pe)})})

dsStream.foreachRDD(rdd=>{rdd.cache()//统计播放视频有效时长完成时长总时长rdd.groupBy(item=>item.userId+"_"+item.cwareId+"_"+item.videoId).foreachPartition(partitoins=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitoins.foreach{case(key,iters)=>calcVideoTime(key,iters,sqlProxy,client)//计算视频时长}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})//统计章节下视频播放总时长rdd.mapPartitions(partitions=>{partitions.map(item=>{valtotaltime=Math.ceil((item.te-item.ts)/1000).toLongvalkey=item.chapterId(key,totaltime)})}).reduceByKey(_+_).foreachPartition(partitoins=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitoins.foreach(item=>{sqlProxy.executeUpdate(client,"insertintochapter_learn_detail(chapterid,totaltime)values(,)onduplicatekey"+"updatetotaltime=totaltime+",Array(item._1,item._2,item._2))})}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})

//统计课件下的总播放时长rdd.mapPartitions(partitions=>{partitions.map(item=>{valtotaltime=Math.ceil((item.te-item.ts)/1000).toLongvalkey=item.cwareId(key,totaltime)})}).reduceByKey(_+_).foreachPartition(partitions=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitions.foreach(item=>{sqlProxy.executeUpdate(client,"insertintocwareid_learn_detail(cwareid,totaltime)values(,)onduplicatekey"+"updatetotaltime=totaltime+",Array(item._1,item._2,item._2))})}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})

//统计辅导下的总播放时长rdd.mapPartitions(partitions=>{partitions.map(item=>{valtotaltime=Math.ceil((item.te-item.ts)/1000).toLongvalkey=item.edutypeId(key,totaltime)})}).reduceByKey(_+_).foreachPartition(partitions=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{partitions.foreach(item=>{sqlProxy.executeUpdate(client,"insertintoedutype_learn_detail(edutypeid,totaltime)values(,)onduplicatekey"+"updatetotaltime=totaltime+",Array(item._1,item._2,item._2))})}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})

})//计算转换率//处理完业务逻辑后手动提交offset维护到本地mysql中stream.foreachRDD(rdd=>{valsqlProxy=newSqlProxy()valclient=DataSourceUtil.getConnectiontry{valoffsetRanges:Array[OffsetRange]=rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor(or<-offsetRanges){sqlProxy.executeUpdate(client,"replaceinto`offset_manager`(groupid,topic,`partition`,untilOffset)values(,,,)",Array(groupid,or.topic,or.partition.toString,or.untilOffset))}}catch{casee:Exception=>e.printStackTrace()}finally{sqlProxy.shutdown(client)}})ssc.start()ssc.awaitTermination()}

第6章总结与调优6.1保证SparkStreaming第一次启动不丢数据在kafka的参数auto.offset.rest设定为earlist,保证SparkStreaming第一次启动从kafka最早偏移量开始拉取数据

6.2SparkStreaming手动维护偏移量

在SparkStreaming下有三种消费模式的定义最多一次、至少一次、恰好一次那么最好是无限接近恰好一次。要实现恰好一次偏移量必须手动维护,因为自动提交会在SparkStreaming刚运行时就立马提交offset,如果这个时候SparkStreaming消费信息失败了,那么offset也就错误提交了。所以必须保证:1.手动维护偏移量2.处理完业务数据后再提交offset手动维护偏移量需设置kafka参数enable.auto.commit改为false

手动维护提交offset有两种选择:1.处理完业务数据后手动提交到Kafka2.处理完业务数据后手动提交到本地库如MySql、HBase

1、先来看如何提交到kafka官网所示:

stream.foreachRdd后根据每个rdd先转换成HashOffsetRanges对象通过.offsetRanges方法获取到偏移量对象,再通过commitAsync方法将偏移量提交。

2、维护到本地MySql如项目所示:Driver端需先去判断Msql库中是否存在偏移量,如果存在偏移量则从MySql中获取到当前topic对应的最新offset大小,如果MySql不存在则从kafka中获取

消费到数据后,进行业务处理处理完后需将offset最新值保存到MySql

那么如果有面试官提问如何保证数据恰好一次性消费回答到这两点一般就可以了,手动维护便宜量和先处理完业务数据再提交offset。但是处理业务数据和提交offset并非同一事物,在极端情况下如提交offset时断网断电还是会导致offset没有提交并且业务数据已处理完的情况。

那么保证事物就需要将并行度调成1或者将数据collect到driver端,再进行数据业务处理和提交offset,但这样还会导致并行度变成1很可能导致处理速度跟不上,所以大数据情况下一般不考虑事物。

6.3updateStateByKey算子与checkpoint

updateStateBykey算子根据官网描述,是返回一个新的“状态”的DStream的算子,其通过在键的先前状态和键的新值上应用给定函数更新每个键的状态。

具体写法:根据历史状态值,和当前批次的数据状态值的累加操作得出一个最新的结果。如项目中代码:

那么使用updateStateByBykey算子,必须使用SparkStreaming的checkpoint来维护历史状态数据

那么看下Hdfs上路径下的文件

存在小文件且小文件个数不可控,所以在真实企业生产环境上并不会使用checkpoint操作,也不会使用基于checkpoint的算子如updateStateBykey算子

那么如何代替updateStateBykey这种基于历史数据状态的操作的算子呢:在进行相应操作时,可以去库中查询出历史数据,再与当前数据进行操作得出最新结果集,将结果集再刷新到本地库中。

6.4计算SparkStreaming一秒钟拉取多少条数据在企业中往往会根据业务的实时性来定制一秒钟消费数据量的条数,来达到实时性,那么通过什么参数来设置SparkStreaming从kafka的拉取的条数呢。

根据官网描述,可以设置spark.streaming.kafka.maxRatePerPartition参数来设置SparkStreaming从kafka分区每秒拉取的条数

那么在项目中如实时统计学员做题正确率与知识点掌握度需求中,需要每秒100处理速度,针对此需求topic为qz_log分区为10,那么通过此参数设定10即可,每个分区没秒10条数据。一秒处理100条数据,当前批次为3秒一次,一批处理300条数据.

6.5SparkStreaming背压机制

6.6一个stage的耗时由什么决定

由上图可以看出一个stage由最慢的task耗时决定。6.7SparkStreaming优雅关闭提交SparkStreaming任务到yarn后,当需要停止程序时使用yarnapplication-killapplication_id命令来关闭SparkStreaming,那么操作此命令时需要保证数据不丢失,需要设置spark.streaming.stopGracefullOnShutdown参数为ture

当设置此参数后,SparkStreaming程序在接收到kill命令时,不会立马结束程序,Spark会在JVM关闭时正常关闭SparkStreaming,而不是是立马关闭,即保证当前数据处理完后再关闭。

6.8SparkStreaming默认分区数SparkStreaming默认并行度与所对应kafkatopic创建时的分区数所对应,比如项目中topic的分区都是10,SparkStreaming的默认分区就为10,且在真实开发环境中SparkStreaming一般不会去使用repartition增大分区操作,因为会进行shuffle耗时。第7章打包、spark-submit命令

spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.CourseLearnStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.PageStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.QzPointStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar

spark-submit--masteryarn--deploy-modeclient--driver-memory1g--num-executors2--executor-cores2--executor-memory2g--classcom.atguigu.qzpoint.streaming.RegisterStreamingcom_atguigu_sparkstreaming-1.0-SNAPSHOT-jar-with-dependencies.jar

第8章常见问题8.1jar包冲突问题

根据官网描述spark-streaming-kafka-0-10_2.11jar包中包含kafka-clients客户端jar包不需要再次添加kafka客户端jar包,如果再次添加kafka客户端jar包可能会引起版本冲突8.2无法序列化问题和正确操作数据库连接

对于开发者人员刚开始接手SparkStreaming时往往会错误的使用数据库连接,如上述官网描述对于connection获取,代码写在了foreachRDD内rdd.foreach上,那么这样操作实际是在driver端创建到connection,然后rdd.foreacher操作为分布式节点操作,connection.send方法实际发生在了各个executor节点上,这个时候就涉及到了driver端对象到executor端的一个网络传输问题,这个时候spark会发生错误,会报一个org.apache.spark.SparkException:Tasknotserializable这样一个任务无法序列化的错,在Spark中遇到此错误一般都是错误的将driver端对象在executor端使用造成的。那么创建connection操作必须在executor端执行

如官网描述,在rdd.foreach里创建connection,这样虽然不会发生错误,但是这样循环的粒度是针对每条数据,每循环一条数据都会创建一个连接,这样会造成资源浪费。图1和图2都是错误展示最后,正确的使用数据库连接,循环粒度是分区,在每个分区下创建一个数据库连接,循环分区下的数据每条数据使用当前分区下的数据库连接,当使用完毕后归还的连接池中。所以在SparkStreaming开发中需养成良好习惯:dstream.foreachRdd{rdd=>{rdd.foreachPartition{partitions=>{//循环分区//创建connectionpartitions.foreach(record=>{//业务处理使用当前connection}}//归还连接}}循环粒度foreachRdd=>foreachPartition=>foreach8.3SparkStreaming操作数据库时线程安全问题在SparkStreaming中,采用查询本地库的历史数据和当前批次数据的计算来代替需要基于hdfs的算子updatestatebykey,那么在查询和重新刷入本地库的时候处理不当会造成线程安全问题,数据不准的问题。

那么在查询本地库时需要进行一次预聚合操作,将相同key的数据落到一个分区,保证同一个key的数据指挥操作数据库一次,预聚合操作有reducebykey、groupbykey、groupby等算子。如项目所写:

题库更新操作时需要查询MySql本地库的历史数据,在查询本地库钱先进行了groupby操作将相同符合条件的业务数据聚合到了一个分区,保证相同用户下同一课程同一知识点的数据在当前批次下只会去查询一次MySql数据库并且一次覆盖。8.4数据倾斜问题数据倾斜为在shuffle过程中,必须将各个节点上相同的key的数据拉取到某节点的一个task来进行,此时如果某个key对应的数据量特别大的话,就会发生数据倾,某个task耗时非常大,那么一个stage的耗时由最慢的task决定,从而导致整个SparkStreaming任务运行非常缓慢。以reducebykey为例:

这张图就是发生了数据倾斜,那么解决方案最有效的为两阶段聚合,先打散key聚合一次,再还原key聚合一次。

具体代码展示:

对DStream进行map操作对原始key前加上随机值,map完后进行第一次reducebykey操作,此结果为打散key后的reducebykey结果,再次进行map操作根据分隔符,去掉随机数保留原有key,map后再进行reducebykey,保证相同key的数据准确累加。

8.5SparkStreaming消费多topic在真实环境中往往会有许多业务场景非常类似,比如打标签、监控某指标,可能代码逻辑都一样只有某个取值不一样,这个时候一个SparkStreaming就可以监控多个topic,然后根据topic的名称来进行不同的业务处理,就不需要开发多个SparkStreaming程序了。查看kafkaUtils.createDirectStream方法

可以发现topic参数可以是个多个值,也就是createDirectStream方法支持多个topic。

通过kafkaUtils.createDirectStream方法获取到DStream,这个DStream流的类型为InputDStream[ConsumerRecord[String,String]],那么在可以通过调用map方法,ConsumerRecord的topic方法来获取对应的topic名称

获取到topic名称后value数据后,就可以在后续操作里根据判断topic名称来处理不同的业务。

点击下载内存泄露分析工具下载下来是一个jar包

那么需要编写bat批处理来运行创建run.bat

编辑titleibm-heap-analyzer

path=%PATH%;%C:\JAVA\jdk1.8.0_51\bin

E:

cdE:\IBMheapAnalyzer\IBM_DUMP_wjfx

java.exe-Xms1048M-Xmx4096M-jarha456.jar

路径需要改成自己当前路径点击run.bat运行

运行成功8.6.2模拟内存泄露场景内存泄露的原因往往是因为对象无法释放或被回收造成,那么在本项目中就模拟此场景。

如上图所示,在计算学员知识点正确率与掌握度代码中,在最后提交offset提交偏移量后,循环往map里添加LearnMode对象,使每处理一批数据就往map里添加100000个LearnMode对象,使堆内存撑满。

8.6.3查找driver进程在集群上提交sparkstreaming任务ps-ef|grepcom.atguigu.qzpoint.streaming.QzPointStreaming通过此命令查找到driver进程号

进程号为6860

8.6.4JMAP命令使用jmap-heappid命令查看6860进程,内存使用情况。jmap-heap6860

发现新生代和老年代内存占满,有对象无法被销毁或回收。再通过jmap-histopid命令查看对象的内存情况。

jmap-histo6860>a.log

jmap-dump:live,format=b,file=dump.log6860

将dump从集群下载下来,打开IBMHeapAnalyzer进行分析

从饼状图可以看出,堆内存中存在大量HashEntry类点击Analysis分析查看各个对象内存的泄露大小和总大小。

选中最大的分析对象双击,或者右键点击Findobjectinatreeview查看树状图。

可以看出HashEntry的父类为HashMap,并且点击HashEntry查看内部,

链表里的next对象为LearnModel

可以定位SparkStreaming在操作map添加LearnModel时发生了内存泄露

THE END
1.在线教育系统好做吗现在综上所述,建设一套高效运作且具有良好口碑的在线教育平台绝非一蹴而就之事,它需要综合运用优质内容建设、先进技术保障以及精细化运营指导三重策略进行有机结合。而在这一过程中引入成熟完善的知识付费在线教育系统,将对企业的整体发展带来积极影响。此类平台不仅能让授课人更便捷高效地进行授课,而且也能让广大学员享受更https://blog.csdn.net/yezi87s/article/details/144415714
2.在线教育平台必修课——用户人群解析爱上学习无论是投资在线教育还是在线教育领域创业,都需要对在线教育领域进行调研和分析。用户是在线教育最大的金主,让用户对你的产品熟知并接纳,让用户成为实体广告为你在线教育平台手口相传?这就要对在线教育的用户进行分析,对用户知己知彼才能百战共赢,既满足用户学习的需求,也能实现在线教育的盈利变现。 http://www.ishangstudy.com/news/detail/id/63.html
3.后疫情时代大学生在线教育需求变迁研究本文针对在线教育产业的“痛点”,通过对武汉市大学生进行问卷调查获取原始数据,基于马斯洛需求层次理论分析新冠疫情发生前后大学生对在线教育产品的需求变迁,并运用技术接受模型(TAM)来探究影响大学生对在线教育产品需求意愿的因素,最终提出个人、企业、政府应共同助力5G时代在线教育发展、加强对在线教育平台监管,进一步提升https://www.fx361.com/page/2021/1203/9159388.shtml
4.在线教育平台教培机构应该如何营销?随着互联网技术的迅猛发展,在线教育平台日益成为教培机构的重要渠道。为了在激烈的市场竞争中脱颖而出,教培机构需要制定科学有效的营销策略。 一、精准定位与品牌塑造 1.市场调研与需求分析 在制定营销策略前,教培机构需要进行详细的市场调研,了解目标市场的需求和痛点。这包括: https://m.grazy.cn/article/21597.htm
5.基于springboot的在线教育平台的设计与实现文献综述怎么写基于Spring Boot的在线教育平台的设计与实现文献综述可以按照以下步骤进行写作:引言部分:介绍在线教育的背景和重要性,以及Spring Boot作为开发框架的优势和应用范围。相关技术综述:概述与在线教育平台设计相关的技术,如Java语言、Spring Boot框架、数据库等,并对每个技术进行简要介绍。在线教育平台需求分析:https://wen.baidu.com/question/1393361682051798580.html
6.在线教育培训平台中数据分析模块有什么优势?现在,企业的竞争力不仅依赖于产品的质量和服务的效率,更在于员工的素质与技能的提升。因为传统的培训模式往往受到时间和空间的限制,难以满足企业快速发展的需求,都已经慢慢转移到线上进行。在线教育培训平台的每一个功能模块都有其作用,其中比较容易被忽视的就是数据分析。 https://www.xiaoe-tech.com/extendRead/4061.html
7.我国在线教育行业的需求及发展趋势分析.docx教育我国在线教育行业的需求及发展趋势分析.docx 18页内容提供方:黄橙文化 大小:1.1 MB 字数:约1.86万字 发布时间:2022-08-07发布于广东 浏览人气:66 下载次数:仅上传者可见 收藏次数:0 需要金币:*** 金币 (10金币=人民币1元)我国在线教育行业的需求及发展趋势分析.docx 关闭预览 想预览更多内容https://max.book118.com/html/2022/0802/5010320313004314.shtm
8.知识付费在线教育小程序源码搭建知识付费在线教育模式的开发是一个涉及教育内容、技术平台、用户体验和市场运营等多个方面的综合性项目。以下是一个详细的开发指南: 一、市场调研与需求分析 明确目标用户: 确定对在线教育有需求的用户群体,如学生、职场人士、兴趣爱好者等。 分析用户的学习习惯、偏好以及支付意愿。 了解用户需求: 通过问卷调查、用户https://m.11467.com/blog/d9189690.htm
9.系统架构师入门指南:从零开始学习系统架构设计在线教育平台的系统架构设计需要考虑到用户管理、课程管理、学习记录、考试评测等多个方面。这里我们以一个简单的在线教育平台为例,展示系统架构设计的过程。 需求分析: 在线教育平台需要支持用户注册、登录、课程浏览、学习记录、考试评测等功能。 架构设计: 我们可以采用事件驱动架构来实现这个在线教育平台,通过事件的传播https://www.imooc.com/article/361175
10.教育大数据行业发展现状及前景战略规划建议报告第1章:教育大数据行业综述及数据来源说明 1.1 教育大数据行业界定 1.1.1 教育大数据的定义 1.1.2 教育大数据技术框架 1、技术采集 2、数据处理 3、数据分析 4、应用服务 1.1.3 教育大数据性质特征 1.1.4 教育大数据与传统教育数据的区别 1.1.5 教育大数据的分析体系搭建 https://maimai.cn/article/detail?fid=1846094883&efid=fVq5WNoRddfGQrd6lI_77A
11.8000字干货!在线启蒙教育的产品设计公式(附竞品分析模型工具)娱乐是一个兴奋性需求,作为互联网产品的深度习惯用户,如果能让孩子们一边学习一边娱乐,通过劳逸结合的形式帮助小朋友成长学习,这会非常棒。 4. 小结 从用户动机、痛点和需求分析不难发现,互联网的高速发展是催生在线英语启蒙教育的关键,其次 80、90 后这个核心用户群是互联网当下的主力军,他们消费力强,决策力果敢,https://www.uisdc.com/online-primer-design/
12.开源技术:在线教育系统源码及教育培训APP开发指南二、教育培训APP的开发指南 2.1功能需求分析 常见的功能包括:用户注册与登录、课程浏览与搜索、课程购买与支付、课程学习与测试、互动交流与反馈等。 2.2技术选型 2.2.1前端开发 移动端APP开发通常采用ReactNative或Flutter。ReactNative基于JavaScript,具有良好的跨平台性能和庞大的社区支持。Flutter由Google开发,基于Dart语https://blog.itpub.net/69960494/viewspace-3019578/
13.线上教学平台运行总结(通用15篇)培养学生德智体美劳全面发展是我们教育的根本,为促进学生身心健康均衡发展,疫情期间学校也在线上开展了全学科教学。四年级的小学科任教教师高度重视,积极推送课程,组织学生参与活动。 总之,立足本职工作、关注学生需求、从学生实际出发、努力完成教学任务、为学生创造更好的学习氛围,是我们不断追求并为之不懈奋斗的目标。https://m.oh100.com/kaoshi/jiaoxuezongjie/646722.html
14.在线教育解决方案教育行业教育在线培训解决方案华为云在线培训平台可以智能地将一个云题库与学习需要对接,通过华为云在线协同服务帮助学生完成学习目标;例如学生可以在线学习与自助学习同步的课程云服务,然后同步作答题目,作答完成后由系统智能为你呈现解题过程辅助你提高学习成绩https://www.huaweicloud.com/solution/ces/
15.101种最热门的EdTech工具界面·财经号Teachable是一个在线平台,教育者可以使用轻松创建在线课程或辅导业务。超过10万的创作者,已经使用 Teachable 来分享他们的知识。使用定制工具来策划和定制你的内容,让你的课程简易创作。这个平台可以让你轻松掌握创作技巧——或者其实也不需要技巧,因为非常简单。 https://m.jiemian.com/article/4838337.html
16.产品分析洋葱数学,有趣有内涵的在线教育内容产品人人都是商业模式:主要通过售卖教材课程及专题课程内容;另有一些教辅书籍在线下售卖。 1.3 用户及需求分析 1.3.1 地域及人群分布 根据百度指数2019年4月的数据显示:洋葱数学使用者中女性稍多于男性;注册用户年龄在30~39间的家长及教师人群比较多;在传统的教育大省中,用户人数多。 https://www.woshipm.com/evaluating/2218007.html