Feature/incremental consumer 2 (#52)
* add incremental consumer
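A rough consumer-side sketch of the new API follows. Assumptions: the module path is github.com/bsm/feedx, ConsumeFunc has the shape func(*feedx.Reader) (interface{}, error), feedx.Reader exposes a Decode method that returns io.EOF when exhausted, the entry type is hypothetical, and the bucket is expected to already contain manifest.json plus the data files it lists.

package main

import (
	"context"
	"io"
	"log"

	"github.com/bsm/bfs"
	"github.com/bsm/feedx"
)

// entry is a hypothetical payload type; real feeds use whatever the producer encoded.
type entry struct {
	ID string `json:"id"`
}

func main() {
	ctx := context.Background()

	// Any bfs.Bucket works; an in-memory bucket is shown for brevity. It is
	// assumed to already contain manifest.json and the data files it lists.
	bucket := bfs.NewInMem()

	csm, err := feedx.NewIncrementalConsumerForBucket(ctx, bucket, nil, func(r *feedx.Reader) (interface{}, error) {
		var msgs []entry
		for {
			var e entry
			if err := r.Decode(&e); err == io.EOF {
				break
			} else if err != nil {
				return nil, err
			}
			msgs = append(msgs, e)
		}
		return msgs, nil
	})
	if err != nil {
		log.Fatal(err)
	}
	defer csm.Close()

	log.Printf("read %d entries", csm.NumRead())
	_ = csm.Data() // most recently consumed payload
}

NewIncrementalConsumer(ctx, bucketURL, opt, cfn) does the same, but connects to the bucket from a URL and closes it when the consumer is closed.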
andyborn authored Aug 31, 2023
1 parent f205401 commit 924b732
Showing 3 changed files with 220 additions and 68 deletions.
125 changes: 101 additions & 24 deletions consumer.go
@@ -21,8 +21,7 @@ type ConsumerOptions struct {
AfterSync func(*ConsumerSync, error)
}

func (o *ConsumerOptions) norm(name string) {
o.ReaderOptions.norm(name)
func (o *ConsumerOptions) norm() {
if o.Interval <= 0 {
o.Interval = time.Minute
}
@@ -82,7 +81,7 @@ func NewConsumerForRemote(ctx context.Context, remote *bfs.Object, opt *Consumer
if opt != nil {
o = *opt
}
o.norm(remote.Name())
o.norm()

ctx, stop := context.WithCancel(ctx)
c := &consumer{
@@ -93,29 +92,61 @@ func NewConsumerForRemote(ctx context.Context, remote *bfs.Object, opt *Consumer
cfn: cfn,
}

// run initial sync
if _, err := c.sync(true); err != nil {
_ = c.Close()
return c.run()
}

// NewIncrementalConsumer starts a new incremental feed consumer.
func NewIncrementalConsumer(ctx context.Context, bucketURL string, opt *ConsumerOptions, cfn ConsumeFunc) (Consumer, error) {
bucket, err := bfs.Connect(ctx, bucketURL)
if err != nil {
return nil, err
}

// start continuous loop
go c.loop()
csm, err := NewIncrementalConsumerForBucket(ctx, bucket, opt, cfn)
if err != nil {
_ = bucket.Close()
return nil, err
}
csm.(*consumer).ownBucket = true
return csm, nil
}

return c, nil
// NewIncrementalConsumerForBucket starts a new incremental feed consumer with a bucket.
func NewIncrementalConsumerForBucket(ctx context.Context, bucket bfs.Bucket, opt *ConsumerOptions, cfn ConsumeFunc) (Consumer, error) {
var o ConsumerOptions
if opt != nil {
o = *opt
}
o.norm()

ctx, stop := context.WithCancel(ctx)
c := &consumer{
remote: bfs.NewObjectFromBucket(bucket, "manifest.json"),
ownRemote: true,
bucket: bucket,
opt: o,
ctx: ctx,
stop: stop,
cfn: cfn,
}

return c.run()
}

type consumer struct {
ctx context.Context
stop context.CancelFunc

remote *bfs.Object
ownRemote bool

opt ConsumerOptions
ctx context.Context
stop context.CancelFunc
bucket bfs.Bucket
ownBucket bool

cfn ConsumeFunc
data atomic.Value
opt ConsumerOptions
cfn ConsumeFunc

data atomic.Value
numRead, lastMod, lastSync, lastConsumed int64
}

@@ -145,19 +176,39 @@ func (c *consumer) LastModified() time.Time {
}

// Close implements Consumer interface.
func (c *consumer) Close() error {
func (c *consumer) Close() (err error) {
c.stop()
if c.ownRemote {
return c.remote.Close()
if c.ownRemote && c.remote != nil {
if e := c.remote.Close(); e != nil {
err = e
}
c.remote = nil
}
if c.ownBucket && c.bucket != nil {
if e := c.bucket.Close(); e != nil {
err = e
}
c.bucket = nil
}
return
}

func (c *consumer) run() (Consumer, error) {
// run initial sync
if _, err := c.sync(true); err != nil {
_ = c.Close()
return nil, err
}
return nil

// start continuous loop
go c.loop()

return c, nil
}

func (c *consumer) sync(force bool) (*ConsumerSync, error) {
syncTime := timestampFromTime(time.Now()).Millis()
defer func() {
atomic.StoreInt64(&c.lastSync, syncTime)
}()
defer atomic.StoreInt64(&c.lastSync, syncTime)

// retrieve original last modified time
lastMod, err := remoteLastModified(c.ctx, c.remote)
@@ -171,9 +222,15 @@ func (c *consumer) sync(force bool) (*ConsumerSync, error) {
}

// open remote reader
reader, err := NewReader(c.ctx, c.remote, &c.opt.ReaderOptions)
if err != nil {
return nil, err
var reader *Reader
if c.isIncremental() {
if reader, err = c.newIncrementalReader(); err != nil {
return nil, err
}
} else {
if reader, err = NewReader(c.ctx, c.remote, &c.opt.ReaderOptions); err != nil {
return nil, err
}
}
defer reader.Close()

@@ -212,3 +269,23 @@ func (c *consumer) loop() {
}
}
}

func (c *consumer) isIncremental() bool {
return c.bucket != nil
}

func (c *consumer) newIncrementalReader() (*Reader, error) {
manifest, err := loadManifest(c.ctx, c.remote)
if err != nil {
return nil, err
}

files := manifest.Files
remotes := make([]*bfs.Object, 0, len(files))
for _, file := range files {
remotes = append(remotes, bfs.NewObjectFromBucket(c.bucket, file))
}
r := MultiReader(c.ctx, remotes, &c.opt.ReaderOptions)
r.ownRemotes = true
return r, nil
}
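As newIncrementalReader above shows, an incremental feed is a manifest.json listing the data files to stream from the bucket. Below is a rough producer-side sketch of writing that layout by hand, mirroring the test setup further down; this diff adds no dedicated incremental producer, so the data file name and entry shape here are only assumptions.

package main

import (
	"context"
	"log"
	"time"

	"github.com/bsm/bfs"
	"github.com/bsm/feedx"
)

func main() {
	ctx := context.Background()
	bucket := bfs.NewInMem()
	now := time.Now()

	// write a data file (hypothetical name, following the pattern used in the tests)
	dataFile := bfs.NewObjectFromBucket(bucket, "data-0-20230501-120023123.jsonz")
	dw := feedx.NewWriter(ctx, dataFile, &feedx.WriterOptions{LastMod: now})
	defer dw.Discard()
	if err := dw.Encode(map[string]string{"id": "1"}); err != nil { // hypothetical entry
		log.Fatal(err)
	}
	if err := dw.Commit(); err != nil {
		log.Fatal(err)
	}

	// write manifest.json pointing at the data files
	manifest := &feedx.Manifest{
		LastModified: feedx.TimestampFromTime(now),
		Files:        []string{dataFile.Name()},
	}
	mw := feedx.NewWriter(ctx, bfs.NewObjectFromBucket(bucket, "manifest.json"), &feedx.WriterOptions{LastMod: now})
	defer mw.Discard()
	if err := mw.Encode(manifest); err != nil {
		log.Fatal(err)
	}
	if err := mw.Commit(); err != nil {
		log.Fatal(err)
	}
}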
144 changes: 105 additions & 39 deletions consumer_test.go
@@ -31,54 +31,120 @@ var _ = Describe("Consumer", func() {
return msgs, nil
}

BeforeEach(func() {
obj = bfs.NewInMemObject("path/to/file.jsonz")
Expect(writeMulti(obj, 2, mockTime)).To(Succeed())
Describe("NewConsumer", func() {
BeforeEach(func() {
obj = bfs.NewInMemObject("path/to/file.jsonz")
Expect(writeMulti(obj, 2, mockTime)).To(Succeed())

var err error
subject, err = feedx.NewConsumerForRemote(ctx, obj, nil, consume)
Expect(err).NotTo(HaveOccurred())
})
var err error
subject, err = feedx.NewConsumerForRemote(ctx, obj, nil, consume)
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(subject.Close()).To(Succeed())
})
AfterEach(func() {
Expect(subject.Close()).To(Succeed())
})

It("syncs/retrieves feeds from remote", func() {
Expect(subject.LastSync()).To(BeTemporally("~", time.Now(), time.Second))
Expect(subject.LastConsumed()).To(BeTemporally("==", subject.LastSync()))
Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond)))
Expect(subject.NumRead()).To(Equal(2))
Expect(subject.Data()).To(ConsistOf(seed(), seed()))
Expect(subject.Close()).To(Succeed())
})
It("syncs/retrieves feeds from remote", func() {
Expect(subject.LastSync()).To(BeTemporally("~", time.Now(), time.Second))
Expect(subject.LastConsumed()).To(BeTemporally("==", subject.LastSync()))
Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond)))
Expect(subject.NumRead()).To(Equal(2))
Expect(subject.Data()).To(ConsistOf(seed(), seed()))
Expect(subject.Close()).To(Succeed())
})

It("consumes feeds only if necessary", func() {
prevSync := subject.LastSync()
time.Sleep(2 * time.Millisecond)

It("consumes feeds only if necessary", func() {
prevSync := subject.LastSync()
time.Sleep(2 * time.Millisecond)
testable := subject.(interface{ TestSync() error })
Expect(testable.TestSync()).To(Succeed())
Expect(subject.LastSync()).To(BeTemporally(">", prevSync))
Expect(subject.LastConsumed()).To(BeTemporally("==", prevSync)) // skipped on last sync
Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond)))
Expect(subject.NumRead()).To(Equal(2))
})

testable := subject.(interface{ TestSync() error })
Expect(testable.TestSync()).To(Succeed())
Expect(subject.LastSync()).To(BeTemporally(">", prevSync))
Expect(subject.LastConsumed()).To(BeTemporally("==", prevSync)) // skipped on last sync
Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond)))
Expect(subject.NumRead()).To(Equal(2))
It("always consumes if LastModified not set", func() {
noModTime := bfs.NewInMemObject("path/to/file.json")
Expect(writeMulti(noModTime, 2, time.Time{})).To(Succeed())

csmr, err := feedx.NewConsumerForRemote(ctx, noModTime, nil, consume)
Expect(err).NotTo(HaveOccurred())

prevSync := csmr.LastSync()
time.Sleep(2 * time.Millisecond)

testable := csmr.(interface{ TestSync() error })
Expect(testable.TestSync()).To(Succeed())
Expect(csmr.LastSync()).To(BeTemporally(">", prevSync))
Expect(csmr.LastConsumed()).To(BeTemporally("==", csmr.LastSync())) // consumed on last sync
Expect(csmr.LastModified()).To(BeTemporally("==", time.Unix(0, 0)))
})
})

It("always consumes if LastModified not set", func() {
noModTime := bfs.NewInMemObject("path/to/file.json")
Expect(writeMulti(noModTime, 2, time.Time{})).To(Succeed())
Describe("NewIncrementalConsumer", func() {
BeforeEach(func() {
bucket := bfs.NewInMem()
dataFile := bfs.NewObjectFromBucket(bucket, "data-0-20230501-120023123.jsonz")
Expect(writeMulti(dataFile, 2, mockTime)).To(Succeed())

manifest := &feedx.Manifest{
LastModified: feedx.TimestampFromTime(mockTime),
Files: []string{dataFile.Name(), dataFile.Name()},
}
writer := feedx.NewWriter(ctx, bfs.NewObjectFromBucket(bucket, "manifest.json"), &feedx.WriterOptions{LastMod: mockTime})
defer writer.Discard()

Expect(writer.Encode(manifest)).To(Succeed())
Expect(writer.Commit()).To(Succeed())

var err error
subject, err = feedx.NewIncrementalConsumerForBucket(ctx, bucket, nil, consume)
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(subject.Close()).To(Succeed())
})

It("syncs/retrieves feeds from remote", func() {
Expect(subject.LastSync()).To(BeTemporally("~", time.Now(), time.Second))
Expect(subject.LastConsumed()).To(BeTemporally("==", subject.LastSync()))
Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond)))
Expect(subject.NumRead()).To(Equal(4))
Expect(subject.Data()).To(ConsistOf(seed(), seed(), seed(), seed()))
Expect(subject.Close()).To(Succeed())
})

It("consumes feeds only if necessary", func() {
prevSync := subject.LastSync()
time.Sleep(2 * time.Millisecond)

testable := subject.(interface{ TestSync() error })
Expect(testable.TestSync()).To(Succeed())
Expect(subject.LastSync()).To(BeTemporally(">", prevSync))
Expect(subject.LastConsumed()).To(BeTemporally("==", prevSync)) // skipped on last sync
Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond)))
Expect(subject.NumRead()).To(Equal(4))
})

It("always consumes if LastModified not set", func() {
noModTime := bfs.NewInMemObject("path/to/file.json")
Expect(writeMulti(noModTime, 2, time.Time{})).To(Succeed())

csmr, err := feedx.NewConsumerForRemote(ctx, noModTime, nil, consume)
Expect(err).NotTo(HaveOccurred())
csmr, err := feedx.NewConsumerForRemote(ctx, noModTime, nil, consume)
Expect(err).NotTo(HaveOccurred())

prevSync := csmr.LastSync()
time.Sleep(2 * time.Millisecond)
prevSync := csmr.LastSync()
time.Sleep(2 * time.Millisecond)

testable := csmr.(interface{ TestSync() error })
Expect(testable.TestSync()).To(Succeed())
Expect(csmr.LastSync()).To(BeTemporally(">", prevSync))
Expect(csmr.LastConsumed()).To(BeTemporally("==", csmr.LastSync())) // consumed on last sync
Expect(csmr.LastModified()).To(BeTemporally("==", time.Unix(0, 0)))
testable := csmr.(interface{ TestSync() error })
Expect(testable.TestSync()).To(Succeed())
Expect(csmr.LastSync()).To(BeTemporally(">", prevSync))
Expect(csmr.LastConsumed()).To(BeTemporally("==", csmr.LastSync())) // consumed on last sync
Expect(csmr.LastModified()).To(BeTemporally("==", time.Unix(0, 0)))
})
})
})
19 changes: 14 additions & 5 deletions reader.go
@@ -34,10 +34,12 @@ type Reader struct {
ctx context.Context
opt *ReaderOptions

remotes []*bfs.Object
cur *streamReader
remotes []*bfs.Object
ownRemotes bool

cur *streamReader
pos int

num int64
}

@@ -116,11 +118,18 @@ func (r *Reader) LastModified() (time.Time, error) {
}

// Close closes the reader.
func (r *Reader) Close() error {
func (r *Reader) Close() (err error) {
if r.cur != nil {
return r.cur.Close()
err = r.cur.Close()
}
return nil
if r.ownRemotes {
for _, remote := range r.remotes {
if e := remote.Close(); e != nil {
err = e
}
}
}
return
}

func (r *Reader) ensureCurrent() bool {
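Note that ownRemotes stays unexported, so callers using feedx.MultiReader directly (as newIncrementalReader in consumer.go does internally) remain responsible for closing their remotes. A minimal sketch, assuming the same imports as the earlier examples and hypothetical object names:

// readAll streams entries from several remote objects through a single reader.
func readAll(ctx context.Context, bucket bfs.Bucket) error {
	remotes := []*bfs.Object{
		bfs.NewObjectFromBucket(bucket, "data-0.jsonz"),
		bfs.NewObjectFromBucket(bucket, "data-1.jsonz"),
	}
	defer func() {
		for _, o := range remotes {
			_ = o.Close() // MultiReader does not own these remotes; close them here
		}
	}()

	r := feedx.MultiReader(ctx, remotes, &feedx.ReaderOptions{})
	defer r.Close()

	for {
		var e map[string]string // hypothetical entry shape
		if err := r.Decode(&e); err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
	}
}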
