Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update tracing example for opentelemetry #664

Merged
merged 2 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ tox>=4.3.0
coverage>=5.3
wheel
# used in unit test only
opencensus>=0.11.0
opentelemetry-sdk
opentelemetry-instrumentation-grpc
httpx>=0.24
pyOpenSSL>=23.2.0
# needed for type checking
Expand Down
169 changes: 117 additions & 52 deletions examples/w3c-tracing/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Example - Distributed tracing

In this sample, we'll run two Python applications: a service application, which exposes two methods, and a client application which will invoke the methods from the service using Dapr. The code is instrumented with [OpenCensus SDK for Python](https://opencensus.io/guides/grpc/python/).
In this sample, we'll run two Python applications: a service application, which exposes two methods, and a client application which will invoke the methods from the service using Dapr. The code is instrumented with [OpenTelemetry SDK for Python](https://opentelemetry.io/docs/languages/python/).
This sample includes:

- invoke-receiver: Exposes the methods to be remotely accessed
Expand Down Expand Up @@ -59,43 +59,72 @@ If Zipkin is not working, [install the newest version of Dapr Cli and initialize

### Run the Demo service sample

The Demo service application exposes two methods that can be remotely invoked. In this example, the service code has two parts:
The Demo service application exposes three methods that can be remotely invoked. In this example, the service code has two parts:

In the `invoke-receiver.py` file, you will find the OpenCensus tracing and exporter initialization in addition to two methods: `say` and `sleep`. The instrumentation for the service happens automatically via the `OpenCensusServerInterceptor` class.
In the `invoke-receiver.py` file, you will find the Opentelemetry tracing and exporter initialization in addition to two methods: `say`, `sleep` and `forward`. The instrumentation for the service happens automatically via the `GrpcInstrumentorServer` class.
```python
tracer_interceptor = server_interceptor.OpenCensusServerInterceptor(AlwaysOnSampler())
app = App(
thread_pool=futures.ThreadPoolExecutor(max_workers=10),
interceptors=(tracer_interceptor,))
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
```


The `say` method prints the incoming payload and metadata in console. See the code snippet below:
The `saytrace` method prints the incoming payload and metadata in console. We also return the current trace ID so we can verify whether the trace ID propagated correctly.
See the code snippet below:

```python
@app.method(name='say')
def say(request: InvokeMethodRequest) -> InvokeMethodResponse:
tracer = Tracer(sampler=AlwaysOnSampler())
with tracer.span(name='say') as span:
@app.method(name='saytrace')
def saytrace(request: InvokeMethodRequest) -> InvokeMethodResponse:
with tracer.start_as_current_span(name='say') as span:
data = request.text()
span.add_annotation('Request length', len=len(data))
span.add_event(name='log', attributes={'Request length': len(data)})
print(request.metadata, flush=True)
print(request.text(), flush=True)

return InvokeMethodResponse(b'SAY', "text/plain; charset=UTF-8")
resp = {
'receivedtraceid': span.get_span_context().trace_id,
'method': 'SAY'
}

return InvokeMethodResponse(json.dumps(resp), 'application/json; charset=UTF-8')
```

The `sleep` methods simply waits for two seconds to simulate a slow operation.
```python
@app.method(name='sleep')
def sleep(request: InvokeMethodRequest) -> InvokeMethodResponse:
tracer = Tracer(sampler=AlwaysOnSampler())
with tracer.span(name='sleep') as _:
with tracer.start_as_current_span(name='sleep'):
time.sleep(2)
print(request.metadata, flush=True)
print(request.text(), flush=True)

return InvokeMethodResponse(b'SLEEP', "text/plain; charset=UTF-8")
return InvokeMethodResponse(b'SLEEP', 'text/plain; charset=UTF-8')
```

The `forward` method makes a request to the `saytrace` method while attaching the current trace context. It simply returns the response of the `saytrace` method.
This allows us to verify whether the `traceid` is still correct after this nested callchain.

```python
@app.method(name='forward')
def forward(request: InvokeMethodRequest) -> InvokeMethodResponse:
with tracer.start_as_current_span(name='forward'):
print(request.metadata, flush=True)
print(request.text(), flush=True)

# this helper method can be used to inject the tracing headers into the request
def trace_injector() -> typing.Dict[str, str]:
headers: typing.Dict[str, str] = {}
TraceContextTextMapPropagator().inject(carrier=headers)
return headers

# service invocation uses HTTP, so we need to inject the tracing headers into the request
with DaprClient(headers_callback=trace_injector) as d:
resp = d.invoke_method(
'invoke-receiver',
'saytrace',
data=request.text().encode("utf-8"),
)

return InvokeMethodResponse(json.dumps(resp.json()), 'application/json; charset=UTF-8')
```

Use the following command to execute the service:
Expand All @@ -121,40 +150,72 @@ Once running, the service is now ready to be invoked by Dapr.

### Run the InvokeClient sample

This sample code uses the Dapr SDK for invoking two remote methods (`say` and `sleep`). Again, it is instrumented with OpenCensus with Zipkin exporter. See the code snippet below:
This sample code uses the Dapr SDK for invoking three remote methods (`say`, `sleep` and `forward`). Again, it is instrumented with OpenTelemetry with the Zipkin exporter. See the code snippet below:

```python
ze = ZipkinExporter(
service_name="python-example",
host_name='localhost',
port=9411,
endpoint='/api/v2/spans')
import json
import typing

from opentelemetry import trace
from opentelemetry.exporter.zipkin.json import ZipkinExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from dapr.clients import DaprClient

tracer = Tracer(exporter=ze, sampler=AlwaysOnSampler())
# Create a tracer provider
tracer_provider = TracerProvider(sampler=ALWAYS_ON)

with tracer.span(name="main") as span:
with DaprClient(tracer=tracer) as d:
# Create a span processor
span_processor = BatchSpanProcessor(ZipkinExporter(endpoint="http://localhost:9411/api/v2/spans"))

# Add the span processor to the tracer provider
tracer_provider.add_span_processor(span_processor)

# Set the tracer provider
trace.set_tracer_provider(tracer_provider)

# Get the tracer
tracer = trace.get_tracer(__name__)


# this helper method can be used to inject the tracing headers into the request
def trace_injector() -> typing.Dict[str, str]:
headers: typing.Dict[str, str] = {}
TraceContextTextMapPropagator().inject(carrier=headers)
return headers


with tracer.start_as_current_span(name='main') as span:
with DaprClient(
# service invocation uses HTTP, so we need to inject the tracing headers into the request
headers_callback=lambda: trace_injector()
) as d:
num_messages = 2

for i in range(num_messages):
# Create a typed message with content type and body
resp = d.invoke_method(
'invoke-receiver',
'say',
data=json.dumps({
'id': i,
'message': 'hello world'
}),
'saytrace',
data=json.dumps({'id': i, 'message': 'hello world'}),
)
# Print the response
print(resp.content_type, flush=True)
print(resp.text(), flush=True)
print(resp.json()['method'], flush=True)
traceid = resp.json()['receivedtraceid']

resp = d.invoke_method('invoke-receiver', 'sleep', data='')
# Print the response
print(resp.content_type, flush=True)
print(resp.text(), flush=True)

forwarded_resp = d.invoke_method('invoke-receiver', 'forward', data='')
match_string = 'matches' if (
forwarded_resp.json()["receivedtraceid"] == traceid) else 'does not match'
print(f"Trace ID {match_string} after forwarding", flush=True)
```

The class knows the `app-id` for the remote application. It uses `invoke_method` to invoke API calls on the service endpoint. Instrumentation happens automatically in `Dapr` client via the `tracer` argument.
Expand All @@ -166,12 +227,16 @@ name: Run caller app with tracing
match_order: none
expected_stdout_lines:
- "✅ You're up and running! Both Dapr and your app logs will appear here."
- '== APP == text/plain'
- '== APP == application/json'
- '== APP == SAY'
- '== APP == text/plain'
- '== APP == SLEEP'
- '== APP == text/plain'
- '== APP == Trace ID matches after forwarding'
- '== APP == application/json'
- '== APP == SAY'
- '== APP == text/plain'
- '== APP == SLEEP'
- '== APP == Trace ID matches after forwarding'
- "✅ Exited App successfully"
background: true
sleep: 10
Expand Down Expand Up @@ -217,40 +282,40 @@ To see traces collected through the API:
<!-- STEP
match_order: none
expected_stdout_lines:
- '"calllocal/invoke-receiver/say"'
- '"calllocal/invoke-receiver/saytrace"'
- '"calllocal/invoke-receiver/sleep"'
- '"calllocal/invoke-receiver/say"'
- '"calllocal/invoke-receiver/forward"'
- '"calllocal/invoke-receiver/saytrace"'
- '"calllocal/invoke-receiver/sleep"'
- '"calllocal/invoke-receiver/forward"'
name: Curl validate
-->

```bash
curl -s "http://localhost:9411/api/v2/traces?serviceName=invoke-receiver&spanName=calllocal%2Finvoke-receiver%2Fsay&limit=10" -H "accept: application/json" | jq ".[][] | .name, .duration"
curl -s "http://localhost:9411/api/v2/traces?serviceName=invoke-receiver&spanName=calllocal%2Finvoke-receiver%2Fsaytrace&limit=10" -H "accept: application/json" | jq ".[][] | .name, .duration"
```

<!-- END_STEP -->

The `jq` command line utility is used in the above to give you a nice human readable printout of method calls and their duration:

```
"calllocal/invoke-receiver/say"
12711
"calllocal/invoke-receiver/say"
15218
"calllocal/invoke-receiver/saytrace"
7511
"calllocal/invoke-receiver/sleep"
2005904
"calllocal/invoke-receiver/say"
1407
2006537
"calllocal/invoke-receiver/sleep"
2006538
"calllocal/invoke-receiver/say"
1844
2006268
"calllocal/invoke-receiver/forward"
10965
"calllocal/invoke-receiver/forward"
10490
"calllocal/invoke-receiver/saytrace"
1948
"calllocal/invoke-receiver/saytrace"
1545
"main"
4045202
"calllocal/invoke-receiver/sleep"
2004350
"calllocal/invoke-receiver/sleep"
2004914
4053102
```

## Cleanup
Expand Down
53 changes: 42 additions & 11 deletions examples/w3c-tracing/invoke-caller.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,66 @@
import json
import typing

from opentelemetry import trace
from opentelemetry.exporter.zipkin.json import ZipkinExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from dapr.clients import DaprClient

from opencensus.trace.tracer import Tracer
from opencensus.ext.zipkin.trace_exporter import ZipkinExporter
from opencensus.trace.samplers import AlwaysOnSampler
# Create a tracer provider
tracer_provider = TracerProvider(sampler=ALWAYS_ON)

# Create a span processor
span_processor = BatchSpanProcessor(ZipkinExporter(endpoint='http://localhost:9411/api/v2/spans'))

# Add the span processor to the tracer provider
tracer_provider.add_span_processor(span_processor)

# Set the tracer provider
trace.set_tracer_provider(tracer_provider)

# Get the tracer
tracer = trace.get_tracer(__name__)

ze = ZipkinExporter(
service_name='python-example', host_name='localhost', port=9411, endpoint='/api/v2/spans'
)

tracer = Tracer(exporter=ze, sampler=AlwaysOnSampler())
# this helper method can be used to inject the tracing headers into the request
def trace_injector() -> typing.Dict[str, str]:
headers: typing.Dict[str, str] = {}
TraceContextTextMapPropagator().inject(carrier=headers)
return headers

with tracer.span(name='main') as span:

with tracer.start_as_current_span(name='main') as span:
with DaprClient(
headers_callback=lambda: tracer.propagator.to_headers(tracer.span_context)
# service invocation uses HTTP, so we need to inject the tracing headers into the request
headers_callback=lambda: trace_injector()
) as d:
num_messages = 2

for i in range(num_messages):
# Create a typed message with content type and body
resp = d.invoke_method(
'invoke-receiver',
'say',
'saytrace',
data=json.dumps({'id': i, 'message': 'hello world'}),
)
# Print the response
print(resp.content_type, flush=True)
print(resp.text(), flush=True)
print(resp.json()['method'], flush=True)
traceid = resp.json()['receivedtraceid']

resp = d.invoke_method('invoke-receiver', 'sleep', data='')
# Print the response
print(resp.content_type, flush=True)
print(resp.text(), flush=True)

forwarded_resp = d.invoke_method('invoke-receiver', 'forward', data='')
match_string = (
'matches'
if (forwarded_resp.json()['receivedtraceid'] == traceid)
else 'does not match'
)
print(f'Trace ID {match_string} after forwarding', flush=True)
Loading
Loading