// untar.go, from a fork of folbricht/desync.
package desync

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"reflect"

	"golang.org/x/sync/errgroup"
)

// UnTar implements the untar command, decoding a catar file and writing the
// contained tree to a target directory.
func UnTar(ctx context.Context, r io.Reader, fs FilesystemWriter) error {
	dec := NewArchiveDecoder(r)
loop:
	for {
		// See if we're meant to stop
		select {
		case <-ctx.Done():
			return Interrupted{}
		default:
		}
		c, err := dec.Next()
		if err != nil {
			return err
		}
		switch n := c.(type) {
		case NodeDirectory:
			err = fs.CreateDir(n)
		case NodeFile:
			err = fs.CreateFile(n)
		case NodeDevice:
			err = fs.CreateDevice(n)
		case NodeSymlink:
			err = fs.CreateSymlink(n)
		case nil:
			break loop
		default:
			err = fmt.Errorf("unsupported type %s", reflect.TypeOf(c))
		}
		if err != nil {
			return err
		}
	}
	return nil
}
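
// A minimal usage sketch for UnTar: stream a catar archive from disk into a
// directory tree. NewLocalFS and LocalFSOptions are assumed to be this
// package's local filesystem writer and its options; treat the exact names
// and signatures as assumptions.
//
//	f, err := os.Open("tree.catar")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()
//
//	fs := NewLocalFS("/tmp/extracted", LocalFSOptions{})
//	if err := UnTar(context.Background(), f, fs); err != nil {
//		log.Fatal(err)
//	}
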
// UnTarIndex takes an index file (of a chunked catar), re-assembles the catar
// and decodes it on-the-fly into the target filesystem writer fs. Uses n
// goroutines to retrieve and decompress the chunks.
func UnTarIndex(ctx context.Context, fs FilesystemWriter, index Index, s Store, n int, pb ProgressBar) error {
	type requestJob struct {
		chunk IndexChunk  // requested chunk
		data  chan []byte // channel for the (decompressed) chunk
	}
	var (
		req      = make(chan requestJob)
		assemble = make(chan chan []byte, n)
	)
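	// req carries chunk requests to the store workers; assemble hands each
	// request's result channel to the assembler in index order, so the catar
	// stream is rebuilt in sequence even though chunks are fetched concurrently.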
	g, ctx := errgroup.WithContext(ctx)

	// Initialize and start progress bar if one was provided
	pb.SetTotal(len(index.Chunks))
	pb.Start()
	defer pb.Finish()

	// Use a pipe as input to untar and write the chunks into that (in the right
	// order of course)
	r, w := io.Pipe()

	// Workers - getting chunks from the store
	for i := 0; i < n; i++ {
		g.Go(func() error {
			for r := range req {
				// Pull the chunk from the store
				chunk, err := s.GetChunk(r.chunk.ID)
				if err != nil {
					close(r.data)
					return err
				}
				b, err := chunk.Data()
				if err != nil {
					close(r.data)
					return err
				}
				// Might as well verify the chunk size while we're at it
				if r.chunk.Size != uint64(len(b)) {
					close(r.data)
					return fmt.Errorf("unexpected size for chunk %s", r.chunk.ID)
				}
				r.data <- b
				close(r.data)
			}
			return nil
		})
	}

	// Feeder - requesting chunks from the workers and handing a result data channel
	// to the assembler
	g.Go(func() error {
	loop:
		for _, c := range index.Chunks {
			data := make(chan []byte, 1)
			select {
			case <-ctx.Done():
				break loop
			case req <- requestJob{chunk: c, data: data}: // request the chunk
				select {
				case <-ctx.Done():
					break loop
				case assemble <- data: // and hand over the data channel to the assembler
				}
			}
		}
		close(req)      // tell the workers this is it
		close(assemble) // tell the assembler we're done
		return nil
	})
	// Assembler - read from the data channels and push the chunks into the pipe
	// that untar reads from
	g.Go(func() error {
		defer w.Close() // No more chunks to come, stop the untar
	loop:
		for {
			select {
			case data := <-assemble:
				if data == nil {
					break loop
				}
				pb.Increment()
				b := <-data
				if _, err := io.Copy(w, bytes.NewReader(b)); err != nil {
					return err
				}
			case <-ctx.Done():
				break loop
			}
		}
		return nil
	})
	// UnTar - read from the pipe that the assembler pushes into
	g.Go(func() error {
		err := UnTar(ctx, r, fs)
		if err != nil {
			// If an error has occurred during the UnTar, we need to stop the assembler.
			// If we don't, then it would stall on writing to the pipe.
			r.CloseWithError(err)
		}
		return err
	})

	return g.Wait()
}
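
// A minimal usage sketch for UnTarIndex: restore a directory tree from a
// chunked catar index, fetching chunks from a local store with 4 goroutines.
// IndexFromReader, NewLocalStore, StoreOptions, NewLocalFS, LocalFSOptions and
// NewProgressBar are assumed helpers from this package; treat their exact
// names and signatures as assumptions.
//
//	idxFile, err := os.Open("tree.caibx")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer idxFile.Close()
//
//	index, err := IndexFromReader(idxFile)
//	if err != nil {
//		log.Fatal(err)
//	}
//
//	store, err := NewLocalStore("/var/lib/desync/store", StoreOptions{})
//	if err != nil {
//		log.Fatal(err)
//	}
//
//	fs := NewLocalFS("/tmp/extracted", LocalFSOptions{})
//	err = UnTarIndex(context.Background(), fs, index, store, 4, NewProgressBar("untar "))
//	if err != nil {
//		log.Fatal(err)
//	}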