diff --git a/i18n/cn/docusaurus-plugin-content-docs/current/blog/tool/executors.md b/i18n/cn/docusaurus-plugin-content-docs/current/blog/tool/executors.md index f32231dc4..469d96e5e 100644 --- a/i18n/cn/docusaurus-plugin-content-docs/current/blog/tool/executors.md +++ b/i18n/cn/docusaurus-plugin-content-docs/current/blog/tool/executors.md @@ -184,7 +184,7 @@ func New...(interval time.Duration, container TaskContainer) *PeriodicalExecutor container: container, confirmChan: make(chan lang.PlaceholderType), newTicker: func(d time.Duration) timex.Ticker { - return timex.NewTicker(interval) + return timex.NewTicker(d) }, } ... @@ -214,23 +214,22 @@ func (pe *PeriodicalExecutor) Add(task interface{}) { func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) { pe.lock.Lock() defer func() { - // 一开始为 false - var start bool + // 在第一条 task 加入的时候就会执行 backgroundFlush()。后台协程刷 task if !pe.guarded { - // backgroundFlush() 会将 guarded 重新置反 pe.guarded = true - start = true + // defer to unlock quickly + // backgroundFlush() 会将 guarded 重新置反 + defer pe.backgroundFlush() } pe.lock.Unlock() - // 在第一条 task 加入的时候就会执行 if 中的 backgroundFlush()。后台协程刷task - if start { - pe.backgroundFlush() - } }() - // 控制maxTask,>=maxTask 将container中tasks pop, return + + // 控制 maxTask,>=maxTask 将 container 中tasks pop, return if pe.container.AddTask(task) { + // 正在执行的个数 +1 + atomic.AddInt32(&pe.inflight, 1) return pe.container.RemoveAll(), true - } + } return nil, false } @@ -245,6 +244,9 @@ func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) func (pe *PeriodicalExecutor) backgroundFlush() { // 封装 go func(){} threading.GoSafe(func() { + // flush before quit goroutine to avoid missing tasks + defer pe.Flush() + ticker := pe.newTicker(pe.interval) defer ticker.Stop() @@ -255,6 +257,7 @@ func (pe *PeriodicalExecutor) backgroundFlush() { // 从channel拿到 []tasks case vals := <-pe.commander: commanded = true + atomic.AddInt32(&pe.inflight, -1) // 实质:wg.Add(1) pe.enterExecution() // 放开 Add() 的阻塞,而且此时暂存区也为空。才开始新的 task 加入 @@ -264,18 +267,13 @@ func (pe *PeriodicalExecutor) backgroundFlush() { last = timex.Now() case <-ticker.Chan(): if commanded { - // 由于select选择的随机性,如果同时满足两个条件同时执行完上面的,此处置反,并跳过本段执行 + // 由于 select 选择的随机性,如果同时满足两个条件同时执行完上面的,此处置反,并跳过本段执行 // https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/ commanded = false } else if pe.Flush() { - // 刷新完成,定时器清零。暂存区空了,开始下一次定时刷新 + // 刷新完成,定时器清零。暂存区空了,开始下一次定时刷新 last = timex.Now() - } else if timex.Since(last) > pe.interval*idleRound { - // 既没到maxTask,Flush() err,并且 last->now 时间过长,会再次触发 Flush() - // 只有这置反,才会开启一个新的 backgroundFlush() 后台协程 - pe.guarded = false - // 再次刷新,防止漏掉 - pe.Flush() + } else if pe.shallQuit(last) { return } }