Skip to content

Commit

Permalink
changes from review
Browse files Browse the repository at this point in the history
  • Loading branch information
andyborn committed Aug 29, 2023
1 parent 16a8035 commit 76d571b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
39 changes: 17 additions & 22 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ type ConsumerOptions struct {
// AfterSync callbacks are triggered after each sync, receiving
// the sync state and error (if occurred).
AfterSync func(*ConsumerSync, error)

// incremental is a flag to denote if feed was written by incremental producer
incremental bool
}

func (o *ConsumerOptions) norm() {
Expand Down Expand Up @@ -120,26 +117,25 @@ func NewIncrementalConsumerForBucket(ctx context.Context, bucket bfs.Bucket, opt
if opt != nil {
o = *opt
}
o.incremental = true
o.norm()

ctx, stop := context.WithCancel(ctx)
c := &consumer{
remote: bfs.NewObjectFromBucket(bucket, "manifest.json"),
bucket: bucket,
opt: o,
ctx: ctx,
stop: stop,
cfn: cfn,
remote: bfs.NewObjectFromBucket(bucket, "manifest.json"),
bucket: bucket,
opt: o,
ctx: ctx,
stop: stop,
cfn: cfn,
incremental: true,
}

return c.run()
}

type consumer struct {
remote *bfs.Object
bucket bfs.Bucket
ownRemote bool
remote *bfs.Object
bucket bfs.Bucket

opt ConsumerOptions
ctx context.Context
Expand All @@ -148,6 +144,7 @@ type consumer struct {
cfn ConsumeFunc
data atomic.Value

ownRemote, incremental bool
numRead, lastMod, lastSync, lastConsumed int64
}

Expand Down Expand Up @@ -222,14 +219,10 @@ func (c *consumer) sync(force bool) (*ConsumerSync, error) {

// open remote reader
var reader *Reader
if c.opt.incremental {
var remotes []*bfs.Object
if reader, remotes, err = c.newIncrementalReader(); err != nil {
if c.incremental {
if reader, err = c.newIncrementalReader(); err != nil {
return nil, err
}
for _, r := range remotes {
defer r.Close()
}
} else {
if reader, err = NewReader(c.ctx, c.remote, &c.opt.ReaderOptions); err != nil {
return nil, err
Expand Down Expand Up @@ -273,16 +266,18 @@ func (c *consumer) loop() {
}
}

func (c *consumer) newIncrementalReader() (*Reader, []*bfs.Object, error) {
func (c *consumer) newIncrementalReader() (*Reader, error) {
manifest, err := loadManifest(c.ctx, c.remote)
if err != nil {
return nil, nil, err
return nil, err
}

files := manifest.Files
remotes := make([]*bfs.Object, 0, len(files))
for _, file := range files {
remotes = append(remotes, bfs.NewObjectFromBucket(c.bucket, file))
}
return MultiReader(c.ctx, remotes, &c.opt.ReaderOptions), remotes, nil
r := MultiReader(c.ctx, remotes, &c.opt.ReaderOptions)
r.ownRemotes = true
return r, nil
}
23 changes: 15 additions & 8 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ type Reader struct {
ctx context.Context
opt *ReaderOptions

remotes []*bfs.Object
cur *streamReader

pos int
num int64
remotes []*bfs.Object
cur *streamReader
ownRemotes bool
pos int
num int64
}

// NewReader inits a new reader.
Expand Down Expand Up @@ -116,11 +116,18 @@ func (r *Reader) LastModified() (time.Time, error) {
}

// Close closes the reader.
func (r *Reader) Close() error {
func (r *Reader) Close() (err error) {
if r.cur != nil {
return r.cur.Close()
err = r.cur.Close()
}
return nil
if r.ownRemotes {
for _, remote := range r.remotes {
if e := remote.Close(); e != nil {
err = e
}
}
}
return
}

func (r *Reader) ensureCurrent() bool {
Expand Down

0 comments on commit 76d571b

Please sign in to comment.