Skip to content

Commit

Permalink
more cleaning up
Browse files Browse the repository at this point in the history
  • Loading branch information
andyborn committed Aug 29, 2023
1 parent 76d571b commit a376fc8
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 24 deletions.
45 changes: 24 additions & 21 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func NewIncrementalConsumer(ctx context.Context, bucketURL string, opt *Consumer
_ = bucket.Close()
return nil, err
}
csm.(*consumer).ownRemote = true
csm.(*consumer).ownBucket = true
return csm, nil
}

Expand All @@ -121,30 +121,32 @@ func NewIncrementalConsumerForBucket(ctx context.Context, bucket bfs.Bucket, opt

ctx, stop := context.WithCancel(ctx)
c := &consumer{
remote: bfs.NewObjectFromBucket(bucket, "manifest.json"),
bucket: bucket,
opt: o,
ctx: ctx,
stop: stop,
cfn: cfn,
incremental: true,
remote: bfs.NewObjectFromBucket(bucket, "manifest.json"),
ownRemote: true,
bucket: bucket,
opt: o,
ctx: ctx,
stop: stop,
cfn: cfn,
}

return c.run()
}

type consumer struct {
remote *bfs.Object
bucket bfs.Bucket

opt ConsumerOptions
ctx context.Context
stop context.CancelFunc

cfn ConsumeFunc
data atomic.Value
remote *bfs.Object
ownRemote bool

bucket bfs.Bucket
ownBucket bool

opt ConsumerOptions
cfn ConsumeFunc

ownRemote, incremental bool
data atomic.Value
numRead, lastMod, lastSync, lastConsumed int64
}

Expand Down Expand Up @@ -176,14 +178,15 @@ func (c *consumer) LastModified() time.Time {
// Close implements Consumer interface.
func (c *consumer) Close() (err error) {
c.stop()
if c.ownRemote {
if c.ownRemote && c.remote != nil {
if e := c.remote.Close(); e != nil {
err = e
}
if c.bucket != nil {
if e := c.bucket.Close(); e != nil {
err = e
}
c.remote = nil
}
if c.ownBucket && c.bucket != nil {
if e := c.bucket.Close(); e != nil {
err = e
}
}
return
Expand Down Expand Up @@ -219,7 +222,7 @@ func (c *consumer) sync(force bool) (*ConsumerSync, error) {

// open remote reader
var reader *Reader
if c.incremental {
if c.bucket != nil {
if reader, err = c.newIncrementalReader(); err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ type Reader struct {
opt *ReaderOptions

remotes []*bfs.Object
cur *streamReader
ownRemotes bool
pos int
num int64

cur *streamReader
pos int

num int64
}

// NewReader inits a new reader.
Expand Down

0 comments on commit a376fc8

Please sign in to comment.