-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathfileseed.go
239 lines (217 loc) · 6.66 KB
/
fileseed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package desync
import (
"context"
"fmt"
"io"
"os"
"sync"
)
// FileSeed is used to copy or clone blocks from an existing index+blob during
// file extraction. A seed can be flagged invalid (SetInvalid), in which case
// LongestMatchWith reports no matches until the flag is cleared, for example by
// RegenerateIndex.
type FileSeed struct {
// srcFile is the path of the seed blob whose chunks are described by index.
srcFile string
// index is the chunk index of srcFile.
index Index
// pos maps each chunk ID to every position it occupies in index.Chunks.
pos map[ChunkID][]int
// canReflink is the result of CanClone(dstFile, srcFile) at construction time.
// It selects cloning vs. plain copying and whether match lengths are capped.
canReflink bool
// isInvalid marks the seed as unusable. Guarded by mu.
isInvalid bool
// mu guards isInvalid.
mu sync.RWMutex
}
// NewIndexSeed initializes a new seed that uses an existing index and its blob.
// srcFile is the blob described by index. dstFile is only passed, together with
// srcFile, to CanClone to decide whether blocks may be reflinked rather than
// copied. The returned error is currently always nil.
func NewIndexSeed(dstFile string, srcFile string, index Index) (*FileSeed, error) {
	reflink := CanClone(dstFile, srcFile)

	// Record every position at which each chunk ID occurs in the index.
	positions := make(map[ChunkID][]int)
	for i := range index.Chunks {
		id := index.Chunks[i].ID
		positions[id] = append(positions[id], i)
	}

	return &FileSeed{
		srcFile:    srcFile,
		index:      index,
		pos:        positions,
		canReflink: reflink,
	}, nil
}
// LongestMatchWith returns the longest sequence of chunks anywhere in the seed
// that matches `chunks` starting at chunks[0], limiting the maximum number of
// chunks if reflinks are not supported. If there is no match (or `chunks` is
// empty, the seed index is empty, or the seed is flagged invalid), it returns
// a length of zero and a nil SeedSegment.
func (s *FileSeed) LongestMatchWith(chunks []IndexChunk) (int, SeedSegment) {
	// isInvalid can be written concurrently (SetInvalid), so hold the read
	// lock for the whole lookup. Releasing it with defer guarantees that the
	// early returns below never leak the lock, which would otherwise make the
	// next SetInvalid block forever.
	s.mu.RLock()
	defer s.mu.RUnlock()

	if len(chunks) == 0 || len(s.index.Chunks) == 0 || s.isInvalid {
		return 0, nil
	}
	positions, ok := s.pos[chunks[0].ID]
	if !ok {
		return 0, nil
	}

	// Without reflinks, cap the length of a single sequence so that copy jobs
	// stay reasonably balanced. With reflinks the sequence is not capped, which
	// is faster and takes less space. A limit of 0 means "no limit".
	limit := 0
	if !s.canReflink {
		limit = 100
	}

	// From every position of chunks[0] in the seed, find the run of matching
	// chunks and keep the longest one.
	var (
		match   []IndexChunk
		longest int
	)
	for _, p := range positions {
		m := s.maxMatchFrom(chunks, p, limit)
		if len(m) > longest {
			match = m
			longest = len(m)
		}
		if limit != 0 && longest == limit {
			break // already at the cap; no other position can do better
		}
	}
	if longest == 0 {
		// Return an untyped nil so callers' `seg == nil` checks hold.
		return 0, nil
	}
	return longest, newFileSeedSegment(s.srcFile, match, s.canReflink)
}
// RegenerateIndex re-chunks the seed file with IndexFromFile, using the chunk
// size parameters (min/avg/max) of the current index. It then replaces the
// index and its position table and clears the invalid flag. n is passed through
// to IndexFromFile unchanged; attempt and seedNumber only label the progress
// bar. If indexing fails, the error is returned as-is and the seed is left
// untouched.
func (s *FileSeed) RegenerateIndex(ctx context.Context, n int, attempt int, seedNumber int) error {
	chunkingPrefix := fmt.Sprintf("Attempt %d: Chunking Seed %d ", attempt, seedNumber)
	index, _, err := IndexFromFile(ctx, s.srcFile, n, s.index.Index.ChunkSizeMin, s.index.Index.ChunkSizeAvg,
		s.index.Index.ChunkSizeMax, NewProgressBar(chunkingPrefix))
	if err != nil {
		return err
	}

	// Build the position table before publishing anything.
	pos := make(map[ChunkID][]int, len(index.Chunks))
	for i, c := range index.Chunks {
		pos[c.ID] = append(pos[c.ID], i)
	}

	// Swap index, pos and the validity flag together under the write lock, so
	// a concurrent reader never sees a "valid" seed whose pos table does not
	// match its index. The flag is set directly rather than via SetInvalid,
	// which would try to take the same lock again.
	s.mu.Lock()
	s.index = index
	s.pos = pos
	s.isInvalid = false
	s.mu.Unlock()
	return nil
}
// SetInvalid flags the seed as unusable (true) or usable again (false). While
// the flag is set, LongestMatchWith reports no matches. The flag is written
// under the seed's mutex, so this is safe to call concurrently.
func (s *FileSeed) SetInvalid(value bool) {
	s.mu.Lock()
	s.isInvalid = value
	s.mu.Unlock()
}
// IsInvalid reports whether the seed is currently flagged as unusable via
// SetInvalid. It only reads the flag, so it takes the shared read lock.
// Concurrent readers therefore do not serialize behind each other.
func (s *FileSeed) IsInvalid() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.isInvalid
}
// maxMatchFrom returns the run of seed chunks beginning at seed position p
// whose IDs equal chunks[0], chunks[1], ... in order. The run stops at the
// first mismatch, at the end of either slice, or after limit chunks when limit
// is non-zero (a limit of zero means "no limit"). The result is a sub-slice of
// the seed index, not a copy. It returns nil when chunks is empty.
func (s *FileSeed) maxMatchFrom(chunks []IndexChunk, p int, limit int) []IndexChunk {
	if len(chunks) == 0 {
		return nil
	}
	seed := s.index.Chunks
	end := p
	for n := 0; n < len(chunks) && end < len(seed); n++ {
		if limit != 0 && n == limit {
			break
		}
		if chunks[n].ID != seed[end].ID {
			break
		}
		end++
	}
	return seed[p:end]
}
// fileSeedSegment is a run of chunks from a seed's index together with the
// seed file they are read from. The chunks are typically a sub-slice of the
// seed index (see maxMatchFrom).
type fileSeedSegment struct {
// file is the path of the seed file the chunk data is read from.
file string
// chunks are the chunks in this segment. Size assumes they are contiguous and
// in file order.
chunks []IndexChunk
// canReflink reports whether WriteInto may clone blocks instead of copying.
canReflink bool
// NOTE(review): needValidation is not read or set anywhere in this file.
needValidation bool
}
// newFileSeedSegment wraps a run of seed chunks that are read from file.
// canReflink is stored as given; needValidation is left at false.
func newFileSeedSegment(file string, chunks []IndexChunk, canReflink bool) *fileSeedSegment {
	seg := new(fileSeedSegment)
	seg.file = file
	seg.chunks = chunks
	seg.canReflink = canReflink
	return seg
}
// FileName reports the path of the seed file that backs this segment.
func (s *fileSeedSegment) FileName() string {
	path := s.file
	return path
}
// Size returns the number of bytes the segment spans: from the start of its
// first chunk to the end of its last chunk. An empty segment has size 0.
func (s *fileSeedSegment) Size() uint64 {
	n := len(s.chunks)
	if n == 0 {
		return 0
	}
	first, last := s.chunks[0], s.chunks[n-1]
	end := last.Start + last.Size
	return end - first.Start
}
// WriteInto writes the segment's bytes from the seed file into dst at offset.
// length must equal s.Size(); otherwise an error is returned.
//
// The aligned part of the range is cloned when all of these hold:
//   - reflinks are supported
//   - blocksize is non-zero
//   - the seed offset and offset fall at the same position within a block
//
// Otherwise everything is copied. It returns the number of bytes copied and the
// number of bytes cloned. isBlank is not used by this implementation.
func (s *fileSeedSegment) WriteInto(dst *os.File, offset, length, blocksize uint64, isBlank bool) (uint64, uint64, error) {
	if length != s.Size() {
		return 0, 0, fmt.Errorf("unable to copy %d bytes from %s to %s : wrong size", length, s.file, dst.Name())
	}
	// An empty segment has nothing to write; return before indexing s.chunks[0].
	if len(s.chunks) == 0 {
		return 0, 0, nil
	}
	src, err := os.Open(s.file)
	if err != nil {
		return 0, 0, err
	}
	defer src.Close()

	// Do a straight copy in any of these cases:
	//   - reflinks are not supported
	//   - the block size is unknown (0 would divide by zero)
	//   - the two offsets are not aligned the same way
	start := s.chunks[0].Start
	if !s.canReflink || blocksize == 0 || start%blocksize != offset%blocksize {
		return s.copy(dst, src, start, length, offset)
	}
	return s.clone(dst, src, start, length, offset, blocksize)
}
// Validate compares all chunks in this slice of the seed index to the underlying data
// and fails if they don't match. Each chunk is read from file at its recorded
// offset and hashed with Digest. The first read error is returned as-is; a
// digest that differs from the chunk's ID yields a mismatch error.
func (s *fileSeedSegment) Validate(file *os.File) error {
	// Reuse one buffer across chunks instead of allocating per chunk; it only
	// grows when a larger chunk is encountered.
	var buf []byte
	for _, c := range s.chunks {
		if uint64(cap(buf)) < c.Size {
			buf = make([]byte, c.Size)
		}
		b := buf[:c.Size]
		if _, err := file.ReadAt(b, int64(c.Start)); err != nil {
			return err
		}
		if Digest.Sum(b) != c.ID {
			return fmt.Errorf("seed index for %s doesn't match its data", s.file)
		}
	}
	return nil
}
// copy performs a plain copy (no block cloning) of length bytes, starting at
// srcOffset in src, into dst at dstOffset. It returns the number of bytes
// copied and 0 for cloned bytes. If src ends before length bytes could be
// read, the short count is returned with an error wrapping
// io.ErrUnexpectedEOF, so a truncated seed cannot go unnoticed.
func (s *fileSeedSegment) copy(dst, src *os.File, srcOffset, length, dstOffset uint64) (uint64, uint64, error) {
	if _, err := dst.Seek(int64(dstOffset), io.SeekStart); err != nil {
		return 0, 0, err
	}
	if _, err := src.Seek(int64(srcOffset), io.SeekStart); err != nil {
		return 0, 0, err
	}
	// Pass an explicit 64 KiB buffer for the generic copy path. Note that
	// io.CopyBuffer does not use it when dst implements io.ReaderFrom, which
	// *os.File does.
	copied, err := io.CopyBuffer(dst, io.LimitReader(src, int64(length)), make([]byte, 64*1024))
	if err == nil && uint64(copied) != length {
		err = fmt.Errorf("copying %d bytes from %s to %s: source ended after %d bytes: %w",
			length, src.Name(), dst.Name(), copied, io.ErrUnexpectedEOF)
	}
	return uint64(copied), 0, err
}
// clone writes srcLength bytes, starting at srcOffset in src, into dst at
// dstOffset, reflinking whole blocks where possible. srcOffset and dstOffset
// must fall at the same position within a block of blocksize bytes. The bytes
// before the first cloned block and after the last one are copied; the blocks
// in between are cloned with CloneRange. If the range holds no whole block
// to clone, or blocksize is 0, everything is copied. It returns the number of
// bytes copied and the number of bytes cloned.
func (s *fileSeedSegment) clone(dst, src *os.File, srcOffset, srcLength, dstOffset, blocksize uint64) (uint64, uint64, error) {
	// A zero block size would divide by zero below, and nothing can be
	// aligned anyway, so fall back to a plain copy.
	if blocksize == 0 {
		return s.copy(dst, src, srcOffset, srcLength, dstOffset)
	}
	if srcOffset%blocksize != dstOffset%blocksize {
		return 0, 0, fmt.Errorf("reflink ranges not aligned between %s and %s", src.Name(), dst.Name())
	}

	// Cloning starts at the block boundary after srcOffset and ends at the
	// last block boundary inside the range.
	srcAlignStart := (srcOffset/blocksize + 1) * blocksize
	srcAlignEnd := (srcOffset + srcLength) / blocksize * blocksize

	// If no whole block lies in between, there is nothing to clone. Without
	// this check alignLength would underflow (uint64) and the head copy would
	// run past the end of the requested range.
	if srcAlignEnd <= srcAlignStart {
		return s.copy(dst, src, srcOffset, srcLength, dstOffset)
	}

	dstAlignStart := (dstOffset/blocksize + 1) * blocksize
	alignLength := srcAlignEnd - srcAlignStart
	dstAlignEnd := dstAlignStart + alignLength

	var copied uint64

	// Fill the area before the first aligned block.
	c1, _, err := s.copy(dst, src, srcOffset, srcAlignStart-srcOffset, dstOffset)
	if err != nil {
		return c1, 0, err
	}
	copied += c1

	// Fill the area after the last aligned block.
	c2, _, err := s.copy(dst, src, srcAlignEnd, srcOffset+srcLength-srcAlignEnd, dstAlignEnd)
	if err != nil {
		return copied + c2, 0, err
	}
	copied += c2

	// Clone the aligned blocks in between.
	return copied, alignLength, CloneRange(dst, src, srcAlignStart, alignLength, dstAlignStart)
}