forked from RussellLuo/timingwheel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbucket.go
121 lines (101 loc) · 3.03 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package timingwheel
import (
"container/list"
"sync"
"sync/atomic"
"unsafe"
)
// Timer represents a single event. When the Timer expires, the given
// task will be executed.
type Timer struct {
expiration int64 // in milliseconds
task func()
// The bucket that holds the list to which this timer's element belongs.
//
// NOTE: This field may be updated and read concurrently,
// through Timer.Stop() and Bucket.Flush().
b unsafe.Pointer // type: *bucket
// The timer's element.
element *list.Element
}
func (t *Timer) getBucket() *bucket {
return (*bucket)(atomic.LoadPointer(&t.b))
}
func (t *Timer) setBucket(b *bucket) {
atomic.StorePointer(&t.b, unsafe.Pointer(b))
}
// Stop prevents the Timer from firing. It returns true if the call
// stops the timer, false if the timer has already expired or been stopped.
//
// If the timer t has already expired and the t.task has been started in its own
// goroutine; Stop does not wait for t.task to complete before returning. If the caller
// needs to know whether t.task is completed, it must coordinate with t.task explicitly.
func (t *Timer) Stop() bool {
stopped := false
for b := t.getBucket(); b != nil; b = t.getBucket() {
// If b.Remove is called just after the timing wheel's goroutine has:
// 1. removed t from b (through b.Flush -> b.remove)
// 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)
// this may fail to remove t due to the change of t's bucket.
stopped = b.Remove(t)
// Thus, here we re-get t's possibly new bucket (nil for case 1, or ab (non-nil) for case 2),
// and retry until the bucket becomes nil, which indicates that t has finally been removed.
}
return stopped
}
type bucket struct {
mu sync.Mutex
timers *list.List
expiration int64
}
func newBucket() *bucket {
return &bucket{
timers: list.New(),
expiration: -1,
}
}
func (b *bucket) Expiration() int64 {
return atomic.LoadInt64(&b.expiration)
}
func (b *bucket) SetExpiration(expiration int64) bool {
return atomic.SwapInt64(&b.expiration, expiration) != expiration
}
func (b *bucket) Add(t *Timer) {
b.mu.Lock()
e := b.timers.PushBack(t)
t.setBucket(b)
t.element = e
b.mu.Unlock()
}
func (b *bucket) remove(t *Timer) bool {
if t.getBucket() != b {
// If remove is called from t.Stop, and this happens just after the timing wheel's goroutine has:
// 1. removed t from b (through b.Flush -> b.remove)
// 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)
// then t.getBucket will return nil for case 1, or ab (non-nil) for case 2.
// In either case, the returned value does not equal to b.
return false
}
b.timers.Remove(t.element)
t.setBucket(nil)
t.element = nil
return true
}
func (b *bucket) Remove(t *Timer) bool {
b.mu.Lock()
defer b.mu.Unlock()
return b.remove(t)
}
func (b *bucket) Flush(reinsert func(*Timer)) {
b.mu.Lock()
e := b.timers.Front()
for e != nil {
next := e.Next()
t := e.Value.(*Timer)
b.remove(t)
reinsert(t)
e = next
}
b.mu.Unlock()
b.SetExpiration(-1)
}