Skip to content

Commit

Permalink
Improve ctxpropagation sample
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Oct 9, 2020
2 parents 3291d14 + 5ee7762 commit 14980b3
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 40 deletions.
30 changes: 20 additions & 10 deletions ctxpropagation/README.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
This sample workflow demos context propagation through a workflow. Details about context propagation are
This sample Workflow demos context propagation through a Workflow. Details about context propagation are
available [here](https://docs.temporal.io/docs/go-tracing).

The sample workflow initializes the client with a context propagator which propagates
specific information in the `context.Context` object across the workflow. The `context.Context` object is populated
with the information prior to calling `StartWorkflow`. The workflow demonstrates that the information is available
in the workflow and any activities executed.
The sample Workflow initializes the client with a context propagator which propagates
specific information in the `context.Context` object across the Workflow. The `context.Context` object is populated
with the information prior to calling `StartWorkflow`. The Workflow demonstrates that the information is available
in the Workflow and any activities executed.

Also, this sample initializes a Jaeger global tracer and pass it to the client. The sample will work without
actual Jaeger instance -- just report every tracer call to the log. To see traces in Jaeger run it with follow command:
```
$ docker run --publish 6831:6831 --publish 16686:16686 jaegertracing/all-in-one:latest
```

Steps to run this sample:
1) You need a Temporal service running. See details README.md
2) Run the following command multiple times on different console window. This is to simulate running workers on multiple different machines.
1) You need a Temporal service running. See details README.md.
2) Run
```
go run ctxpropagation/worker/main.go
```
3) Run the following command to execute the context:
to start worker for `ctxpropagation` Workflow.

3) Run:
```
go run ctxpropagation/starter/main.go
```
to start Workflow.

You should see prints showing the context information available in the workflow and activities.


You should see prints showing the context information available in the workflow
and activities.
35 changes: 13 additions & 22 deletions ctxpropagation/propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ctxpropagation
import (
"context"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/workflow"
)
Expand All @@ -28,7 +27,7 @@ var PropagateKey = contextKey{}

// propagationKey is the key used by the propagator to pass values through the
// Temporal server headers
const propagationKey = "_prop"
const propagationKey = "custom-header"

// NewContextPropagator returns a context propagator that propagates a set of
// string key-value pairs across a workflow
Expand Down Expand Up @@ -60,34 +59,26 @@ func (s *propagator) InjectFromWorkflow(ctx workflow.Context, writer workflow.He

// Extract extracts values from headers and puts them into context
func (s *propagator) Extract(ctx context.Context, reader workflow.HeaderReader) (context.Context, error) {
if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error {
if key == propagationKey {
var values Values
if err := converter.GetDefaultDataConverter().FromPayload(value, &values); err != nil {
return err
}
ctx = context.WithValue(ctx, PropagateKey, values)
if value, ok := reader.Get(propagationKey); ok {
var values Values
if err := converter.GetDefaultDataConverter().FromPayload(value, &values); err != nil {
return ctx, nil
}
return nil
}); err != nil {
return nil, err
ctx = context.WithValue(ctx, PropagateKey, values)
}

return ctx, nil
}

// ExtractToWorkflow extracts values from headers and puts them into context
func (s *propagator) ExtractToWorkflow(ctx workflow.Context, reader workflow.HeaderReader) (workflow.Context, error) {
if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error {
if key == propagationKey {
var values Values
if err := converter.GetDefaultDataConverter().FromPayload(value, &values); err != nil {
return err
}
ctx = workflow.WithValue(ctx, PropagateKey, values)
if value, ok := reader.Get(propagationKey); ok {
var values Values
if err := converter.GetDefaultDataConverter().FromPayload(value, &values); err != nil {
return ctx, nil
}
return nil
}); err != nil {
return nil, err
ctx = workflow.WithValue(ctx, PropagateKey, values)
}

return ctx, nil
}
6 changes: 6 additions & 0 deletions ctxpropagation/starter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"log"

"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
Expand All @@ -12,9 +13,14 @@ import (
)

func main() {
// Set tracer which will be returned by opentracing.GlobalTracer().
closer := ctxpropagation.SetJaegerGlobalTracer()
defer func() { _ = closer.Close() }()

// The client is a heavyweight object that should be created once per process.
c, err := client.NewClient(client.Options{
HostPort: client.DefaultHostPort,
Tracer: opentracing.GlobalTracer(),
ContextPropagators: []workflow.ContextPropagator{ctxpropagation.NewContextPropagator()},
})
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions ctxpropagation/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package ctxpropagation

import (
"io"

"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
)

func SetJaegerGlobalTracer() io.Closer {
cfg := config.Configuration{
ServiceName: "ctx-propogation-sample",
Sampler: &config.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
},
}
tracer, closer, err := cfg.NewTracer(
config.Logger(jaeger.StdLogger),
)
if err != nil {
panic(err)
}
opentracing.SetGlobalTracer(tracer)

return closer
}
12 changes: 8 additions & 4 deletions ctxpropagation/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"log"

"github.com/opentracing/opentracing-go"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
Expand All @@ -11,12 +12,15 @@ import (
)

func main() {
// Set tracer which will be returned by opentracing.GlobalTracer().
closer := ctxpropagation.SetJaegerGlobalTracer()
defer func() { _ = closer.Close() }()

// The client and worker are heavyweight objects that should be created once per process.
c, err := client.NewClient(client.Options{
HostPort: client.DefaultHostPort,
ContextPropagators: []workflow.ContextPropagator{
ctxpropagation.NewContextPropagator(),
},
HostPort: client.DefaultHostPort,
ContextPropagators: []workflow.ContextPropagator{ctxpropagation.NewContextPropagator()},
Tracer: opentracing.GlobalTracer(),
})
if err != nil {
log.Fatalln("Unable to create client", err)
Expand Down
10 changes: 8 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ module github.com/temporalio/samples-go
go 1.14

require (
github.com/HdrHistogram/hdrhistogram-go v0.9.0 // indirect
github.com/golang/mock v1.4.4
github.com/google/uuid v1.1.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pborman/uuid v1.2.1
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.4.0+incompatible // indirect
go.temporal.io/api v1.0.0
go.temporal.io/sdk v1.0.0
go.temporal.io/sdk v1.1.0
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 // indirect
golang.org/x/sys v0.0.0-20201006155630-ac719f4daadf // indirect
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
google.golang.org/genproto v0.0.0-20201006033701-bcad7cf615f2 // indirect
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
)
17 changes: 15 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/HdrHistogram/hdrhistogram-go v0.9.0 h1:dpujRju0R4M/QZzcnR1LH1qm+TVG3UzkWdp5tH1WMcg=
github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down Expand Up @@ -88,13 +90,17 @@ github.com/uber-go/tally v3.3.17+incompatible h1:nFHIuW3VQ22wItiE9kPXic8dEgExWOs
github.com/uber-go/tally v3.3.17+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/jaeger-client-go v2.23.1+incompatible h1:uArBYHQR0HqLFFAypI7RsWTzPSj/bDpmZZuQjMLSg1A=
github.com/uber/jaeger-client-go v2.23.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U=
github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/jaeger-lib v2.4.0+incompatible h1:fY7QsGQWiCt8pajv4r7JEvmATdCVaWxXbjwyYwsNaLQ=
github.com/uber/jaeger-lib v2.4.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.temporal.io/api v1.0.0 h1:mWtvS+5ENYvG4ZPZ/4/bxCj4j3gIF4D05C2GVrhLpjc=
go.temporal.io/api v1.0.0/go.mod h1:AgbKINgV3KR9SlTH8nQRsNadVbxVI+/LnZ1uFModMIA=
go.temporal.io/sdk v1.0.0 h1:Cfrr/RkcoGu+B/vpOII1D7xv4rhRHa5eYQStnuASS7k=
go.temporal.io/sdk v1.0.0/go.mod h1:mtuFg8AafvaXeAE3gwBjZZN05uwTGjzVf1qkXmq4hpM=
go.temporal.io/sdk v1.1.0 h1:qhrjW0Z2cxN1DVpAX0D+krnmoFOS4bQJ4aCrp231HV8=
go.temporal.io/sdk v1.1.0/go.mod h1:3MLsVXjrvQ9z4XaxRie5OOJdiZ/Fu2v3fDCVcAqssEs=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
Expand All @@ -121,6 +127,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200925080053-05aa5d4ee321 h1:lleNcKRbcaC8MqgLwghIkzZ2JBQAb7QQ9MiwRt1BisA=
golang.org/x/net v0.0.0-20200925080053-05aa5d4ee321/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0 h1:wBouT66WTYFXdxfVdz9sVWARVd/2vfGcmI45D2gj45M=
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -133,6 +141,9 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d h1:L/IKR6COd7ubZrs2oTnTi73IhgqJ71c9s80WsQnh0Es=
golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201006155630-ac719f4daadf h1:Bg47KQy0JhTHuf4sLiQwTMKwUMfSDwgSGatrxGR7nLM=
golang.org/x/sys v0.0.0-20201006155630-ac719f4daadf/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down Expand Up @@ -165,6 +176,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20200925023002-c2d885f95484 h1:Rr9EZdYRq2WLckzJQVtN3ISKoP7dvgwi7jbglILNZ34=
google.golang.org/genproto v0.0.0-20200925023002-c2d885f95484/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201006033701-bcad7cf615f2 h1:rBG1miiV00OG3NTbgxg42kPWohjVNn1sGhJyt4xyPLQ=
google.golang.org/genproto v0.0.0-20201006033701-bcad7cf615f2/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand Down

0 comments on commit 14980b3

Please sign in to comment.