Skip to content

Commit

Permalink
NCR-14625 fix examples and codeowners
Browse files Browse the repository at this point in the history
  • Loading branch information
solokirrik committed Jan 23, 2024
1 parent 8e1ba5a commit 84d323a
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 45 deletions.
2 changes: 1 addition & 1 deletion CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @deliveryhero/adtech-tracking-billing-sdk
* dh_adtech_tracking@deliveryhero.com
16 changes: 11 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@ If you have another common use case you would like to see covered by this packag

## Cookbook

* [How to run a pipeline until the container is killed](https://github.com/deliveryhero/pipeline#pipelineshutsdownwhencontaineriskilled)
* [How to shut down a pipeline when there is a error](https://github.com/deliveryhero/pipeline#pipelineshutsdownonerror)
* [How to shut down a pipeline after it has finished processing a batch of data](https://github.com/deliveryhero/pipeline#pipelineshutsdownwheninputchannelisclosed)
* [How to run a pipeline until the container is killed](https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenContainerIsKilled)
* [How to shut down a pipeline when there is a error](https://github.com/deliveryhero/pipeline#PipelineShutsDownOnError)
* [How to shut down a pipeline after it has finished processing a batch of data](https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenInputChannelIsClosed)

## Functions

### func [Apply](/apply.go#L34)

`func Apply[A, B, C any](a Processor[A, []B], b Processor[B, C]) Processor[A, []C]`

Apply connects two processes, applying the second to each item of the first output

### func [Buffer](/buffer.go#L5)

`func Buffer[Item any](size int, in <-chan Item) <-chan Item`
Expand All @@ -43,12 +49,12 @@ p := pipeline.Delay(ctx, time.Second/4,

// If the context is canceled, pass the ints to the cancel func for teardown
p = pipeline.Cancel(ctx, func(i int, err error) {
log.Printf("%+v could not be processed, %s", i, err)
fmt.Printf("%+v could not be processed, %s\n", i, err)
}, p)

// Otherwise, process the inputs
for out := range p {
log.Printf("process: %+v", out)
fmt.Printf("process: %+v\n", out)
}
```

Expand Down
11 changes: 4 additions & 7 deletions cancel_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package pipeline_test

import (
"context"
"log"
"testing"
"fmt"
"time"

"github.com/deliveryhero/pipeline/v2"
)

func TestExampleCancel(t *testing.T) {
t.Parallel()

func ExampleCancel() {
// Create a context that lasts for 1 second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand All @@ -23,12 +20,12 @@ func TestExampleCancel(t *testing.T) {

// If the context is canceled, pass the ints to the cancel func for teardown
p = pipeline.Cancel(ctx, func(i int, err error) {
log.Printf("%+v could not be processed, %s", i, err)
fmt.Printf("%+v could not be processed, %s\n", i, err)
}, p)

// Otherwise, process the inputs
for out := range p {
log.Printf("process: %+v", out)
fmt.Printf("process: %+v\n", out)
}

// Output:
Expand Down
7 changes: 3 additions & 4 deletions merge_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package pipeline_test

import (
"fmt"
"testing"

"github.com/deliveryhero/pipeline/v2"
)

func TestExampleMerge(t *testing.T) {
func ExampleMerge() {
one := pipeline.Emit(1)
two := pipeline.Emit(2, 2)
three := pipeline.Emit(3, 3, 3)
Expand All @@ -18,12 +17,12 @@ func TestExampleMerge(t *testing.T) {

fmt.Println("done")

// Output:
// Example Output:
// Output:: 1
// Output:: 3
// Output:: 2
// Output:: 2
// Output:: 3
// Output:: 3
// Output:: 3
// done
}
19 changes: 6 additions & 13 deletions pipeline_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@ import (
"os"
"os/signal"
"syscall"
"testing"
"time"

"github.com/deliveryhero/pipeline/v2"
)

// The following example shows how you can shutdown a pipeline
// gracefully when it receives an error message
func TestExample_pipelineShutsDownOnError(t *testing.T) {
t.Parallel()

func Example_pipelineShutsDownOnError() {
// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -44,7 +41,7 @@ func TestExample_pipelineShutsDownOnError(t *testing.T) {

fmt.Println("exiting the pipeline after all data is processed")

// Output:
// Example Output:
// could not process 2: 2 caused the shutdown
// result: 1
// could not process 3: context canceled
Expand All @@ -60,9 +57,7 @@ func TestExample_pipelineShutsDownOnError(t *testing.T) {

// The following example demonstrates a pipeline
// that naturally finishes its run when the input channel is closed
func TestExample_pipelineShutsDownWhenInputChannelIsClosed(t *testing.T) {
t.Parallel()

func Example_pipelineShutsDownWhenInputChannelIsClosed() {
// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -84,7 +79,7 @@ func TestExample_pipelineShutsDownWhenInputChannelIsClosed(t *testing.T) {

fmt.Println("exiting after the input channel is closed")

// Output:
// Example Output:
// result: 2
// result: 4
// result: 6
Expand All @@ -100,9 +95,7 @@ func TestExample_pipelineShutsDownWhenInputChannelIsClosed(t *testing.T) {

// This example demonstrates a pipline
// that runs until the os / container the pipline is running in kills it
func TestExample_pipelineShutsDownWhenContainerIsKilled(t *testing.T) {
t.Parallel()

func Example_pipelineShutsDownWhenContainerIsKilled() {
// Gracefully shutdown the pipeline when the the system is shutting down
// by canceling the context when os.Kill or os.Interrupt signal is sent
ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
Expand Down Expand Up @@ -139,7 +132,7 @@ func TestExample_pipelineShutsDownWhenContainerIsKilled(t *testing.T) {

fmt.Println("exiting after the input channel is closed")

// Output:
// Example Output:
// error processing '1': '1' is an odd number
// result: 2
//
Expand Down
11 changes: 3 additions & 8 deletions process_batch_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ package pipeline_test
import (
"context"
"fmt"
"testing"
"time"

"github.com/deliveryhero/pipeline/v2"
)

func TestExampleProcessBatch(t *testing.T) {
t.Parallel()

func ExampleProcessBatch() {
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -41,9 +38,7 @@ func TestExampleProcessBatch(t *testing.T) {
// error: could not multiply [5 6], context deadline exceeded
}

func TestExampleProcessBatchConcurrently(t *testing.T) {
t.Parallel()

func ExampleProcessBatchConcurrently() {
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -66,7 +61,7 @@ func TestExampleProcessBatchConcurrently(t *testing.T) {
fmt.Printf("result: %d\n", result)
}

// Example Output:
// Example Output
// result: 1
// result: 2
// result: 3
Expand Down
9 changes: 2 additions & 7 deletions process_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@ import (
"context"
"fmt"
"log"
"testing"
"time"

"github.com/deliveryhero/pipeline/v2"
)

func TestExampleProcess(t *testing.T) {
t.Parallel()

func ExampleProcess() {
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -41,9 +38,7 @@ func TestExampleProcess(t *testing.T) {
// error: could not multiply 6, context deadline exceeded
}

func TestExampleProcessConcurrently(t *testing.T) {
t.Parallel()

func ExampleProcessConcurrently() {
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down

0 comments on commit 84d323a

Please sign in to comment.