Skip to content

Commit

Permalink
improve error messages for action not implemented (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Jul 27, 2023
1 parent 4374355 commit b1cb077
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
7 changes: 3 additions & 4 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,19 +406,18 @@ func (w *writeStrategyBatch) Write(ctx context.Context, r Record, ack func(error
Msg("last batch was flushed asynchronously")
if result.Err != nil {
return fmt.Errorf("last batch write failed: %w", result.Err)
// TODO should we stop writing new records altogether and just nack? probably
}
default:
// last batch was not flushed yet
}

result := w.batcher.Enqueue(writeBatchItem{
status := w.batcher.Enqueue(writeBatchItem{
ctx: ctx,
record: r,
ack: ack,
})

switch result {
switch status {
case internal.Scheduled:
// This message was scheduled for the next batch.
// We need to check the results channel of the previous batch, in case
Expand All @@ -445,7 +444,7 @@ func (w *writeStrategyBatch) Write(ctx context.Context, r Record, ack func(error
Msg("batch was flushed synchronously")
return result.Err
default:
return fmt.Errorf("unknown batcher enqueue status type: %T", result)
return fmt.Errorf("unknown batcher enqueue status: %v", status)
}
}
func (w *writeStrategyBatch) Flush(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion error.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (

// ErrUnimplemented is returned in functions of plugins that don't implement
// a certain method.
ErrUnimplemented = errors.New("action not implemented")
ErrUnimplemented = errors.New("the connector plugin does not implement this action, please check the source code of the connector and make sure all required connector methods are implemented")

// ErrMetadataFieldNotFound is returned in metadata utility functions when a
// metadata field is not found.
Expand Down
23 changes: 13 additions & 10 deletions unimplemented.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package sdk

import "context"
import (
"context"
"fmt"
)

// UnimplementedDestination should be embedded to have forward compatible implementations.
type UnimplementedDestination struct{}
Expand All @@ -26,22 +29,22 @@ func (UnimplementedDestination) Parameters() map[string]Parameter {

// Configure needs to be overridden in the actual implementation.
func (UnimplementedDestination) Configure(context.Context, map[string]string) error {
return ErrUnimplemented
return fmt.Errorf("action \"Configure\": %w", ErrUnimplemented)
}

// Open needs to be overridden in the actual implementation.
func (UnimplementedDestination) Open(context.Context) error {
return ErrUnimplemented
return fmt.Errorf("action \"Open\": %w", ErrUnimplemented)
}

// Write needs to be overridden in the actual implementation.
func (UnimplementedDestination) Write(context.Context, []Record) (int, error) {
return 0, ErrUnimplemented
return 0, fmt.Errorf("action \"Write\": %w", ErrUnimplemented)
}

// Teardown needs to be overridden in the actual implementation.
func (UnimplementedDestination) Teardown(context.Context) error {
return ErrUnimplemented
return fmt.Errorf("action \"Teardown\": %w", ErrUnimplemented)
}

// LifecycleOnCreated won't do anything by default.
Expand Down Expand Up @@ -71,28 +74,28 @@ func (UnimplementedSource) Parameters() map[string]Parameter {

// Configure needs to be overridden in the actual implementation.
func (UnimplementedSource) Configure(context.Context, map[string]string) error {
return ErrUnimplemented
return fmt.Errorf("action \"Configure\": %w", ErrUnimplemented)
}

// Open needs to be overridden in the actual implementation.
func (UnimplementedSource) Open(context.Context, Position) error {
return ErrUnimplemented
return fmt.Errorf("action \"Open\": %w", ErrUnimplemented)
}

// Read needs to be overridden in the actual implementation.
func (UnimplementedSource) Read(context.Context) (Record, error) {
return Record{}, ErrUnimplemented
return Record{}, fmt.Errorf("action \"Read\": %w", ErrUnimplemented)
}

// Ack should be overridden if acks need to be forwarded to the source,
// otherwise it is optional.
func (UnimplementedSource) Ack(context.Context, Position) error {
return ErrUnimplemented
return fmt.Errorf("action \"Ack\": %w", ErrUnimplemented)
}

// Teardown needs to be overridden in the actual implementation.
func (UnimplementedSource) Teardown(context.Context) error {
return ErrUnimplemented
return fmt.Errorf("action \"Teardown\": %w", ErrUnimplemented)
}

// LifecycleOnCreated won't do anything by default.
Expand Down

0 comments on commit b1cb077

Please sign in to comment.