Skip to content

Commit

Permalink
Merge pull request #8 from deliveryhero/NCR-14625-apply-for-multitoken
Browse files Browse the repository at this point in the history
NCR-14625 add Apply
  • Loading branch information
solokirrik authored Jan 24, 2024
2 parents 05be24a + a3fb29d commit b8ea372
Show file tree
Hide file tree
Showing 23 changed files with 280 additions and 44 deletions.
2 changes: 1 addition & 1 deletion CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* [email protected]
* [email protected]
95 changes: 72 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,59 @@ If you have another common use case you would like to see covered by this packag

## Cookbook

* [How to run a pipeline until the container is killed](https://github.com/deliveryhero/pipeline#pipelineshutsdownwhencontaineriskilled)
* [How to shut down a pipeline when there is a error](https://github.com/deliveryhero/pipeline#pipelineshutsdownonerror)
* [How to shut down a pipeline after it has finished processing a batch of data](https://github.com/deliveryhero/pipeline#pipelineshutsdownwheninputchannelisclosed)
* [How to run a pipeline until the container is killed](https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenContainerIsKilled)
* [How to shut down a pipeline when there is a error](https://github.com/deliveryhero/pipeline#PipelineShutsDownOnError)
* [How to shut down a pipeline after it has finished processing a batch of data](https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenInputChannelIsClosed)

## Functions

### func [Apply](/apply.go#L34)

`func Apply[A, B, C any](a Processor[A, []B], b Processor[B, C]) Processor[A, []C]`

Apply connects two processes, applying the second to each item of the first output

```golang
transform := pipeline.NewProcessor(func(_ context.Context, s string) ([]string, error) {
return strings.Split(s, ","), nil
}, nil)

double := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
return s + s, nil
}, nil)

addLeadingZero := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
return "0" + s, nil
}, nil)

apply := pipeline.Apply(
transform,
pipeline.Sequence(
double,
addLeadingZero,
double,
),
)

input := "1,2,3,4,5"

for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
for j := range out {
fmt.Printf("process: %s\n", out[j])
}
}
```

Output:

```
process: 011011
process: 022022
process: 033033
process: 044044
process: 055055
```

### func [Buffer](/buffer.go#L5)

`func Buffer[Item any](size int, in <-chan Item) <-chan Item`
Expand All @@ -43,12 +90,12 @@ p := pipeline.Delay(ctx, time.Second/4,

// If the context is canceled, pass the ints to the cancel func for teardown
p = pipeline.Cancel(ctx, func(i int, err error) {
log.Printf("%+v could not be processed, %s", i, err)
fmt.Printf("%+v could not be processed, %s\n", i, err)
}, p)

// Otherwise, process the inputs
for out := range p {
log.Printf("process: %+v", out)
fmt.Printf("process: %+v\n", out)
}
```

Expand Down Expand Up @@ -232,7 +279,6 @@ ProcessBatchConcurrently fans the in channel out to multiple batch Processors ru
then it fans the out channels of the batch Processors back into a single out chan

```golang

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -254,16 +300,18 @@ p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProces
for result := range p {
fmt.Printf("result: %d\n", result)
}
```

// Example Output:
// result: 1
// result: 2
// result: 3
// result: 5
// error: could not process [7 8], context deadline exceeded
// error: could not process [4 6], context deadline exceeded
// error: could not process [9], context deadline exceeded
Output:

```
result: 1
result: 2
result: 3
result: 5
error: could not process [7 8], context deadline exceeded
error: could not process [4 6], context deadline exceeded
error: could not process [9], context deadline exceeded
```

### func [ProcessConcurrently](/process.go#L26)
Expand All @@ -274,7 +322,6 @@ ProcessConcurrently fans the in channel out to multiple Processors running concu
then it fans the out channels of the Processors back into a single out chan

```golang

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -296,16 +343,18 @@ p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.
for result := range p {
log.Printf("result: %d\n", result)
}
```

// Example Output:
// result: 2
// result: 1
// result: 4
// result: 3
// error: could not process 6, process was canceled
// error: could not process 5, process was canceled
// error: could not process 7, context deadline exceeded
Output:

```
result: 2
result: 1
result: 4
result: 3
error: could not process 6, process was canceled
error: could not process 5, process was canceled
error: could not process 7, context deadline exceeded
```

### func [Split](/split.go#L4)
Expand Down
40 changes: 40 additions & 0 deletions apply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package pipeline

import "context"

type apply[A, B, C any] struct {
a Processor[A, []B]
b Processor[B, C]
}

func (j *apply[A, B, C]) Process(ctx context.Context, a A) ([]C, error) {
bs, err := j.a.Process(ctx, a)
if err != nil {
j.a.Cancel(a, err)
return []C{}, err
}

cs := make([]C, 0, len(bs))

for i := range bs {
c, err := j.b.Process(ctx, bs[i])
if err != nil {
j.b.Cancel(bs[i], err)
return cs, err
}

cs = append(cs, c)
}

return cs, nil
}

func (j *apply[A, B, C]) Cancel(_ A, _ error) {}

// Apply connects two processes, applying the second to each item of the first output
func Apply[A, B, C any](
a Processor[A, []B],
b Processor[B, C],
) Processor[A, []C] {
return &apply[A, B, C]{a, b}
}
47 changes: 47 additions & 0 deletions apply_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package pipeline_test

import (
"context"
"fmt"
"strings"

"github.com/deliveryhero/pipeline/v2"
)

func ExampleApply() {
transform := pipeline.NewProcessor(func(_ context.Context, s string) ([]string, error) {
return strings.Split(s, ","), nil
}, nil)

double := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
return s + s, nil
}, nil)

addLeadingZero := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
return "0" + s, nil
}, nil)

apply := pipeline.Apply(
transform,
pipeline.Sequence(
double,
addLeadingZero,
double,
),
)

input := "1,2,3,4,5"

for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
for j := range out {
fmt.Printf("process: %s\n", out[j])
}
}

// Output:
// process: 011011
// process: 022022
// process: 033033
// process: 044044
// process: 055055
}
59 changes: 59 additions & 0 deletions apply_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package pipeline

import (
"context"
"strings"
"testing"
)

func TestLoopApply(t *testing.T) {
t.Parallel()

transform := NewProcessor(func(_ context.Context, s string) ([]string, error) {
return strings.Split(s, ","), nil
}, nil)

double := NewProcessor(func(_ context.Context, s string) (string, error) {
return s + s, nil
}, nil)

addLeadingZero := NewProcessor(func(_ context.Context, s string) (string, error) {
return "0" + s, nil
}, nil)

looper := Apply(
transform,
Sequence(
double,
addLeadingZero,
double,
),
)

gotCount := 0
input := "1,2,3,4,5"
want := []string{"011011", "022022", "033033", "044044", "055055"}

for out := range Process(context.Background(), looper, Emit(input)) {
for j := range out {
gotCount++
if !contains(want, out[j]) {
t.Errorf("does not contains got=%v, want=%v", out[j], want)
}
}
}

if gotCount != len(want) {
t.Errorf("total results got=%v, want=%v", gotCount, len(want))
}
}

func contains(s []string, e string) bool {
for i := range s {
if s[i] == e {
return true
}
}

return false
}
6 changes: 3 additions & 3 deletions cancel_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pipeline_test

import (
"context"
"log"
"fmt"
"time"

"github.com/deliveryhero/pipeline/v2"
Expand All @@ -20,12 +20,12 @@ func ExampleCancel() {

// If the context is canceled, pass the ints to the cancel func for teardown
p = pipeline.Cancel(ctx, func(i int, err error) {
log.Printf("%+v could not be processed, %s", i, err)
fmt.Printf("%+v could not be processed, %s\n", i, err)
}, p)

// Otherwise, process the inputs
for out := range p {
log.Printf("process: %+v", out)
fmt.Printf("process: %+v\n", out)
}

// Output:
Expand Down
2 changes: 2 additions & 0 deletions cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
)

func TestCancel(t *testing.T) {
t.Parallel()

const testDuration = time.Second

// Collect logs
Expand Down
5 changes: 4 additions & 1 deletion collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
// 5. After duration with nothing in the buffer, nothing is returned, channel remains open
// 6. Flushes the buffer if the context is canceled
func TestCollect(t *testing.T) {
t.Parallel()

const maxTestDuration = time.Second
type args struct {
maxSize int
Expand Down Expand Up @@ -109,6 +111,8 @@ func TestCollect(t *testing.T) {
},
}} {
t.Run(test.name, func(t *testing.T) {
t.Parallel()

// Create the in channel
in := make(chan int)
go func() {
Expand Down Expand Up @@ -152,7 +156,6 @@ func TestCollect(t *testing.T) {
if !reflect.DeepEqual(test.want.out, outs) {
t.Errorf("out = %v, want %v", outs, test.want.out)
}

})
}
}
4 changes: 4 additions & 0 deletions delay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

func TestDelay(t *testing.T) {
t.Parallel()

const maxTestDuration = time.Second
type args struct {
ctxTimeout time.Duration
Expand Down Expand Up @@ -57,6 +59,8 @@ func TestDelay(t *testing.T) {
},
}} {
t.Run(test.name, func(t *testing.T) {
t.Parallel()

// Create in channel
in := Emit(test.args.in...)

Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
module github.com/deliveryhero/pipeline/v2

go 1.18

require github.com/deliveryhero/pipeline v0.4.0
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
github.com/deliveryhero/pipeline v0.4.0 h1:rQ6qHTApvVFouP9Y02k53KoFX+myuO6/OAxVX34iXvo=
github.com/deliveryhero/pipeline v0.4.0/go.mod h1:78CQfQT2DONSGPktr7X71xu333ZMPdrcYuV/gY/Mnkg=
2 changes: 2 additions & 0 deletions join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

func TestJoin(t *testing.T) {
t.Parallel()

// Emit 10 numbers
want := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
ins := Emit(want...)
Expand Down
Loading

0 comments on commit b8ea372

Please sign in to comment.