-
Notifications
You must be signed in to change notification settings - Fork 43
/
sparse-file.go
336 lines (295 loc) · 8.62 KB
/
sparse-file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
package desync
import (
"errors"
"io"
"io/ioutil"
"os"
"sort"
"sync"
"github.com/boljen/go-bitmap"
)
// SparseFile represents a file that is written as it is read (Copy-on-read). It is
// used as a fast cache. Any chunk read from the store to satisfy a read operation
// is written to the file.
type SparseFile struct {
// Path of the backing file; Open opens this path for reading.
name string
// Index the file content is built from; its Length is reported by Length.
idx Index
// Options given to NewSparseFile (state file locations, preload concurrency).
opt SparseFileOptions
// Loads chunks into the file and keeps track of which ones are already there.
loader *sparseFileLoader
}
// SparseFileOptions holds the optional settings for a SparseFile. The zero
// value disables state saving and preloading.
type SparseFileOptions struct {
// Optional, path of the file WriteState saves the state of the sparse file to
// (callers are expected to invoke WriteState, e.g. on exit or SIGHUP). The state file
// contains information which chunks from the index have been read and are
// populated in the sparse file. If the state and sparse file exist and match,
// the sparse file is used as is (not re-populated).
StateSaveFile string
// Optional, load all chunks that are marked as read in this state file. It is used
// to pre-populate a new sparse file if the sparse file or the save state file aren't
// present or don't match the index. StateSaveFile and StateInitFile can be the same.
StateInitFile string
// Optional, number of goroutines to preload chunks from StateInitFile.
StateInitConcurrency int
}
// SparseFileHandle is used to access a sparse file. All read operations performed
// on the handle are either done on the file if the required ranges are available
// or loaded from the store and written to the file.
type SparseFileHandle struct {
// The sparse file this handle belongs to; its loader populates ranges before reads.
sf *SparseFile
// Descriptor of the backing file that reads are served from.
file *os.File
}
// NewSparseFile prepares the file name as a sparse cache for the index idx and
// returns a SparseFile that loads missing chunks from s.
//
// The file is created if it doesn't exist. If its size equals the index length
// and opt.StateSaveFile can be opened and matches the index, the file is reused
// as is. Otherwise the file is brought to the index length (and emptied first if
// its size didn't match) and, when opt.StateInitFile is set, the chunks marked
// in that state file are preloaded in background goroutines.
//
// An error is returned if the file can't be created, inspected or resized, or
// if opt.StateInitFile is set but can't be opened or doesn't match the index.
func NewSparseFile(name string, idx Index, s Store, opt SparseFileOptions) (*SparseFile, error) {
	f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0755)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	loader := newSparseFileLoader(name, idx, s)
	sf := &SparseFile{
		name:   name,
		idx:    idx,
		loader: loader,
		opt:    opt,
	}

	// Simple check to see if the file is correct for the given index by
	// just comparing the size. If it's not, then just reset the file and
	// don't load a state.
	stat, err := f.Stat()
	if err != nil {
		return nil, err
	}
	sparseFileMatch := stat.Size() == idx.Length()

	// If the sparse-file looks like it's of the right size, and we have a
	// save state file, try to use those. No need to further initialize if
	// that's successful.
	if sparseFileMatch && opt.StateSaveFile != "" {
		stateFile, err := os.Open(opt.StateSaveFile)
		if err == nil {
			defer stateFile.Close()

			// If we can load the state file, we have everything needed,
			// no need to initialize it.
			if err := loader.loadState(stateFile); err == nil {
				return sf, nil
			}
		}
	}

	// A file of the wrong size belongs to something else. Null chunks are never
	// written because the file is assumed to be blank, so leftover content has
	// to be dropped before the file is resized; otherwise it would be served
	// in place of null chunks.
	if !sparseFileMatch {
		if err = f.Truncate(0); err != nil {
			return nil, err
		}
	}

	// Create the new file at full size, that way we can skip loading null-chunks,
	// this should be a NOP if the file matches the index size already.
	if err = f.Truncate(idx.Length()); err != nil {
		return nil, err
	}

	// Try to initialize the sparse file from a prior state file if one is provided.
	// This will concurrently load all chunks marked "done" in the state file and
	// write them to the sparse file.
	if opt.StateInitFile != "" {
		initFile, err := os.Open(opt.StateInitFile)
		if err != nil {
			return nil, err
		}
		defer initFile.Close()
		if err := loader.preloadChunksFromState(initFile, opt.StateInitConcurrency); err != nil {
			return nil, err
		}
	}

	return sf, nil
}
// Open returns a handle for a sparse file. The backing file is opened
// read-only. If that fails, a nil handle is returned together with the error
// so callers never receive a handle wrapping a nil file.
func (sf *SparseFile) Open() (*SparseFileHandle, error) {
	file, err := os.Open(sf.name)
	if err != nil {
		return nil, err
	}
	return &SparseFileHandle{
		sf:   sf,
		file: file,
	}, nil
}
// Length reports the length of the index backing the sparse file, as returned
// by the index's own Length method.
func (sf *SparseFile) Length() int64 {
	size := sf.idx.Length()
	return size
}
// WriteState saves the state of file, basically which chunks were loaded
// and which ones weren't. It does nothing and returns nil when no
// StateSaveFile is configured. Otherwise the state file is created (or
// truncated) and written; errors from creating, writing or closing the
// file are returned.
func (sf *SparseFile) WriteState() error {
	if sf.opt.StateSaveFile == "" {
		return nil
	}
	f, err := os.Create(sf.opt.StateSaveFile)
	if err != nil {
		return err
	}
	if err := sf.loader.writeState(f); err != nil {
		// The write error is the one worth reporting; closing is best effort here.
		_ = f.Close()
		return err
	}
	// Close may report a write error that only surfaces on flush, so it must
	// not be dropped for a file that was just written.
	return f.Close()
}
// ReadAt fills b with data starting at offset. The loader is first asked to
// populate the requested byte range in the backing file; only then is the
// read served from that file. A loader error is returned with a count of 0.
func (h *SparseFileHandle) ReadAt(b []byte, offset int64) (int, error) {
	length := int64(len(b))
	err := h.sf.loader.loadRange(offset, length)
	if err != nil {
		return 0, err
	}
	n, err := h.file.ReadAt(b, offset)
	return n, err
}
// Close closes the handle's descriptor of the backing file and returns the
// error from doing so, if any.
func (h *SparseFileHandle) Close() error {
	err := h.file.Close()
	return err
}
// sparseIndexChunk pairs an index chunk with the bookkeeping needed to load
// it into the sparse file.
type sparseIndexChunk struct {
IndexChunk
// Used by loadChunk so that the load of this chunk is attempted only once.
once sync.Once
}
// Loader for sparse files. It fetches chunks from a store, writes them into
// the sparse file and records which chunks are already present.
type sparseFileLoader struct {
// Path of the sparse file that chunks are written to.
name string
// One bit per chunk of the index, set once the chunk has been written to the file.
done bitmap.Bitmap
// Guards done.
mu sync.RWMutex
// Store that chunks are fetched from.
s Store
// Null chunk for the index's maximum chunk size; chunks with its ID are not loaded.
nullChunk *NullChunk
// Chunks of the index, in index order.
chunks []*sparseIndexChunk
}
// newSparseFileLoader builds a loader for the sparse file name. Every chunk of
// idx gets its own wrapper, the "done" bitmap is sized to the number of chunks
// with nothing marked yet, and the null chunk is derived from the index's
// maximum chunk size.
func newSparseFileLoader(name string, idx Index, s Store) *sparseFileLoader {
	wrapped := make([]*sparseIndexChunk, len(idx.Chunks))
	for i := range idx.Chunks {
		wrapped[i] = &sparseIndexChunk{IndexChunk: idx.Chunks[i]}
	}

	loader := &sparseFileLoader{name: name, s: s}
	loader.done = bitmap.New(len(idx.Chunks))
	loader.chunks = wrapped
	loader.nullChunk = NewNullChunk(idx.Index.ChunkSizeMax)
	return loader
}
// indexRange returns, for the byte range starting at start and spanning length
// bytes, the index of the first and last chunk needed to populate it.
//
// The returned indexes are always valid for l.chunks, with one exception: when
// there are no chunks at all, (0, -1) is returned so that a loop from first to
// last does nothing. A range starting at or past the end of the last chunk
// maps to the last chunk, and a range with length < 1 maps to the single chunk
// containing start.
func (l *sparseFileLoader) indexRange(start, length int64) (int, int) {
	n := len(l.chunks)
	if n == 0 {
		return 0, -1
	}

	// First chunk whose end lies beyond start.
	firstChunk := sort.Search(n, func(i int) bool { return start < int64(l.chunks[i].Start+l.chunks[i].Size) })

	// This check has to come before the length check, otherwise an empty read
	// at the end would return n, which is not a valid chunk index.
	if firstChunk >= n { // reading past the end, load the last chunk
		return n - 1, n - 1
	}
	if length < 1 {
		return firstChunk, firstChunk
	}

	// Could do another binary search to find the last, but in reality, most reads are short enough to fall
	// into one or two chunks only, so may as well use a for loop here.
	end := uint64(start + length - 1)
	lastChunk := firstChunk
	for i := firstChunk + 1; i < n; i++ {
		if end < l.chunks[i].Start {
			break
		}
		lastChunk++
	}
	return firstChunk, lastChunk
}
// loadRange makes sure every chunk overlapping the given byte range is present
// in the sparse file. Chunks already marked done and null chunks are skipped;
// the rest are loaded one after another, stopping at the first error.
func (l *sparseFileLoader) loadRange(start, length int64) error {
	first, last := l.indexRange(start, length)

	// Collect the missing chunks under the read lock, then load them without it.
	var pending []int
	l.mu.RLock()
	for idx := first; idx <= last; idx++ {
		// The file is truncated and blank, so null chunks never need loading.
		if !l.done.Get(idx) && l.chunks[idx].ID != l.nullChunk.ID {
			pending = append(pending, idx)
		}
	}
	l.mu.RUnlock()

	// TODO: Load the chunks concurrently
	for p := 0; p < len(pending); p++ {
		err := l.loadChunk(pending[p])
		if err != nil {
			return err
		}
	}
	return nil
}
// loadChunk fetches chunk i from the store, writes its data to the sparse file
// at the chunk's start offset and marks the chunk as done. The work is attempted
// at most once per chunk; concurrent callers wait for the attempt in progress.
//
// An error is returned if this call's attempt failed, and also if the chunk is
// still not marked done after an earlier or concurrent attempt. Without the
// second check such callers would get a nil error and then read blank data
// from the file.
func (l *sparseFileLoader) loadChunk(i int) error {
	var loadErr error
	l.chunks[i].once.Do(func() {
		c, err := l.s.GetChunk(l.chunks[i].ID)
		if err != nil {
			loadErr = err
			return
		}
		b, err := c.Data()
		if err != nil {
			loadErr = err
			return
		}

		f, err := os.OpenFile(l.name, os.O_RDWR, 0666)
		if err != nil {
			loadErr = err
			return
		}
		defer f.Close()

		if _, err := f.WriteAt(b, int64(l.chunks[i].Start)); err != nil {
			loadErr = err
			return
		}

		l.mu.Lock()
		l.done.Set(i, true)
		l.mu.Unlock()
	})
	if loadErr != nil {
		return loadErr
	}

	// The once is used up even when the attempt failed, in which case the
	// function above didn't run for this call and loadErr says nothing. The
	// done bit is the only reliable indicator of success.
	l.mu.RLock()
	loaded := l.done.Get(i)
	l.mu.RUnlock()
	if !loaded {
		return errors.New("sparse file chunk could not be loaded in an earlier attempt")
	}
	return nil
}
// writeState writes the loader's "done" bitmap to w. The bitmap holds one bit
// per chunk of the index: 0 = chunk has not been loaded, 1 = chunk has been
// loaded. The loader's lock is held for the duration of the write.
func (l *sparseFileLoader) writeState(w io.Writer) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	raw := l.done.Data(false)
	if _, err := w.Write(raw); err != nil {
		return err
	}
	return nil
}
// loadState replaces the loader's "done" bitmap with the one read from r. The
// data is expected to be a bitmap with one bit per chunk, where a 0 bit means
// the chunk hasn't been written to the sparse file yet. If the data can't be
// read or doesn't fit the index, the error is returned and the current state
// is left untouched.
func (l *sparseFileLoader) loadState(r io.Reader) error {
	state, err := l.stateFromReader(r)
	if err == nil {
		l.mu.Lock()
		l.done = state
		l.mu.Unlock()
	}
	return err
}
// preloadChunksFromState starts n goroutines to pre-load chunks that were
// marked as "done" in the state read from r. It returns as soon as the
// goroutines are started; loading continues in the background. Values of n
// below 1 are treated as 1, otherwise nothing would ever receive from the
// work channel and the feeder goroutine would block forever.
//
// An error is returned only if the state can't be read or doesn't match the
// index. Failures to load individual chunks are not reported.
func (l *sparseFileLoader) preloadChunksFromState(r io.Reader, n int) error {
	state, err := l.stateFromReader(r)
	if err != nil {
		return err
	}
	if n < 1 {
		n = 1
	}

	// Start the workers for parallel pre-loading
	ch := make(chan int)
	for i := 0; i < n; i++ {
		go func() {
			for chunkIdx := range ch {
				// Pre-loading is best effort, so the error is deliberately dropped.
				_ = l.loadChunk(chunkIdx)
			}
		}()
	}

	// Start the feeder. Iterate over the chunks and see if any of them
	// are marked done in the state. If so, load those chunks. Closing the
	// channel afterwards lets the workers exit.
	go func() {
		for chunkIdx := range l.chunks {
			if state.Get(chunkIdx) {
				ch <- chunkIdx
			}
		}
		close(ch)
	}()
	return nil
}
// stateFromReader reads all of r and returns the content as a bitmap. As a
// very basic check that the state really is for this sparse file and not
// something else, the data must be exactly as long as a bitmap with one bit
// per chunk, rounded up to whole bytes.
func (l *sparseFileLoader) stateFromReader(r io.Reader) (bitmap.Bitmap, error) {
	raw, err := ioutil.ReadAll(r)
	if err != nil {
		return nil, err
	}

	// Bytes needed to hold one bit per chunk.
	want := (len(l.chunks) + 7) / 8
	if len(raw) == want {
		return raw, nil
	}
	return nil, errors.New("sparse state file does not match the index")
}