-
Notifications
You must be signed in to change notification settings - Fork 125
/
Copy pathbucket.go
133 lines (110 loc) · 3.56 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package timingwheel
import (
"container/list"
"sync"
"sync/atomic"
"unsafe"
)
// Timer represents a single event. When the Timer expires, the given
// task will be executed.
type Timer struct {
	// expiration is the time at which the timer expires.
	expiration int64 // in milliseconds

	// task is the function to execute when the timer expires.
	task func()

	// The bucket that holds the list to which this timer's element belongs.
	// It is nil while the timer is not held by any bucket.
	//
	// NOTE: This field may be updated and read concurrently,
	// through Timer.Stop() and Bucket.Flush(). Access it only via
	// getBucket and setBucket, which use atomic loads and stores.
	b unsafe.Pointer // type: *bucket

	// The timer's element, i.e. the list element returned when the timer
	// was pushed onto its bucket's list. bucket.Add sets it and
	// bucket.remove resets it to nil.
	element *list.Element
}
// getBucket atomically loads the bucket that currently holds t.
// It returns nil when t is not held by any bucket.
func (t *Timer) getBucket() *bucket {
	raw := atomic.LoadPointer(&t.b)
	return (*bucket)(raw)
}
// setBucket atomically records b as the bucket that holds t.
// Passing nil marks t as not being held by any bucket.
func (t *Timer) setBucket(b *bucket) {
	raw := unsafe.Pointer(b)
	atomic.StorePointer(&t.b, raw)
}
// Stop prevents the Timer from firing. It returns true if the call
// stops the timer, false if the timer has already expired or been stopped.
//
// If the timer t has already expired and t.task has been started in its own
// goroutine, Stop does not wait for t.task to complete before returning.
// A caller that needs to know whether t.task has completed must coordinate
// with t.task explicitly.
func (t *Timer) Stop() bool {
	var stopped bool
	for {
		b := t.getBucket()
		if b == nil {
			// t is no longer held by any bucket, so there is nothing left
			// to remove. Report the outcome of the last removal attempt
			// (false if no attempt was ever made).
			return stopped
		}

		// b.Remove may fail when it runs just after the timing wheel's
		// goroutine has either:
		//   1. removed t from b (through b.Flush -> b.remove), or
		//   2. moved t from b to another bucket ab
		//      (through b.Flush -> b.remove and ab.Add),
		// because t's bucket is then no longer b.
		//
		// The next iteration therefore re-reads t's bucket (nil for case 1,
		// ab for case 2) and retries until the bucket becomes nil.
		stopped = b.Remove(t)
	}
}
// bucket is a mutex-guarded list of timers together with an expiration
// value that is read and written atomically.
type bucket struct {
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we must keep the 64-bit field
	// as the first field of the struct.
	//
	// For more explanations, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	// and https://go101.org/article/memory-layout.html.
	//
	// expiration is accessed only through atomic operations (see Expiration
	// and SetExpiration). newBucket initializes it to -1 and Flush resets it
	// to -1; presumably -1 means "no expiration set" (confirm with callers).
	expiration int64

	// mu is held by Add, Remove and Flush while they modify timers.
	mu sync.Mutex

	// timers holds the *Timer values currently stored in this bucket.
	timers *list.List
}
// newBucket returns an empty bucket whose timer list is initialized and
// whose expiration is set to -1.
func newBucket() *bucket {
	b := new(bucket)
	b.expiration = -1
	b.timers = list.New()
	return b
}
// Expiration atomically reads and returns the bucket's expiration value.
func (b *bucket) Expiration() int64 {
	current := atomic.LoadInt64(&b.expiration)
	return current
}
// SetExpiration atomically replaces the bucket's expiration value with
// expiration. It returns true if the stored value actually changed, and
// false if the bucket already had that expiration.
func (b *bucket) SetExpiration(expiration int64) bool {
	previous := atomic.SwapInt64(&b.expiration, expiration)
	return previous != expiration
}
// Add appends t to the end of the bucket's timer list and records on t
// both the bucket that now holds it and its list element. The whole
// operation runs with b.mu held.
func (b *bucket) Add(t *Timer) {
	b.mu.Lock()
	defer b.mu.Unlock()

	elem := b.timers.PushBack(t)
	t.setBucket(b)
	t.element = elem
}
// remove detaches t from b's timer list and clears t's bucket and element.
// It returns true if t was removed, and false if t is not currently held
// by b. It does not lock b.mu itself; Remove and Flush call it while
// holding the lock.
func (b *bucket) remove(t *Timer) bool {
	if holder := t.getBucket(); holder == b {
		b.timers.Remove(t.element)
		t.setBucket(nil)
		t.element = nil
		return true
	}

	// t is not held by b. This happens when remove is called from within
	// t.Stop just after the timing wheel's goroutine has either:
	//   1. removed t from b (through b.Flush -> b.remove), or
	//   2. moved t from b to another bucket ab
	//      (through b.Flush -> b.remove and ab.Add).
	// t.getBucket then returns nil for case 1, or ab (non-nil) for case 2;
	// in either case it is not equal to b.
	return false
}
// Remove takes t out of the bucket while holding b.mu. It returns true if
// t was removed, and false if t is not currently held by b.
func (b *bucket) Remove(t *Timer) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	removed := b.remove(t)
	return removed
}
// Flush empties the bucket: every timer in the list is removed from b and
// then handed to reinsert, in list order. Afterwards the bucket's
// expiration is reset to -1.
//
// b.mu is held for the whole call, including while reinsert runs, so
// reinsert must not try to lock b.mu.
func (b *bucket) Flush(reinsert func(*Timer)) {
	b.mu.Lock()
	defer b.mu.Unlock()

	var following *list.Element
	for cur := b.timers.Front(); cur != nil; cur = following {
		// Capture the successor before removing cur from the list;
		// once removed, cur no longer links to the rest of the list.
		following = cur.Next()

		timer := cur.Value.(*Timer)
		b.remove(timer)

		// Note that this operation will either execute the timer's task, or
		// insert the timer into another bucket belonging to a lower-level
		// wheel.
		//
		// In either case, no further lock operation will happen to b.mu.
		reinsert(timer)
	}

	b.SetExpiration(-1)
}