如何实现两个分布式服务(订单服务、学习服务)共同完成一件事即订单支付成功自动添加学生选课的需求,这里的关键是如何保证两个分布式服务的事务的一致性。尝试解决上边的需求,在订单服务中远程调用选课接口,伪代码如下:
5、分布式事务有哪些场景?
2.4.2事务补偿(TCC)TCC事务补偿是基于2PC实现的业务层事务控制方案,它是Try、Conrm和Cancel三个单词的首字母,含义如下:1、Try检查及预留业务资源完成提交事务前的检查,并预留好资源。2、Conrm确定执行业务操作对try阶段预留的资源正式执行。3、Cancel取消执行业务操作对try阶段预留的资源释放。
优点:最终保证数据的一致性,在业务层实现事务控制,灵活性好。缺点:开发成本高,每个事务操作每个参与者都需要实现try/conrm/cancel三个接口。
注意:TCC的try/conrm/cancel接口都要实现幂等性,在为在try、conrm、cancel失败后要不断重试。什么是幂等性?幂等性是指同一个操作无论请求多少次,其结果都相同。幂等操作实现方式有:1、操作之前在业务方法进行判断如果执行过了就不再执行。2、缓存所有请求和处理的结果,已经处理的请求则直接返回结果。3、在数据库表中加一个状态字段(未处理,已处理),数据操作时判断未处理时再处理。
优点:由MQ按异步的方式协调完成事务,性能较高。不用实现try/conrm/cancel接口,开发成本比TCC低。缺点:此方式基于关系数据库本地事务来实现,会出现频繁读写数据库记录,浪费数据库资源,另外对于高并发操作不是最佳方案。
在任务表中包括了交换机的名称、路由key等信息为了是将任务的处理做成一个通用的功能。考虑分布式系统并发读取任务处理任务的情况发生项目使用乐观锁的方式解决并发问题。
3SpringTask定时任务
3.1需求分析
根据分布式事务的研究结果,订单服务需要定时扫描任务表向MQ发送任务。本节研究定时任务处理的方案,并实现定时任务扫描任务表并向MQ发送消息。实现定时任务的方案如下:1、使用jdk的Timer和TimerTask实现可以实现简单的间隔执行任务,无法实现按日历去调度执行任务。2、使用Quartz实现Quartz是一个异步任务调度框架,功能丰富,可以实现按日历调度。3、使用SpringTask实现Spring3.0后提供SpringTask实现任务调度,支持按日历调度,相比Quartz功能稍简单,但是在开发基本够用,支持注解编程方式。
本项目使用SpringTask实现任务调度。
3.2SpringTask串行任务
3.2.1编写任务类在Springboot启动类上添加注解:@EnableScheduling新建任务测试类TestTask,编写测试方法如下:
例子:0/3*****每隔3秒执行00/5****每隔5分钟执行000***表示每天0点执行0012*WEN每周三12点执行01510*MON-FRI每月的周一到周五10点15分执行01510*MON,FRI每月的周一和周五10点15分执行
3.2.3串行任务测试参考task1方法的的定义方法,再定义task2方法,此时共用两个任务方法。
3.3.1需求分析在项目通常是需要多个不同的任务并行去执行。本节实现SpringTask并行执行任务的方法。3.3.2配置异步任务创建异步任务配置类,需要配置线程池实现多线程调度任务。
@Configuration@EnableSchedulingpublicclassAsyncTaskConfigimplementsSchedulingConfigurer,AsyncConfigurer{//线程池线程数量privateintcorePoolSize=5;@BeanpublicThreadPoolTaskSchedulertaskScheduler(){ThreadPoolTaskSchedulerscheduler=newThreadPoolTaskScheduler();scheduler.initialize();//初始化线程池scheduler.setPoolSize(corePoolSize);//线程池容量returnscheduler;}@OverridepublicExecutorgetAsyncExecutor(){Executorexecutor=taskScheduler();returnexecutor;}@OverridepublicAsyncUncaughtExceptionHandlergetAsyncUncaughtExceptionHandler(){returnnull;}@OverridepublicvoidconfigureTasks(ScheduledTaskRegistrarscheduledTaskRegistrar){scheduledTaskRegistrar.setTaskScheduler(taskScheduler());}}将@EnableScheduling添加到此配置类上,SpringBoot启动类上不用再添加@EnableScheduling3.3.3测试通过测试发现两个任务由不同的线程在并行执行,互不影响。
4.2RabbitMQ配置
4.5.1Dao添加更新任务方法:
publicinterfaceXcTaskRepositoryextendsJpaRepository
@TransactionalpublicintgetTask(StringtaskId,intversion){inti=xcTaskRepository.updateTaskVersion(taskId,version);returni;}3、执行任务类中修改
...//任务idStringtaskId=xcTask.getId();//版本号Integerversion=xcTask.getVersion();//调用乐观锁方法校验任务是否可以执行if(taskService.getTask(taskId,version)>0){//发送选课消息taskService.publish(xcTask,xcTask.getMqExchange(),xcTask.getMqRoutingkey());LOGGER.info("sendchoosecoursetaskid:{}",taskId);}...4自动添加选课开发4.1学习服务添加选课4.1.1需求分析学习服务接收MQ发送添加选课消息,执行添加选课操作。添加选课成功向学生选课表插入记录、向历史任务表插入记录、并向MQ发送“完成选课”消息。
4.1.3Dao学生选课Dao:
publicinterfaceXcLearningCourseRepositoryextendsJpaRepository
publicinterfaceXcTaskHisRepositoryextendsJpaRepository
4.2.1需求分析订单服务接收MQ完成选课的消息,将任务从当前任务表删除,将完成的任务添加到完成任务表。4.2.2Dao1、删除xc_task2、添加xc_task_his定义过程略4.2.3Service在TaskService中定义删除任务方法
//删除任务@TransactionalpublicvoidfinishTask(StringtaskId){Optional
/***接收选课响应结果*/@RabbitListener(queues={RabbitMQConfig.xc_learning_finishaddchoosecourse})publicvoidreceiveFinishChoosecourseTask(XcTasktask,Messagemessage,Channelchannel)throwsIOException{LOGGER.info("receiveChoosecourseTask...{}",task.getId());//接收到的消息idStringid=task.getId();//删除任务,添加历史任务taskService.finishTask(id);}4.3集成测试测试流程如下:1、手动向任务表添加一条任务。2、启动rabbitMQ.3、启动订单服务、选课服务。4、观察日志是否添加选课成功完成任务后将xc_task任务移动到xc_task_his表中完成任务后在选课表中多了一条学生选课记录
测试消息重复消费:1、手动向任务表添加一条任务。2、启动rabbitMQ.3、先启动订单表,等待消息队列是否积累了多个消息。4、再启动选课服务,观察是否重复添加选课