调度器-执行器的职责是获取调度对象,发给调度器去按照调度模式进行调度,最终给执行器执行相应逻辑。 整体由四个部分组成:
- Poller:定时从数据源获取调度对象列表,通过遍历把调度对象提交给Scheduler进行调度。
- Subscriber:监听指定主题的消息队列,把拉取到的id提交给Scheduler进行调度。
- Scheduler:处理接收的id,最终获取调度对象并提交给Worker。
- Worker:根据具体的调度对象来执行相应逻辑。
有三种调度模式:
- Cron:调度器根据cron表达式决定执行任务的时机。
- Period:调度器根据时间周期(比如每秒、每10分钟...)决定执行任务的时间。
- Plan:调度器根据指定的具体时间决定执行任务的时机,只会执行一次。
Poll模块分为:
- CronPoller:指定调度时间(默认每5分钟)调用FindCronScheduleObjects获取调度对象列表,通过遍历把id跟调度对象提交给Scheduler进行调度。
- PeriodPoller:指定调度时间(默认每5分钟)调用FindPeriodScheduleObjects获取调度对象列表,通过遍历把id跟调度对象提交给Scheduler进行调度。
- PlanPoller:指定调度时间(默认每5分钟)调用FindPlanScheduleObjects获取调度对象列表,通过遍历把id跟调度对象提交给Scheduler进行调度。
Subcribe作为消息队列的消费者,会根据传入的订阅主题来不断从消息队列拉取相应的调度对象的id,然后把id提交给Scheduler进行调度。
Scheduler作为调度器负责分派调度对象给不同的调度器处理,由下面组成:
- CronScheduler:处理调度模式为cron类型的调度对象,并最终提交到Worker进行处理。
- PeriodScheduler:处理调度模式为period类型的调度对象,并最终提交到Worker进行处理。
- PlanScheduler:处理调度模式为plan类型的调度对象,并最终提交到Worker进行处理。
2个调度器只是创建的runner类型不同,其代码逻辑基本一致。 维护一个重要的成员 runnerMap。顾名思义,它是一个map对象,key值为调度对象的id,value为CronRunner对象,用于存储每个调度对象对应的定时任务对象。 运作过程如下图:(比较简陋,但应该可以知道干了什么)
调度器维护三个重要成员:
- pendingId:监听到的id。
- ids:chan类型,会不断监听该成员,把获取到的id赋值给pendingId。
- scheduleObjectHeap:根据通知时间排序的最小堆。
因为plan类型的在不修改的情况下只会发一次,所以调用UpdatePlanHandledStatusById函数来更新类似状态的字段标识它是否已经执行过了。 运作过程如下图:(比较简陋,但应该可以知道干了什么)
执行器会根据接收的调度对象,来执行Work函数来实现相应的逻辑。Work函数接收的是BaseScheduleObject对象,最终执行的是它的Do函数。
func (worker *Worker) Work(scheduleObject scheduleriface.BaseScheduleObject) error {
err := scheduleObject.Do(nil)
if err != nil {
worker.logger.Warn("WorkFailed",
zap.Error(err),
)
return err
}
return nil
}
调度对象必需实现BeseScheduleObject、BeseScheduleObjectManager接口的方法,其他的话根据自己所需的调度模式来实现对应的接口。
例子在test包下
定义BaseScheduleObject接口,获取调度对象的基础属性,基于此又定义三种调度模式的相应对象接口
- CronScheduleObject:cron类型的调度对象,处理调度时间
- PeriodScheduleObject:period类型的调度对象,处理调度时间
- PlanScheduleObject:plan类型的调度对象,处理调度时间
- PlanScheduleObjectHeapByTime:plan类型的调度需要的堆对象
type BaseScheduleObject interface {
GetId() string
GetName() string
GetScheduleMode() string
GetVersion() int
IsDeleted() bool
Do(context.Context) error
IsCronMode() (CronScheduleObject, bool)
IsPeriodMode() (PeriodScheduleObject, bool)
IsPlanMode() (PlanScheduleObject, bool)
}
type BaseScheduleObjects []BaseScheduleObject
type CronScheduleObject interface {
BaseScheduleObject
GetCronSpec() string
}
type CronScheduleObjects []CronScheduleObject
func (objects CronScheduleObjects) Append(others ...CronScheduleObject) CronScheduleObjects {
return append(objects, others...)
}
type PeriodScheduleObject interface {
BaseScheduleObject
GetSchedulePeriod() time.Duration
}
type PeriodScheduleObjects []PeriodScheduleObject
func (objects PeriodScheduleObjects) Append(others ...PeriodScheduleObject) PeriodScheduleObjects {
return append(objects, others...)
}
type PlanScheduleObject interface {
BaseScheduleObject
GetPlannedScheduleTime() time.Time
}
type PlanScheduleObjects []PlanScheduleObject
func (objects PlanScheduleObjects) Append(others ...PlanScheduleObject) PlanScheduleObjects {
return append(objects, others...)
}
type PlanScheduleObjectHeapByTime PlanScheduleObjects
func (h PlanScheduleObjectHeapByTime) Native() PlanScheduleObjects {
return PlanScheduleObjects(h)
}
func (h PlanScheduleObjectHeapByTime) Len() int {
return len(h)
}
func (h PlanScheduleObjectHeapByTime) Less(i, j int) bool {
return h[i].GetPlannedScheduleTime().Before(h[j].GetPlannedScheduleTime())
}
func (h PlanScheduleObjectHeapByTime) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h *PlanScheduleObjectHeapByTime) Push(x interface{}) {
*h = append(*h, x.(PlanScheduleObject))
}
func (h *PlanScheduleObjectHeapByTime) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[:n-1]
return x
}
func (h PlanScheduleObjectHeapByTime) Empty() bool {
return h.Len() == 0
}
func (h PlanScheduleObjectHeapByTime) Top() PlanScheduleObject {
if h.Empty() {
return nil
}
return h[0]
}
func (h *PlanScheduleObjectHeapByTime) Enqueue(s PlanScheduleObject) {
heap.Push(h, s)
}
func (h *PlanScheduleObjectHeapByTime) Dequeue() PlanScheduleObject {
return heap.Pop(h).(PlanScheduleObject)
}
type BaseScheduleObjectManager interface {
FindScheduleObjectById(ctx context.Context, id string) (BaseScheduleObject, error)
}
type CronScheduleObjectManager interface {
BaseScheduleObjectManager
FindCronScheduleObjects(ctx context.Context) (CronScheduleObjects, error)
FindCronScheduleObjectById(ctx context.Context, id string) (CronScheduleObject, error)
}
type PeriodScheduleObjectManager interface {
BaseScheduleObjectManager
FindPeriodScheduleObjects(ctx context.Context) (PeriodScheduleObjects, error)
FindPeriodScheduleObjectById(ctx context.Context, id string) (PeriodScheduleObject, error)
}
type PlanScheduleObjectManager interface {
BaseScheduleObjectManager
FindPlanScheduleObjects(ctx context.Context) (PlanScheduleObjects, error)
FindPlanScheduleObjectById(ctx context.Context, id string) (PlanScheduleObject, error)
// UpdatePlanScheduleObjectStatusById 根据id更新已处理的plan类型的调度对象的状态, status为true则说明处理成功,反之处理失败
UpdatePlanScheduleObjectStatusById(ctx context.Context, id string, status bool) error
}
管理接口用于对调度对象的数据源进行处理(获取与修改),根据三种调度模式制定四种抽象接口
- BaseScheduleObjectManager:基础管理接口
- CronScheduleObjectManager:cron类型的,用户需实现ScheduleAble接口的方法
- PeriodScheduleObjectManager:period类型的调度对象,处理版本与调度时间
- PlanScheduleObjectManager:plan类型的调度对象,处理调度时间
type BaseScheduleObjectManager interface {
FindScheduleObjectById(ctx context.Context, id primitive.ObjectID) (BaseScheduleObject, error)
}
type CronScheduleObjectManager interface {
BaseScheduleObjectManager
FindCronScheduleObjects(ctx context.Context) (CronScheduleObjects, error)
FindCronScheduleObjectById(ctx context.Context, id primitive.ObjectID) (CronScheduleObject, error)
}
type PeriodScheduleObjectManager interface {
BaseScheduleObjectManager
FindPeriodScheduleObjects(ctx context.Context) (PeriodScheduleObjects, error)
FindPeriodScheduleObjectById(ctx context.Context, id primitive.ObjectID) (PeriodScheduleObject, error)
}
type PlanScheduleObjectManager interface {
BaseScheduleObjectManager
FindPlanScheduleObjects(ctx context.Context) (PlanScheduleObjects, error)
FindPlanScheduleObjectById(ctx context.Context, id primitive.ObjectID) (PlanScheduleObject, error)
// UpdatePlanScheduleObjectStatusById 根据id更新已处理的plan类型的调度对象的状态, status为true则说明处理成功,反之处理失败
UpdatePlanScheduleObjectStatusById(ctx context.Context, id primitive.ObjectID, status bool) error
}