Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update executors.md #42

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func New...(interval time.Duration, container TaskContainer) *PeriodicalExecutor
container: container,
confirmChan: make(chan lang.PlaceholderType),
newTicker: func(d time.Duration) timex.Ticker {
return timex.NewTicker(interval)
return timex.NewTicker(d)
},
}
...
Expand Down Expand Up @@ -214,23 +214,22 @@ func (pe *PeriodicalExecutor) Add(task interface{}) {
func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
pe.lock.Lock()
defer func() {
// 一开始为 false
var start bool
// 在第一条 task 加入的时候就会执行 backgroundFlush()。后台协程刷 task
if !pe.guarded {
// backgroundFlush() 会将 guarded 重新置反
pe.guarded = true
start = true
// defer to unlock quickly
// backgroundFlush() 会将 guarded 重新置反
defer pe.backgroundFlush()
}
pe.lock.Unlock()
// 在第一条 task 加入的时候就会执行 if 中的 backgroundFlush()。后台协程刷task
if start {
pe.backgroundFlush()
}
}()
// 控制maxTask,>=maxTask 将container中tasks pop, return

// 控制 maxTask,>=maxTask 将 container 中tasks pop, return
if pe.container.AddTask(task) {
// 正在执行的个数 +1
atomic.AddInt32(&pe.inflight, 1)
return pe.container.RemoveAll(), true
}
}

return nil, false
}
Expand All @@ -245,6 +244,9 @@ func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool)
func (pe *PeriodicalExecutor) backgroundFlush() {
// 封装 go func(){}
threading.GoSafe(func() {
// flush before quit goroutine to avoid missing tasks
defer pe.Flush()

ticker := pe.newTicker(pe.interval)
defer ticker.Stop()

Expand All @@ -255,6 +257,7 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
// 从channel拿到 []tasks
case vals := <-pe.commander:
commanded = true
atomic.AddInt32(&pe.inflight, -1)
// 实质:wg.Add(1)
pe.enterExecution()
// 放开 Add() 的阻塞,而且此时暂存区也为空。才开始新的 task 加入
Expand All @@ -264,18 +267,13 @@ func (pe *PeriodicalExecutor) backgroundFlush() {
last = timex.Now()
case <-ticker.Chan():
if commanded {
// 由于select选择的随机性,如果同时满足两个条件同时执行完上面的,此处置反,并跳过本段执行
// 由于 select 选择的随机性,如果同时满足两个条件同时执行完上面的,此处置反,并跳过本段执行
// https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
commanded = false
} else if pe.Flush() {
// 刷新完成,定时器清零。暂存区空了,开始下一次定时刷新
// 刷新完成,定时器清零。暂存区空了,开始下一次定时刷新
last = timex.Now()
} else if timex.Since(last) > pe.interval*idleRound {
// 既没到maxTask,Flush() err,并且 last->now 时间过长,会再次触发 Flush()
// 只有这置反,才会开启一个新的 backgroundFlush() 后台协程
pe.guarded = false
// 再次刷新,防止漏掉
pe.Flush()
} else if pe.shallQuit(last) {
return
}
}
Expand Down