-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtarball.go
107 lines (97 loc) · 2.4 KB
/
tarball.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package main
import (
"archive/tar"
"bytes"
"context"
"errors"
"fmt"
"log"
"strconv"
"time"
"github.com/ZwickyTransientFacility/ztf-go-archivist/schema"
"github.com/actgardner/gogen-avro/container"
)
func tarAlertStream(stream *AlertStream, tarWriter *tar.Writer, progress chan progressReport) (int, error) {
var (
total = 0
batch = progressReport{}
progressTicker = time.NewTicker(updateInterval)
overallTimer = time.NewTimer(*maxRuntime)
lastMessage = time.Now()
)
defer progressTicker.Stop()
defer overallTimer.Stop()
for {
// Emit progress updates, and eventually exit
select {
case <-overallTimer.C:
// Time's up!
total += batch.nEvents
return total, nil
case <-progressTicker.C:
progress <- batch
total += batch.nEvents
batch = progressReport{}
default:
}
ctx, cancel := context.WithTimeout(context.Background(), messageTimeout)
defer cancel()
alert, err := stream.NextAlert(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// Could just be a quiet period.
if time.Since(lastMessage) > *maxQuietPeriod {
// We've had a long silence. There's probably no more data coming.
return total, nil
}
continue
}
total += batch.nEvents
return total, fmt.Errorf("error retrieving data: %v", err)
}
err = writeAlert(tarWriter, alert)
if err != nil {
log.Fatalf("error writing to tar: %v", err)
}
batch.nEvents += 1
lastMessage = time.Now()
}
}
func writeAlert(w *tar.Writer, a *schema.Alert) error {
// Each Alert gets a full OCF framing wrapper with no compression. This is
// wildly inefficient, but it's what has been done historically, and it's very
// simple.
buf := bytes.NewBuffer(nil)
aw, err := schema.NewAlertWriter(buf, container.Null, 64)
if err != nil {
return fmt.Errorf("making an alert writer: %w", err)
}
err = aw.WriteRecord(a)
if err != nil {
return fmt.Errorf("writing alert: %w", err)
}
err = aw.Flush()
if err != nil {
return fmt.Errorf("flushing alert write: %w", err)
}
h := &tar.Header{
Name: strconv.FormatInt(a.Candid, 10) + ".avro",
Size: int64(buf.Len()),
ModTime: time.Now(),
Mode: 0x744,
Typeflag: tar.TypeReg,
Uid: 0,
Gid: 0,
Uname: "root",
Gname: "root",
}
err = w.WriteHeader(h)
if err != nil {
return err
}
_, err = w.Write(buf.Bytes())
if err != nil {
return err
}
return nil
}