-
Notifications
You must be signed in to change notification settings - Fork 0
/
blockcache.go
110 lines (98 loc) · 2.67 KB
/
blockcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package s2randomaccess
import (
"github.com/Jille/easymutex"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/klauspost/compress/s2"
)
// globalLRU contains *decompressedBlocks that contain decompressed data, but are not actively in use.
// Blocks are added by deref once their refcount drops to zero, and onEvicted is
// registered as the callback for entries that leave the cache.
// The error returned by NewWithEvict is deliberately discarded because the size
// is the positive constant 100.
// NOTE(review): golang-lru documents a non-positive size as the only failure mode — confirm when upgrading the dependency.
var globalLRU, _ = lru.NewWithEvict[lruKey, *decompressedBlock](100, onEvicted)
// lruKey identifies a decompressed block inside globalLRU: the inner that owns
// the block plus the offset the block was requested with.
// It is built positionally (lruKey{s.inner, offset}), so the field order matters.
type lruKey struct {
inner *inner // owner of the block; its mtx, active map, dying flag and allocator are used by the block
blockOffset int64 // offset passed to getDecompressedBlock; also the key into inner.active
}
// decompressedBlock is one decompressed block together with its reference count.
// It is built positionally (&decompressedBlock{k, decoded, 1}), so the field order matters.
type decompressedBlock struct {
lruKey // embedded: gives the block access to its owning inner and to its own cache key
decompressed []byte // the decompressed data handed out to callers; set to nil by free()
refcount int // number of outstanding users; set to -666 by free(); guarded by Seeker.mtx (locked as d.inner.mtx in deref — presumably the same mutex)
}
// deref releases one reference to the block. It is the release function that
// getDecompressedBlock hands out next to the decompressed slice, and must be
// called once the user is done with that slice.
//
// When the last reference goes away the block is removed from the owner's
// active map. It is then either freed right away (owner is dying) or parked in
// globalLRU so that a later request can reuse it.
func (d *decompressedBlock) deref() {
	owner := d.inner
	owner.mtx.Lock()
	defer owner.mtx.Unlock()

	d.refcount--
	if d.refcount != 0 {
		// Still in use by someone else (or already accounted for).
		return
	}

	delete(owner.active, d.blockOffset)
	if owner.dying {
		// The owner is going away; do not put anything back in the cache.
		d.free()
		return
	}
	globalLRU.Add(d.lruKey, d)
}
// onEvicted is the eviction callback registered on globalLRU; it runs whenever
// a decompressedBlock is dropped from the cache.
//
// The owner's mutex is needed to inspect the block. If it cannot be taken
// immediately the cleanup is handed to a goroutine, because the eviction may
// have been triggered by an Add() made by a caller that already holds that
// very mutex, and blocking here would then deadlock.
func onEvicted(k lruKey, d *decompressedBlock) {
	if !d.inner.mtx.TryLock() {
		// Someone has the lock; clean up asynchronously once it is released.
		go func() {
			d.inner.mtx.Lock()
			onEvicted_locked(d)
		}()
		return
	}
	onEvicted_locked(d)
}
// onEvicted_locked finishes an eviction. It must be called with d.inner.mtx
// held and releases that mutex before returning.
//
// The block is only freed when nobody holds a reference to it. A block that is
// still in use (refcount > 0) or that was already freed (refcount < 0) is left
// alone.
func onEvicted_locked(d *decompressedBlock) {
	defer d.inner.mtx.Unlock()
	if d.refcount != 0 {
		return
	}
	d.free()
}
// free hands the decompressed buffer back to the owner's allocator and marks
// the block as dead.
//
// The refcount is set to the sentinel -666 so that the block can be told apart
// from a live one: getDecompressedBlock only reuses cached blocks whose
// refcount is >= 0, and onEvicted_locked only frees blocks whose refcount is
// exactly 0, so a freed block is neither reused nor freed twice.
// Callers in this file invoke it with d.inner.mtx held.
func (d *decompressedBlock) free() {
	buf := d.decompressed
	d.decompressed = nil
	d.refcount = -666
	d.inner.allocator.Free(buf)
}
// getDecompressedBlock returns the decompressed contents of the S2 block that
// starts at offset in s.data.
//
// compressedLength is the number of compressed bytes that make up the block
// and uncompressedLength is the size of the buffer requested from the
// allocator for the decompressed data.
//
// On success it returns the decompressed bytes and a release function. The
// release function must be called once the caller is done with the slice; it
// drops the reference taken here. On failure the slice and the release
// function are nil and the error from s2.Decode is returned.
//
// A block that is already decompressed (in s.active or in globalLRU) is
// reused. Decompression itself runs without s.mtx held.
func (s *Seeker) getDecompressedBlock(offset int64, compressedLength, uncompressedLength int) ([]byte, func(), error) {
	em := easymutex.LockMutex(&s.mtx)
	defer em.Unlock()

	k := lruKey{s.inner, offset}

	// cached looks for an already decompressed copy of the block, first among
	// the blocks currently in use and then in the global LRU. Blocks with a
	// negative refcount have been freed and must not be handed out.
	// It must be called with s.mtx held.
	cached := func() *decompressedBlock {
		if db, ok := s.active[offset]; ok {
			return db
		}
		if db, ok := globalLRU.Get(k); ok && db.refcount >= 0 {
			return db
		}
		return nil
	}

	if db := cached(); db != nil {
		db.refcount++
		return db.decompressed, db.deref, nil
	}

	// Decompress without holding the lock so other blocks stay accessible.
	em.Unlock()
	buf := s.allocator.Alloc(uncompressedLength)
	decoded, err := s2.Decode(buf, s.data[offset:][:compressedLength])
	if err != nil {
		// Hand the unused buffer back instead of leaking it.
		s.allocator.Free(buf)
		return nil, nil, err
	}
	em.Lock()

	// The lock was released while decoding, so another caller may have
	// decompressed the same block in the meantime. Use theirs and drop ours;
	// overwriting their entry would leave their buffer unreachable and never
	// returned to the allocator.
	if db := cached(); db != nil {
		s.allocator.Free(decoded)
		db.refcount++
		return db.decompressed, db.deref, nil
	}

	db := &decompressedBlock{k, decoded, 1}
	s.active[offset] = db
	return decoded, db.deref, nil
}
// SetGlobalLRUSize configures the number of blocks that can be in the global decompressed blocks LRU at any time.
// The cache is shared by the whole package and starts out with room for 100 blocks.
// NOTE(review): shrinking the cache presumably evicts surplus blocks through onEvicted — confirm against golang-lru.
func SetGlobalLRUSize(n int) {
globalLRU.Resize(n)
}
// PurgeGlobalCache purges the global LRU cache that holds decompressed blocks.
// It is safe to call this function at any time.
// NOTE(review): purged blocks presumably pass through onEvicted like any other eviction — confirm against golang-lru.
func PurgeGlobalCache() {
globalLRU.Purge()
}
// removeFromGlobalCache marks i as dying and drops every block owned by i from
// globalLRU.
//
// dying is set first, under i.mtx, so that a later deref frees its block
// instead of putting it back into the cache. The mutex is released before the
// cache is walked because removing an entry runs onEvicted, which needs i.mtx.
func (i *inner) removeFromGlobalCache() {
	i.mtx.Lock()
	i.dying = true
	i.mtx.Unlock()

	// Every entry is stored under its own lruKey, so the owner can be read
	// straight from the key.
	for _, key := range globalLRU.Keys() {
		if key.inner != i {
			continue
		}
		// Remove triggers onEvicted.
		globalLRU.Remove(key)
	}
}