-
Notifications
You must be signed in to change notification settings - Fork 2
/
reader.go
84 lines (73 loc) · 1.57 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package json
import (
"io"
"strings"
"github.com/wal-g/json/internal/readbuffer"
)
const (
closeBufSize = 1 << 10
)
type streamReader struct {
buf strings.Builder
readBuf readbuffer.ReadBuffer
dropped int
finished bool
scanner *scanner
}
func newStreamReader(stream io.Reader) *streamReader {
return &streamReader{
buf: strings.Builder{},
readBuf: readbuffer.New(stream),
scanner: newScanner(),
}
}
func (s *streamReader) Len() int {
return len(s.buf.String()) + s.dropped
}
func (s *streamReader) Load(i int) error {
if i < s.Len() {
return nil
}
neededLen := i - s.Len() + 1
buf, err := s.readBuf.Get(neededLen)
n := len(buf)
for j := 0; j < n; j++ {
if opcode := s.scanner.step(s.scanner, buf[j]); opcode == scanError {
return s.scanner.err
}
}
s.buf.Write(buf[:n])
if err == io.EOF {
s.finished = true
if code := s.scanner.eof(); code == scanError {
return s.scanner.err
}
}
return err
}
func (s *streamReader) Get(i int) byte {
return s.buf.String()[i-s.dropped]
}
func (s *streamReader) Range(l, r int) []byte {
return []byte(s.buf.String()[l-s.dropped : r-s.dropped])
}
func (s *streamReader) Drop() {
s.dropped += s.buf.Len()
s.buf.Reset()
}
func (s *streamReader) Close() error {
buf, err := s.readBuf.Get(closeBufSize)
for err == nil {
for i := 0; i < len(buf); i++ {
if opCode := s.scanner.step(s.scanner, buf[i]); opCode == scanError {
return s.scanner.err
}
}
buf, err = s.readBuf.Get(closeBufSize)
}
if opCode := s.scanner.eof(); opCode == scanError {
return s.scanner.err
} else {
return nil
}
}