From 924b732b2726d602d367738b869ad71eb0eeb43a Mon Sep 17 00:00:00 2001 From: Andy Born Date: Thu, 31 Aug 2023 09:16:47 +0100 Subject: [PATCH] Feature/incremental consumer 2 (#52) * add incremental consumer --- consumer.go | 125 ++++++++++++++++++++++++++++++++-------- consumer_test.go | 144 ++++++++++++++++++++++++++++++++++------------- reader.go | 19 +++++-- 3 files changed, 220 insertions(+), 68 deletions(-) diff --git a/consumer.go b/consumer.go index 4e593d7..698bc37 100644 --- a/consumer.go +++ b/consumer.go @@ -21,8 +21,7 @@ type ConsumerOptions struct { AfterSync func(*ConsumerSync, error) } -func (o *ConsumerOptions) norm(name string) { - o.ReaderOptions.norm(name) +func (o *ConsumerOptions) norm() { if o.Interval <= 0 { o.Interval = time.Minute } @@ -82,7 +81,7 @@ func NewConsumerForRemote(ctx context.Context, remote *bfs.Object, opt *Consumer if opt != nil { o = *opt } - o.norm(remote.Name()) + o.norm() ctx, stop := context.WithCancel(ctx) c := &consumer{ @@ -93,29 +92,61 @@ func NewConsumerForRemote(ctx context.Context, remote *bfs.Object, opt *Consumer cfn: cfn, } - // run initial sync - if _, err := c.sync(true); err != nil { - _ = c.Close() + return c.run() +} + +// NewIncrementalConsumer starts a new incremental feed consumer. +func NewIncrementalConsumer(ctx context.Context, bucketURL string, opt *ConsumerOptions, cfn ConsumeFunc) (Consumer, error) { + bucket, err := bfs.Connect(ctx, bucketURL) + if err != nil { return nil, err } - // start continuous loop - go c.loop() + csm, err := NewIncrementalConsumerForBucket(ctx, bucket, opt, cfn) + if err != nil { + _ = bucket.Close() + return nil, err + } + csm.(*consumer).ownBucket = true + return csm, nil +} - return c, nil +// NewIncrementalConsumerForBucket starts a new incremental feed consumer with a bucket. +func NewIncrementalConsumerForBucket(ctx context.Context, bucket bfs.Bucket, opt *ConsumerOptions, cfn ConsumeFunc) (Consumer, error) { + var o ConsumerOptions + if opt != nil { + o = *opt + } + o.norm() + + ctx, stop := context.WithCancel(ctx) + c := &consumer{ + remote: bfs.NewObjectFromBucket(bucket, "manifest.json"), + ownRemote: true, + bucket: bucket, + opt: o, + ctx: ctx, + stop: stop, + cfn: cfn, + } + + return c.run() } type consumer struct { + ctx context.Context + stop context.CancelFunc + remote *bfs.Object ownRemote bool - opt ConsumerOptions - ctx context.Context - stop context.CancelFunc + bucket bfs.Bucket + ownBucket bool - cfn ConsumeFunc - data atomic.Value + opt ConsumerOptions + cfn ConsumeFunc + data atomic.Value numRead, lastMod, lastSync, lastConsumed int64 } @@ -145,19 +176,39 @@ func (c *consumer) LastModified() time.Time { } // Close implements Consumer interface. -func (c *consumer) Close() error { +func (c *consumer) Close() (err error) { c.stop() - if c.ownRemote { - return c.remote.Close() + if c.ownRemote && c.remote != nil { + if e := c.remote.Close(); e != nil { + err = e + } + c.remote = nil + } + if c.ownBucket && c.bucket != nil { + if e := c.bucket.Close(); e != nil { + err = e + } + c.bucket = nil + } + return +} + +func (c *consumer) run() (Consumer, error) { + // run initial sync + if _, err := c.sync(true); err != nil { + _ = c.Close() + return nil, err } - return nil + + // start continuous loop + go c.loop() + + return c, nil } func (c *consumer) sync(force bool) (*ConsumerSync, error) { syncTime := timestampFromTime(time.Now()).Millis() - defer func() { - atomic.StoreInt64(&c.lastSync, syncTime) - }() + defer atomic.StoreInt64(&c.lastSync, syncTime) // retrieve original last modified time lastMod, err := remoteLastModified(c.ctx, c.remote) @@ -171,9 +222,15 @@ func (c *consumer) sync(force bool) (*ConsumerSync, error) { } // open remote reader - reader, err := NewReader(c.ctx, c.remote, &c.opt.ReaderOptions) - if err != nil { - return nil, err + var reader *Reader + if c.isIncremental() { + if reader, err = c.newIncrementalReader(); err != nil { + return nil, err + } + } else { + if reader, err = NewReader(c.ctx, c.remote, &c.opt.ReaderOptions); err != nil { + return nil, err + } } defer reader.Close() @@ -212,3 +269,23 @@ func (c *consumer) loop() { } } } + +func (c *consumer) isIncremental() bool { + return c.bucket != nil +} + +func (c *consumer) newIncrementalReader() (*Reader, error) { + manifest, err := loadManifest(c.ctx, c.remote) + if err != nil { + return nil, err + } + + files := manifest.Files + remotes := make([]*bfs.Object, 0, len(files)) + for _, file := range files { + remotes = append(remotes, bfs.NewObjectFromBucket(c.bucket, file)) + } + r := MultiReader(c.ctx, remotes, &c.opt.ReaderOptions) + r.ownRemotes = true + return r, nil +} diff --git a/consumer_test.go b/consumer_test.go index 24231fa..e9036a5 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -31,54 +31,120 @@ var _ = Describe("Consumer", func() { return msgs, nil } - BeforeEach(func() { - obj = bfs.NewInMemObject("path/to/file.jsonz") - Expect(writeMulti(obj, 2, mockTime)).To(Succeed()) + Describe("NewConsumer", func() { + BeforeEach(func() { + obj = bfs.NewInMemObject("path/to/file.jsonz") + Expect(writeMulti(obj, 2, mockTime)).To(Succeed()) - var err error - subject, err = feedx.NewConsumerForRemote(ctx, obj, nil, consume) - Expect(err).NotTo(HaveOccurred()) - }) + var err error + subject, err = feedx.NewConsumerForRemote(ctx, obj, nil, consume) + Expect(err).NotTo(HaveOccurred()) + }) - AfterEach(func() { - Expect(subject.Close()).To(Succeed()) - }) + AfterEach(func() { + Expect(subject.Close()).To(Succeed()) + }) - It("syncs/retrieves feeds from remote", func() { - Expect(subject.LastSync()).To(BeTemporally("~", time.Now(), time.Second)) - Expect(subject.LastConsumed()).To(BeTemporally("==", subject.LastSync())) - Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond))) - Expect(subject.NumRead()).To(Equal(2)) - Expect(subject.Data()).To(ConsistOf(seed(), seed())) - Expect(subject.Close()).To(Succeed()) - }) + It("syncs/retrieves feeds from remote", func() { + Expect(subject.LastSync()).To(BeTemporally("~", time.Now(), time.Second)) + Expect(subject.LastConsumed()).To(BeTemporally("==", subject.LastSync())) + Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond))) + Expect(subject.NumRead()).To(Equal(2)) + Expect(subject.Data()).To(ConsistOf(seed(), seed())) + Expect(subject.Close()).To(Succeed()) + }) + + It("consumes feeds only if necessary", func() { + prevSync := subject.LastSync() + time.Sleep(2 * time.Millisecond) - It("consumes feeds only if necessary", func() { - prevSync := subject.LastSync() - time.Sleep(2 * time.Millisecond) + testable := subject.(interface{ TestSync() error }) + Expect(testable.TestSync()).To(Succeed()) + Expect(subject.LastSync()).To(BeTemporally(">", prevSync)) + Expect(subject.LastConsumed()).To(BeTemporally("==", prevSync)) // skipped on last sync + Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond))) + Expect(subject.NumRead()).To(Equal(2)) + }) - testable := subject.(interface{ TestSync() error }) - Expect(testable.TestSync()).To(Succeed()) - Expect(subject.LastSync()).To(BeTemporally(">", prevSync)) - Expect(subject.LastConsumed()).To(BeTemporally("==", prevSync)) // skipped on last sync - Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond))) - Expect(subject.NumRead()).To(Equal(2)) + It("always consumes if LastModified not set", func() { + noModTime := bfs.NewInMemObject("path/to/file.json") + Expect(writeMulti(noModTime, 2, time.Time{})).To(Succeed()) + + csmr, err := feedx.NewConsumerForRemote(ctx, noModTime, nil, consume) + Expect(err).NotTo(HaveOccurred()) + + prevSync := csmr.LastSync() + time.Sleep(2 * time.Millisecond) + + testable := csmr.(interface{ TestSync() error }) + Expect(testable.TestSync()).To(Succeed()) + Expect(csmr.LastSync()).To(BeTemporally(">", prevSync)) + Expect(csmr.LastConsumed()).To(BeTemporally("==", csmr.LastSync())) // consumed on last sync + Expect(csmr.LastModified()).To(BeTemporally("==", time.Unix(0, 0))) + }) }) - It("always consumes if LastModified not set", func() { - noModTime := bfs.NewInMemObject("path/to/file.json") - Expect(writeMulti(noModTime, 2, time.Time{})).To(Succeed()) + Describe("NewIncrementalConsumer", func() { + BeforeEach(func() { + bucket := bfs.NewInMem() + dataFile := bfs.NewObjectFromBucket(bucket, "data-0-20230501-120023123.jsonz") + Expect(writeMulti(dataFile, 2, mockTime)).To(Succeed()) + + manifest := &feedx.Manifest{ + LastModified: feedx.TimestampFromTime(mockTime), + Files: []string{dataFile.Name(), dataFile.Name()}, + } + writer := feedx.NewWriter(ctx, bfs.NewObjectFromBucket(bucket, "manifest.json"), &feedx.WriterOptions{LastMod: mockTime}) + defer writer.Discard() + + Expect(writer.Encode(manifest)).To(Succeed()) + Expect(writer.Commit()).To(Succeed()) + + var err error + subject, err = feedx.NewIncrementalConsumerForBucket(ctx, bucket, nil, consume) + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + Expect(subject.Close()).To(Succeed()) + }) + + It("syncs/retrieves feeds from remote", func() { + Expect(subject.LastSync()).To(BeTemporally("~", time.Now(), time.Second)) + Expect(subject.LastConsumed()).To(BeTemporally("==", subject.LastSync())) + Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond))) + Expect(subject.NumRead()).To(Equal(4)) + Expect(subject.Data()).To(ConsistOf(seed(), seed(), seed(), seed())) + Expect(subject.Close()).To(Succeed()) + }) + + It("consumes feeds only if necessary", func() { + prevSync := subject.LastSync() + time.Sleep(2 * time.Millisecond) + + testable := subject.(interface{ TestSync() error }) + Expect(testable.TestSync()).To(Succeed()) + Expect(subject.LastSync()).To(BeTemporally(">", prevSync)) + Expect(subject.LastConsumed()).To(BeTemporally("==", prevSync)) // skipped on last sync + Expect(subject.LastModified()).To(BeTemporally("==", mockTime.Truncate(time.Millisecond))) + Expect(subject.NumRead()).To(Equal(4)) + }) + + It("always consumes if LastModified not set", func() { + noModTime := bfs.NewInMemObject("path/to/file.json") + Expect(writeMulti(noModTime, 2, time.Time{})).To(Succeed()) - csmr, err := feedx.NewConsumerForRemote(ctx, noModTime, nil, consume) - Expect(err).NotTo(HaveOccurred()) + csmr, err := feedx.NewConsumerForRemote(ctx, noModTime, nil, consume) + Expect(err).NotTo(HaveOccurred()) - prevSync := csmr.LastSync() - time.Sleep(2 * time.Millisecond) + prevSync := csmr.LastSync() + time.Sleep(2 * time.Millisecond) - testable := csmr.(interface{ TestSync() error }) - Expect(testable.TestSync()).To(Succeed()) - Expect(csmr.LastSync()).To(BeTemporally(">", prevSync)) - Expect(csmr.LastConsumed()).To(BeTemporally("==", csmr.LastSync())) // consumed on last sync - Expect(csmr.LastModified()).To(BeTemporally("==", time.Unix(0, 0))) + testable := csmr.(interface{ TestSync() error }) + Expect(testable.TestSync()).To(Succeed()) + Expect(csmr.LastSync()).To(BeTemporally(">", prevSync)) + Expect(csmr.LastConsumed()).To(BeTemporally("==", csmr.LastSync())) // consumed on last sync + Expect(csmr.LastModified()).To(BeTemporally("==", time.Unix(0, 0))) + }) }) }) diff --git a/reader.go b/reader.go index a806c35..a2fa8ee 100644 --- a/reader.go +++ b/reader.go @@ -34,10 +34,12 @@ type Reader struct { ctx context.Context opt *ReaderOptions - remotes []*bfs.Object - cur *streamReader + remotes []*bfs.Object + ownRemotes bool + cur *streamReader pos int + num int64 } @@ -116,11 +118,18 @@ func (r *Reader) LastModified() (time.Time, error) { } // Close closes the reader. -func (r *Reader) Close() error { +func (r *Reader) Close() (err error) { if r.cur != nil { - return r.cur.Close() + err = r.cur.Close() } - return nil + if r.ownRemotes { + for _, remote := range r.remotes { + if e := remote.Close(); e != nil { + err = e + } + } + } + return } func (r *Reader) ensureCurrent() bool {