Skip to content

Commit

Permalink
more from comments
Browse files Browse the repository at this point in the history
  • Loading branch information
andyborn committed Aug 11, 2023
1 parent 8752624 commit c462535
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 14 deletions.
23 changes: 15 additions & 8 deletions incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package feedx

import (
"context"
"sync/atomic"
"time"

"github.com/bsm/bfs"
)

// IncrmentalProduceFunc returns a ProduceFunc closure around an incremental mod time
// IncrmentalProduceFunc returns a ProduceFunc closure around an incremental mod time.
type IncrementalProduceFunc func(time.Time) ProduceFunc

// IncrementalProducer produces a continuous incremental feed.
Expand All @@ -25,6 +24,7 @@ type IncrementalProducer struct {
ipfn IncrementalProduceFunc
}

// IncrementalProducerOptions configure the producer instance.
type IncrementalProducerOptions struct {
ProducerOptions
}
Expand Down Expand Up @@ -92,12 +92,19 @@ func NewIncrementalProducerForBucket(ctx context.Context, bucket bfs.Bucket, opt
}

// Close stops the producer.
func (p *IncrementalProducer) Close() error {
func (p *IncrementalProducer) Close() (err error) {
p.stop()
if p.manifest != nil {
if e := p.manifest.Close(); e != nil {
err = e
}
}
if p.ownBucket {
return p.bucket.Close()
if e := p.bucket.Close(); e != nil {
err = e
}
}
return p.manifest.Close()
return
}

func (p *IncrementalProducer) loop() {
Expand All @@ -119,7 +126,7 @@ func (p *IncrementalProducer) loop() {

func (p *IncrementalProducer) push() (*ProducerPush, error) {
start := time.Now()
atomic.StoreInt64(&p.lastPush, timestampFromTime(start).Millis())
p.ProducerState.updateLastPush(start)

// get last mod time for local records
localLastMod, err := p.opt.LastModCheck(p.ctx)
Expand Down Expand Up @@ -155,8 +162,8 @@ func (p *IncrementalProducer) push() (*ProducerPush, error) {
return nil, err
}

atomic.StoreInt64(&p.numWritten, int64(numWritten))
atomic.StoreInt64(&p.lastMod, timestampFromTime(wopt.LastMod).Millis())
p.ProducerState.updateNumWritten(numWritten)
p.ProducerState.updateLastModified(wopt.LastMod)
return &ProducerPush{ProducerState: p.ProducerState, Updated: true}, nil
}

Expand Down
25 changes: 19 additions & 6 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,33 @@ type ProducerState struct {
numWritten, lastPush, lastMod int64
}

// LastPush implements ProducerState
// LastPush returns time of last push attempt.
func (p *ProducerState) LastPush() time.Time {
return timestamp(atomic.LoadInt64(&p.lastPush)).Time()
}

// LastModified implements ProducerState
// LastModified returns time at which the remote feed was last modified.
func (p *ProducerState) LastModified() time.Time {
return timestamp(atomic.LoadInt64(&p.lastMod)).Time()
}

// NumWritten implements ProducerState
// NumWritten returns the number of values produced during the last push.
func (p *ProducerState) NumWritten() int {
return int(atomic.LoadInt64(&p.numWritten))
}

func (p *ProducerState) updateLastPush(t time.Time) {
atomic.StoreInt64(&p.lastPush, timestampFromTime(t).Millis())
}

func (p *ProducerState) updateLastModified(t time.Time) {
atomic.StoreInt64(&p.lastMod, timestampFromTime(t).Millis())
}

func (p *ProducerState) updateNumWritten(n int) {
atomic.StoreInt64(&p.numWritten, int64(n))
}

// ProducerOptions configure the producer instance.
type ProducerOptions struct {
WriterOptions
Expand Down Expand Up @@ -135,7 +147,7 @@ func (p *Producer) Close() error {

func (p *Producer) push() (*ProducerPush, error) {
start := time.Now()
atomic.StoreInt64(&p.lastPush, timestampFromTime(start).Millis())
p.ProducerState.updateLastPush(start)

// setup write options
wopt := p.opt.WriterOptions
Expand Down Expand Up @@ -166,8 +178,9 @@ func (p *Producer) push() (*ProducerPush, error) {
return nil, err
}

atomic.StoreInt64(&p.numWritten, int64(writer.NumWritten()))
atomic.StoreInt64(&p.lastMod, timestampFromTime(wopt.LastMod).Millis())
p.ProducerState.updateNumWritten(writer.NumWritten())
p.ProducerState.updateLastModified(wopt.LastMod)

return &ProducerPush{
ProducerState: p.ProducerState,
Updated: true,
Expand Down

0 comments on commit c462535

Please sign in to comment.