博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
微服务架构下的轻量级定时任务解决方案
阅读量:6575 次
发布时间:2019-06-24

本文共 5440 字,大约阅读时间需要 18 分钟。

微服务的概念可以说给程序设计打开了一个新世界,带来了众多的优点,但是也将一些以往容易处理的问题变得复杂,例如:缓存、事务、定时任务等。缓存可以用中间件例如redis、memcached等,事务有诸多分布式事务框架解决,定时任务也有分布式的解决方案,例如quartz、elastic job等,今天我要讲的是就是定时任务。

既然已经有成熟的分布式定时任务框架,我要讲的东西并不是用另一种设计去实现相同的功能,而是从不同的角度去解决分布式定时任务的问题。

问题来源

这个问题来起源于一个小功能,我们有一个发送短信的微服务,需要获取短信的状态报告,状态报告对于短信发送不是同步的,短信提交到服务商,服务商要提交运营商发送之后才能生成状态报告,因此有一定的延迟,需要异步获取,并且服务商提供的接口有频率限制,因此需要做一个定时任务,且需要单点执行,那么问题来了,因为这一个功能我就需要引入一个定时任务框架吗,总感觉有点大材小用的意思。

之前我们的定时任务处理既有用过quartz,也用过elastic job,但是只为这样一个小功能就引入一个框架,再加上配置又得好半天,想想都不划算。

例如要用quartz,要创建一堆数据库表,但表里面只存储了一个任务信息。

用elastic job吧,还要使用zookeeper,即便用lite版,也需要一堆配置,远比我写业务的时间要长。

我只想简简单单的写逻辑!!!

解决方案

谈分布式解决方案大致总离不开中间件,联想到上次解决websocket的分布式方案(参见)使用到的Spring Cloud Stream,大概有了思路:

  1. 我需要一个任务分发中心,专门负责触发定时任务
  2. 其他服务如果需要触发定时任务,接收特定的触发消息
  3. 任务执行完成向任务分发中心推送任务完成的确认消息
  4. 为任务执行端提供一个公共的spring boot starter晚上2,3的步骤,实际需要编码的几乎就剩下业务逻辑本身了

详细设计

根据上一步的方案,需要确认一些细节,以及一些特殊的情况,例如定时任务可能是由微服务集群中单个实例执行,也可能存在集体执行(例如更新内存中的缓存),还可能存在分区执行。

客户端(需要定时任务的为服务端)需要建立以下消息队列:

  1. 集群接收的队列,每个微服务实例建立一个,每个微服务实例都会收到相同消息
  2. 单独接收的队列,每个应用集群建立一个,确保消息只被一个实例消费
  3. 按分区接收的队列,每个分区建立一个,确保只被分区内一个实例消费

客户端与服务端需要通过唯一的任务id来确认需要执行的定时任务

服务端(任务分发微服务)需要根据情况将消息推送到不同的队列,不能直接使用Spring Cloud Stream,需要使用rabbitmq

服务端本身也是分布式的,因此需要一个定时任务框架用于任务触发,我这里选择了quartz

代码实现

Spring Cloud Stream的基本知识我不再复述了,中有讲解。

定时任务分发服务

定义定时任务

data class ScheduleTask(    /** 任务的id,全局唯一,与客户端的taskId完全匹配 */    var taskId: String = "",    /** 定时任务的cron 表达式 */    var cron: String = "",    /** 关联应用 */    var appId: Int = 0,    /** 任务描述 */    var description: String = "",    /** 接收任务的分区 */    var zone: String? = null,    /**  调度方式,广播到集群或单例执行,默认单例 */    var dispatchMode: DispatchMode = DispatchMode.Singleton,    /**  是否启用 */    var enabled: Boolean = true,    /** 任务的数据库记录 id,自增 */    var id: Int = -1) 复制代码

任务调度

使用quartz进行任务调度

private fun scheduleJob(task: ScheduleTask) {    val job = JobBuilder.newJob(TaskEmitterJob::class.java)        .withIdentity(task.taskId, task.appId.toString())        .withDescription(task.description)        .storeDurably()        .requestRecovery()        .usingJobData("id", task.id)        .usingJobData("taskId", task.taskId)        .build()    val trigger = TriggerBuilder.newTrigger()        .withIdentity(task.taskId, task.appId.toString())        .withSchedule(CronScheduleBuilder.cronSchedule(task.cron))        .forJob(job)        .build()    scheduler.addJob(job, true, true)    if (scheduler.checkExists(trigger.key)) {      scheduler.rescheduleJob(trigger.key, trigger)    } else {      scheduler.scheduleJob(trigger)    }  }复制代码

ScheduleTask是持久化的,插入的时候同时向quartz插入任务,更新的时候也要向quartz更新,删除的时候同时删除

quartz的任务触发

class TaskEmitterJob : Job {
companion object { private val log = LogFactory.getLog(TaskEmitterJob::class.java) } override fun execute(context: JobExecutionContext) { try { val taskId = context.jobDetail.jobDataMap["taskId"] as String log.info("任务分发:$taskId") val service = ScheduleCenterApplication.context.getBean(ScheduleTaskService::class.java) service.launch(taskId) } catch (e: Exception) { log.error("任务失败$[taskId]", e) } }}复制代码

rabbitmq的发送逻辑

/**   * 发布定时任务事件   */  fun launch(task: ScheduleTask) {    val exchange = when (task.dispatchMode) {      Cluster   -> "aegisScheduleCluster"      Singleton -> "aegisScheduleSingleton"    }    val routingKey = when (task.dispatchMode) {      Cluster   -> exchange      Singleton -> "$exchange.${task.appName}"    }    val executeTaskInfo = ScheduleTaskInfo(task.taskId, task.appName!!)    amqpTemplate.convertAndSend(exchange, routingKey,        executeTaskInfo)    taskExecuteRecordDAO.save(        TaskExecuteRecord(executeTaskInfo.uid, task.id, Date())    )  }复制代码

客户端spring boot starter的实现

定义定时任务接口,只要在项目中实现该接口并将实现声明为bean,即可完成定时任务的定义

@FunctionalInterfaceinterface ScheduledJob {  /**   * 执行定时任务   */  fun execute(properties: Map
) /** * 获取定时任务id * @return 定时任务id,对应任务分发中心ScheduleTask的taskId */ fun getId(): String}复制代码

接收任务

/**   * 接收单例任务   */  @StreamListener(SINGLETON_INPUT)  fun acceptGroupTask(taskInfo: ScheduleTaskInfo) {    if (taskInfo.app == application) {      val receivedTime = Date()      val job = jobsProvider.ifAvailable?.firstOrNull {        it.getId() == taskInfo.id      }      job?.execute(taskInfo.properties ?: mapOf())      singletonOutput.send(GenericMessage(          ConfirmInfo(taskInfo.id, taskInfo.uid, job != null, receivedTime, Date())      ))    }  }复制代码

集群全体执行任务与单例任务的区别只在stream的配置,一个需要声明binding的group,一个不需要,这属于Spring Cloud Stream的知识范畴,可以自己看官方文档或查看我前面提到的文档,如果有不懂的可以私聊我。

stream的事件流声明

/** * 定时任务信息的事件流接口 * @author 吴昊 * @since 0.1.0 */interface AegisScheduleClient {  companion object {    const val CLUSTER_INPUT = "aegisScheduleClusterInput"    const val SINGLETON_INPUT = "aegisScheduleSingletonInput"    const val CONFIRM_OUTPUT = "aegisScheduleGroupOutput"  }  /**   *   * @return   */  @Input(CLUSTER_INPUT)  fun scheduleInput(): SubscribableChannel  /**   *   * @return   */  @Input(SINGLETON_INPUT)  fun singletonScheduleInput(): SubscribableChannel  /**   *   * @return   */  @Output(CONFIRM_OUTPUT)  fun confirmOutput(): MessageChannel}复制代码

最后再加上服务端确认消息的接收代码:

@StreamListener(CONFIRM_INPUT)  fun acceptGroupTask(confirmInfo: ConfirmInfo) {    LOG.info("接收到确认消息:$confirmInfo")    scheduleTaskService.confirm(confirmInfo)  }复制代码

主要的代码已经全部放上来了,整体思路也很简单,后面仍有很多需要优化的地方,例如消息推送失败,或者确认消息未送达等等,于整体设计并没有多大的影响了。

这样在微服务端如果需要添加定时任务,只需要

  1. 引入starter
  2. 实现ScheduledJob接口
  3. 在任务调度中心添加任务

至于在任务中心添加任务,主题代码有了,实现个简单管理界面很容易对不对,也就几个字段的输入。

最后附上管理界面的截图:

任务列表

任务详情

我的其他文章:

转载地址:http://flgjo.baihongyu.com/

你可能感兴趣的文章
在java中String类为什么要设计成final
查看>>
前端框架——Jquery——基础篇7__工具函数(Utils)
查看>>
日常学习随笔-数组、单链表、双链表三种形式实现队列结构的基本操作(源码注释)...
查看>>
select 中添加option的注意
查看>>
Codeforces #369 div2 D.Directed Roads
查看>>
vijos1153猫狗大战
查看>>
炮(棋盘DP)
查看>>
改造二叉树 (长乐一中模拟赛day2T1)
查看>>
Cloud Foundry 在 Azure 中国正式发布
查看>>
transform 二维转变
查看>>
v-on指令
查看>>
[存档]xx-09210xxx-2010-ACM-ICPC竞赛总结
查看>>
万能的林萧说:我来告诉你,一个草根程序员如何进入BAT。 - 今日头条(www.toutiao.com)...
查看>>
devenv /ResetSkipPkgs
查看>>
【转载】如何使员工更敬业
查看>>
[转注自官网]Cocos2d-x Tutorial 4 - 如何放出子弹(Glede Edition for 2.0.3)
查看>>
第十一讲:集合
查看>>
jQuery幸运大转盘_jQuery+PHP抽奖程序
查看>>
瑞星对Windows7捆绑杀毒软件等消息的回应
查看>>
Silverlight 2.5D RPG游戏技巧与特“.NET技术”效处理:(十一)AI系统
查看>>