-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutils.go
117 lines (103 loc) · 2.12 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package mbbolt
import (
"bufio"
"fmt"
"io"
"runtime"
"strings"
"sync"
"time"
"unsafe"
"go.oneofone.dev/genh"
)
// DBer is the bucketed key/value surface that ConvertDB works against; *DB
// and *SegDB are both asserted to implement it.
type DBer interface {
	// Bucket enumeration.
	Buckets() []string

	// Per-bucket index counters.
	CurrentIndex(bucket string) uint64
	NextIndex(bucket string) (uint64, error)
	SetNextIndex(bucket string, index uint64) error

	// Lookups and iteration; ForEachBytes passes each key and value to fn as
	// byte slices and returns an error.
	Get(bucket, key string, v any) error
	ForEachBytes(bucket string, fn func(k, v []byte) error) error

	// Mutations.
	Put(bucket, key string, v any) error
	Delete(bucket, key string) error
}
// Compile-time checks: the build breaks if *DB or *SegDB stops implementing
// DBer.
var _ DBer = (*DB)(nil)

var _ DBer = (*SegDB)(nil)
// ConvertFn is the per-record hook taken by ConvertDB. It receives the bucket
// name plus the record's key and value; ConvertDB writes the returned bytes to
// the destination when the bool is true and skips the record when it is false.
type ConvertFn = func(bucket string, k, v []byte) ([]byte, bool)

// batcher is the optional capability ConvertDB probes for on its arguments.
// ConvertDB feeds the value returned by UseBatch(false) back into UseBatch on
// exit, i.e. it treats the result as the previous setting.
type batcher interface {
	UseBatch(v bool) bool
}
// ConvertDB copies every bucket reported by src.Buckets() into dst.
//
// For each bucket it first sets dst's next index to src's current index, then
// walks the bucket with src.ForEachBytes and stores each record in dst under
// the same bucket and key. fn may rewrite the value or drop the record by
// returning false; a nil fn copies values unchanged.
//
// If dst or src implements UseBatch, batching is switched off for the
// duration of the call and the previous setting is restored on return.
// The first error from SetNextIndex, ForEachBytes or Put aborts the copy and
// is returned as-is.
func ConvertDB(dst, src DBer, fn ConvertFn) error {
	// Batching greatly slows down sync operations, so turn it off while
	// copying and put the old setting back afterwards.
	if b, ok := dst.(batcher); ok {
		prev := b.UseBatch(false)
		defer b.UseBatch(prev)
	}
	if b, ok := src.(batcher); ok {
		prev := b.UseBatch(false)
		defer b.UseBatch(prev)
	}

	convert := fn
	if convert == nil {
		// Identity conversion: keep every record untouched.
		convert = func(_ string, _, val []byte) ([]byte, bool) {
			return val, true
		}
	}

	for _, name := range src.Buckets() {
		err := dst.SetNextIndex(name, src.CurrentIndex(name))
		if err != nil {
			return err
		}

		err = src.ForEachBytes(name, func(key, val []byte) error {
			if out, keep := convert(name, key, val); keep {
				return dst.Put(name, string(key), out)
			}
			return nil
		})
		if err != nil {
			return err
		}
	}

	return nil
}
// FramesToString renders every call frame produced by frs as one line of the
// form "- file:line [function]", in the order frs yields them.
//
// A nil frs, or one that holds no frames, yields the empty string.
func FramesToString(frs *runtime.Frames) string {
	if frs == nil {
		return ""
	}

	var buf strings.Builder
	for {
		// The second result of Next reports whether *another* frame follows,
		// not whether fr is valid, so fr must be handled before testing it;
		// breaking first would silently drop the last frame.
		fr, more := frs.Next()

		// An exhausted iterator hands back the zero Frame; skip it so an
		// empty stack does not produce a bogus "- :0 []" line.
		if fr.PC != 0 || fr.File != "" || fr.Function != "" {
			// fr.Function equals fr.Func.Name() whenever Func is set, and is
			// still populated for inlined frames where Func is nil.
			fmt.Fprintf(&buf, "- %s:%d [%s]\n", fr.File, fr.Line, fr.Function)
		}

		if !more {
			break
		}
	}
	return buf.String()
}
// slowUpdate bundles a callback, a duration and a mutex.
// NOTE(review): presumably fn is invoked when an update takes at least min,
// with the mutex guarding fn and min; the code that uses this type is not in
// this part of the file, so confirm against it.
type slowUpdate struct {
// fn is the callback, of type OnSlowUpdateFn.
fn OnSlowUpdateFn
// min is a duration; presumably the reporting threshold (TODO confirm).
min time.Duration
// Embedded, so Lock and Unlock are promoted onto slowUpdate. A slowUpdate
// must not be copied once the mutex has been used.
sync.Mutex
}
// stringCap places an int directly after a string so that unsafeBytes can
// reinterpret a *stringCap as a *[]byte, with the int read as the capacity.
// This relies on the runtime's string and slice header layouts; the field
// order must not change.
type stringCap struct {
string
int
}
// unsafeBytes returns a []byte view of s without copying: the result has
// length and capacity len(s) and shares s's backing memory.
//
// It works by building a stringCap (string header followed by len(s)) and
// reinterpreting that memory as a slice. Because the bytes still belong to a
// string, callers must treat the result as read-only.
func unsafeBytes(s string) (out []byte) {
	hdr := stringCap{s, len(s)}
	out = *(*[]byte)(unsafe.Pointer(&hdr))
	return out
}
// bufPool supplies the buffered writers handed out by getBuf and taken back by
// putBufAndFlush. Its New hook builds a writer with an 8 MiB buffer and no
// destination; getBuf attaches the destination with Reset.
// NOTE(review): genh.Pool is assumed to call New when it has nothing to
// reuse — confirm against the genh package.
var bufPool = genh.Pool[bufio.Writer]{
	New: func() *bufio.Writer {
		const size = 8 << 20 // 8 MiB
		return bufio.NewWriterSize(nil, size)
	},
}
// getBuf returns a buffered writer for w.
//
// If w already is a *bufio.Writer it is returned unchanged. Otherwise a writer
// is taken from bufPool and reset to write to w.
//
// NOTE(review): in the pass-through case the returned writer belongs to the
// caller, not the pool, and nothing here records that. Handing it to
// putBufAndFlush would reset it and put it into bufPool — confirm callers
// account for this.
func getBuf(w io.Writer) *bufio.Writer {
	switch bw := w.(type) {
	case *bufio.Writer:
		return bw
	default:
		pooled := bufPool.Get()
		pooled.Reset(w)
		return pooled
	}
}
// putBufAndFlush flushes buf, detaches it from its destination and hands it to
// bufPool. The returned error is the one from Flush; the writer is reset and
// pooled whether or not the flush succeeded.
//
// NOTE(review): buf is pooled unconditionally. getBuf can return a writer that
// did not come from bufPool (when its argument already was a *bufio.Writer);
// passing that writer here resets it and puts it into the pool — confirm
// callers never do so.
func putBufAndFlush(buf *bufio.Writer) error {
	flushErr := buf.Flush()

	// Reset(nil) drops the reference to the destination (and any sticky
	// error) before the writer goes back into the pool.
	buf.Reset(nil)
	bufPool.Put(buf)

	return flushErr
}