Skip to content

Commit

Permalink
introduce csync and cchan utility packages
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Oct 6, 2023
1 parent 0426464 commit 2c5de03
Show file tree
Hide file tree
Showing 11 changed files with 816 additions and 301 deletions.
9 changes: 5 additions & 4 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/conduitio/conduit-connector-protocol/cpluginv1"
"github.com/conduitio/conduit-connector-sdk/internal"
"github.com/conduitio/conduit-connector-sdk/internal/csync"
"go.uber.org/multierr"
)

Expand Down Expand Up @@ -99,7 +100,7 @@ func NewDestinationPlugin(impl Destination) cpluginv1.DestinationPlugin {
type destinationPluginAdapter struct {
impl Destination

lastPosition *internal.AtomicValueWatcher[Position]
lastPosition *csync.ValueWatcher[Position]
openCancel context.CancelFunc

// write is the chosen write strategy, either single records or batches
Expand Down Expand Up @@ -166,7 +167,7 @@ func (a *destinationPluginAdapter) configureWriteStrategy(ctx context.Context, c
}

func (a *destinationPluginAdapter) Start(ctx context.Context, _ cpluginv1.DestinationStartRequest) (cpluginv1.DestinationStartResponse, error) {
a.lastPosition = new(internal.AtomicValueWatcher[Position])
a.lastPosition = new(csync.ValueWatcher[Position])

// detach context, so we can control when it's canceled
ctxOpen := internal.DetachContext(ctx)
Expand Down Expand Up @@ -205,7 +206,7 @@ func (a *destinationPluginAdapter) Run(ctx context.Context, stream cpluginv1.Des
err = a.writeStrategy.Write(ctx, r, func(err error) error {
return a.ack(r, err, stream)
})
a.lastPosition.Store(r.Position)
a.lastPosition.Set(r.Position)
if err != nil {
return err
}
Expand Down Expand Up @@ -237,7 +238,7 @@ func (a *destinationPluginAdapter) Stop(ctx context.Context, req cpluginv1.Desti
defer cancel()

// wait for last record to be received
err := a.lastPosition.Await(waitCtx, func(val Position) bool {
_, err := a.lastPosition.Watch(waitCtx, func(val Position) bool {
return bytes.Equal(val, req.LastPosition)
})

Expand Down
117 changes: 0 additions & 117 deletions internal/atomicvaluewatcher.go

This file was deleted.

160 changes: 0 additions & 160 deletions internal/atomicvaluewatcher_test.go

This file was deleted.

Loading

0 comments on commit 2c5de03

Please sign in to comment.