Skip to content

Commit

Permalink
add multi reader (#49)
Browse files Browse the repository at this point in the history
* add multi reader
  • Loading branch information
andyborn authored Aug 25, 2023
1 parent 32599d6 commit 467ee0e
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 65 deletions.
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (c *consumer) sync(force bool) (*ConsumerSync, error) {
// update stores
previous := c.data.Load()
c.data.Store(data)
atomic.StoreInt64(&c.numRead, int64(reader.NumRead()))
atomic.StoreInt64(&c.numRead, reader.NumRead())
atomic.StoreInt64(&c.lastMod, lastMod.Millis())
atomic.StoreInt64(&c.lastConsumed, syncTime)
return &ConsumerSync{
Expand Down
166 changes: 130 additions & 36 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package feedx

import (
"context"
"errors"
"io"
"time"

Expand Down Expand Up @@ -30,75 +31,168 @@ func (o *ReaderOptions) norm(name string) {

// Reader reads data from a remote feed.
type Reader struct {
remote *bfs.Object
opt ReaderOptions
ctx context.Context
num int
ctx context.Context
opt *ReaderOptions

br io.ReadCloser // bfs reader
cr io.ReadCloser // compression reader
fd FormatDecoder
remotes []*bfs.Object
cur *streamReader

pos int
num int64
}

// NewReader inits a new reader for a single remote feed.
//
// It is a convenience wrapper around MultiReader called with a one-element
// list of remotes. opt may be nil and is forwarded unchanged. The returned
// error is always nil; it is kept in the signature so existing callers keep
// compiling.
func NewReader(ctx context.Context, remote *bfs.Object, opt *ReaderOptions) (*Reader, error) {
	// No local copy of the options is made or normalised here: such a copy
	// would be discarded, because MultiReader receives opt itself and the
	// per-remote copy is normalised lazily in ensureCurrent.
	return MultiReader(ctx, []*bfs.Object{remote}, opt), nil
}

// MultiReader inits a new reader for multiple remotes. Remotes are read sequentially as if concatenated.
// Once all remotes are fully read, Read will return EOF.
func MultiReader(ctx context.Context, remotes []*bfs.Object, opt *ReaderOptions) *Reader {
return &Reader{
remote: remote,
opt: o,
ctx: ctx,
}, nil
remotes: remotes,
opt: opt,
ctx: ctx,
}
}

// Read reads raw bytes from the feed.
// At end of feed, Read returns 0, io.EOF.
func (r *Reader) Read(p []byte) (int, error) {
if err := r.ensureOpen(); err != nil {
return 0, err
if !r.ensureCurrent() {
return 0, io.EOF
}

return r.cr.Read(p)
n, err := r.cur.Read(p)
if errors.Is(err, io.EOF) {
if more, err := r.nextRemote(); err != nil {
return n, err
} else if more {
return n, nil // dont return EOF until all remotes read
}
}
return n, err
}

// Decode decodes the next formatted value from the feed.
// At end of feed, Decode returns io.EOF.
func (r *Reader) Decode(v interface{}) error {
if err := r.ensureOpen(); err != nil {
return err
if !r.ensureCurrent() {
return io.EOF
}

if r.fd == nil {
fd, err := r.opt.Format.NewDecoder(r.cr)
if err != nil {
err := r.cur.Decode(v)
if errors.Is(err, io.EOF) {
if more, err := r.nextRemote(); err != nil {
return err
} else if more {
return r.Decode(v) // start decoding from next remote
}
r.fd = fd
} else if err == nil {
r.num++
}

if err := r.fd.Decode(v); err != nil {
return err
}

r.num++
return nil
return err
}

// NumRead returns the number of read values.
func (r *Reader) NumRead() int {
func (r *Reader) NumRead() int64 {
return r.num
}

// LastModified returns the last modified time of the remote feed.
func (r *Reader) LastModified() (time.Time, error) {
lastMod, err := remoteLastModified(r.ctx, r.remote)
return lastMod.Time(), err
var lastMod timestamp
for _, remote := range r.remotes {
t, err := remoteLastModified(r.ctx, remote)
if err != nil {
return time.Time{}, err
}
if t > lastMod {
lastMod = t
}
}

return lastMod.Time(), nil
}

// Close closes the stream that is currently being read, if there is one.
// It returns the error reported by that stream's Close, or nil when no
// stream is active.
func (r *Reader) Close() error {
	active := r.cur
	if active == nil {
		// nothing has been opened (or the last stream was already released)
		return nil
	}
	return active.Close()
}

// ensureCurrent makes sure r.cur points at a stream for the remote at r.pos.
// It reports false once the cursor has moved past the last remote, and true
// otherwise. The stream is only constructed here; it is not opened.
func (r *Reader) ensureCurrent() bool {
	if r.pos >= len(r.remotes) {
		return false
	}
	if r.cur != nil {
		return true
	}

	next := r.remotes[r.pos]
	sr := &streamReader{remote: next, ctx: r.ctx}
	if r.opt != nil {
		// work on a copy so the caller's options are never modified
		sr.opt = *r.opt
	}
	// normalise the options against this particular remote's name
	sr.opt.norm(next.Name())

	r.cur = sr
	return true
}

// nextRemote closes the current stream and advances the cursor by one.
// It reports whether another remote is left to read. If closing fails the
// error is returned and neither the current stream nor the cursor changes.
func (r *Reader) nextRemote() (bool, error) {
	closeErr := r.cur.Close()
	if closeErr != nil {
		return false, closeErr
	}

	// drop the finished stream and step to the following remote
	r.cur, r.pos = nil, r.pos+1

	more := r.pos < len(r.remotes)
	return more, nil
}

// streamReader reads and decodes a single remote object.
// Reader creates one per remote and works through them one after another.
type streamReader struct {
	ctx    context.Context
	remote *bfs.Object
	opt    ReaderOptions

	// br is the bfs reader and cr the compression reader; both are set up
	// by ensureOpen.
	br, cr io.ReadCloser
	// fd is created on the first Decode call from opt.Format and then reused.
	fd FormatDecoder
}

// Read reads raw bytes from the remote through the compression reader.
// The underlying readers are opened on first use; if opening fails, that
// error is returned together with a zero byte count.
func (r *streamReader) Read(p []byte) (int, error) {
	openErr := r.ensureOpen()
	if openErr == nil {
		return r.cr.Read(p)
	}
	return 0, openErr
}

// Decode decodes the next formatted value from the remote into v.
// The underlying readers are opened on first use, and the format decoder is
// built from opt.Format on the first call and reused afterwards. Errors from
// opening, from building the decoder, or from decoding are returned as-is.
func (r *streamReader) Decode(v interface{}) error {
	err := r.ensureOpen()
	if err != nil {
		return err
	}

	dec := r.fd
	if dec == nil {
		// first call: build the decoder and cache it only on success
		if dec, err = r.opt.Format.NewDecoder(r.cr); err != nil {
			return err
		}
		r.fd = dec
	}
	return dec.Decode(v)
}

// Close closes the reader.
func (r *streamReader) Close() error {
var err error
if r.fd != nil {
if e := r.fd.Close(); e != nil {
Expand All @@ -118,7 +212,7 @@ func (r *Reader) Close() error {
return err
}

func (r *Reader) ensureOpen() error {
func (r *streamReader) ensureOpen() error {
if r.br == nil {
br, err := r.remote.Open(r.ctx)
if err != nil {
Expand Down
94 changes: 66 additions & 28 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package feedx_test
import (
"context"
"io"
"io/ioutil"
"time"

"github.com/bsm/bfs"
Expand All @@ -18,39 +17,78 @@ var _ = Describe("Reader", func() {
var obj *bfs.Object
var ctx = context.Background()

BeforeEach(func() {
obj = bfs.NewInMemObject("path/to/file.json")
Expect(writeMulti(obj, 3, time.Time{})).To(Succeed())
Describe("NewReader", func() {
BeforeEach(func() {
obj = bfs.NewInMemObject("path/to/file.json.gz")
Expect(writeMulti(obj, 3, time.Time{})).To(Succeed())

var err error
subject, err = feedx.NewReader(ctx, obj, nil)
Expect(err).NotTo(HaveOccurred())
})
var err error
subject, err = feedx.NewReader(ctx, obj, nil)
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(subject.Close()).To(Succeed())
})
AfterEach(func() {
Expect(subject.Close()).To(Succeed())
})

It("reads", func() {
data, err := ioutil.ReadAll(subject)
Expect(err).NotTo(HaveOccurred())
Expect(len(data)).To(BeNumerically("~", 110, 20))
Expect(subject.NumRead()).To(Equal(0))
})
It("reads", func() {
data, err := io.ReadAll(subject)
Expect(err).NotTo(HaveOccurred())
Expect(len(data)).To(Equal(111))
Expect(subject.NumRead()).To(Equal(int64(0)))
})

It("decodes", func() {
var msgs []*testdata.MockMessage
for {
var msg testdata.MockMessage
err := subject.Decode(&msg)
if err == io.EOF {
break
It("decodes", func() {
var msgs []*testdata.MockMessage
for {
var msg testdata.MockMessage
err := subject.Decode(&msg)
if err == io.EOF {
break
}
Expect(err).NotTo(HaveOccurred())
msgs = append(msgs, &msg)
}

Expect(msgs).To(ConsistOf(seed(), seed(), seed()))
Expect(subject.NumRead()).To(Equal(int64(3)))
})
})

Describe("MultiReader", func() {
BeforeEach(func() {
obj = bfs.NewInMemObject("path/to/file.json.gz")
Expect(writeMulti(obj, 3, time.Time{})).To(Succeed())

subject = feedx.MultiReader(ctx, []*bfs.Object{obj, obj}, nil)
})

AfterEach(func() {
Expect(subject.Close()).To(Succeed())
})

It("reads", func() {
data, err := io.ReadAll(subject)
Expect(err).NotTo(HaveOccurred())
msgs = append(msgs, &msg)
}
Expect(len(data)).To(Equal(222))
Expect(subject.NumRead()).To(Equal(int64(0)))
})

Expect(msgs).To(ConsistOf(seed(), seed(), seed()))
Expect(subject.NumRead()).To(Equal(3))
It("decodes", func() {
var msgs []*testdata.MockMessage
for {
var msg testdata.MockMessage
err := subject.Decode(&msg)
if err == io.EOF {
break
}
Expect(err).NotTo(HaveOccurred())
msgs = append(msgs, &msg)
}

Expect(msgs).To(ConsistOf(seed(), seed(), seed(), seed(), seed(), seed()))
Expect(subject.NumRead()).To(Equal(int64(6)))
})
})

})

0 comments on commit 467ee0e

Please sign in to comment.