Skip to content

Commit

Permalink
port over changeset from our fork
Browse files Browse the repository at this point in the history
new files

undo defaultContextProp changes

remove bad calls to method no longer in this pr

remove health check from this PR

remove default adding of otel

Add Health check API to worker and service interface (#617)
  • Loading branch information
AngerM-DD committed Oct 26, 2021
1 parent dca853a commit 2156317
Show file tree
Hide file tree
Showing 13 changed files with 838 additions and 256 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
compile group: 'io.micrometer', name: 'micrometer-core', version: '1.1.2'
compile group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2'
compile group: 'com.auth0', name: 'java-jwt', version:'3.10.2'
compile group: 'io.opentelemetry', name: 'opentelemetry-sdk', version: '1.1.0'

testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/com/uber/cadence/context/ContextPropagator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
* Context Propagators are used to propagate information from workflow to activity, workflow to
* child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}).
*
* <p>It is important to note that all threads share one ContextPropagator instance, so your
* implementation <b>must</b> be thread-safe and store any state in ThreadLocal variables.
*
* <p>A sample <code>ContextPropagator</code> that copies all {@link org.slf4j.MDC} entries starting
* with a given prefix along the code path looks like this:
*
Expand Down Expand Up @@ -136,4 +139,31 @@ public interface ContextPropagator {

/** Sets the current context */
void setCurrentContext(Object context);

/**
* This is a lifecycle method, called after the context has been propagated to the
* workflow/activity thread but the workflow/activity has not yet started.
*/
default void setUp() {
// No-op
}

/**
* This is a lifecycle method, called after the workflow/activity has completed. If the method
* finished without exception, {@code successful} will be true. Otherwise, it will be false and
* {@link #onError(Throwable)} will have already been called.
*/
default void finish() {
// No-op
}

/**
* This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled
* exception. {@link #finish()} is called after this method.
*
* @param t The unhandled exception that caused the workflow/activity to terminate
*/
default void onError(Throwable t) {
// No-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.context;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.slf4j.MDC;

public class OpenTelemetryContextPropagator implements ContextPropagator {

private static final TextMapPropagator w3cTraceContextPropagator =
W3CTraceContextPropagator.getInstance();
private static final TextMapPropagator w3cBaggagePropagator = W3CBaggagePropagator.getInstance();
private static ThreadLocal<Scope> currentContextOtelScope = new ThreadLocal<>();
private static ThreadLocal<Span> currentOtelSpan = new ThreadLocal<>();
private static ThreadLocal<Scope> currentOtelScope = new ThreadLocal<>();
private static ThreadLocal<Iterable<String>> otelKeySet = new ThreadLocal<>();
private static final TextMapSetter<Map<String, String>> setter = Map::put;
private static final TextMapGetter<Map<String, String>> getter =
new TextMapGetter<Map<String, String>>() {
@Override
public Iterable<String> keys(Map<String, String> carrier) {
return otelKeySet.get();
}

@Nullable
@Override
public String get(Map<String, String> carrier, String key) {
return MDC.get(key);
}
};

@Override
public String getName() {
return "OpenTelemetry";
}

@Override
public Map<String, byte[]> serializeContext(Object context) {
Map<String, byte[]> serializedContext = new HashMap<>();
Map<String, String> contextMap = (Map<String, String>) context;
if (contextMap != null) {
for (Map.Entry<String, String> entry : contextMap.entrySet()) {
serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset()));
}
}
return serializedContext;
}

@Override
public Object deserializeContext(Map<String, byte[]> context) {
Map<String, String> contextMap = new HashMap<>();
for (Map.Entry<String, byte[]> entry : context.entrySet()) {
contextMap.put(entry.getKey(), new String(entry.getValue(), Charset.defaultCharset()));
}
return contextMap;
}

@Override
public Object getCurrentContext() {
Map<String, String> carrier = new HashMap<>();
w3cTraceContextPropagator.inject(Context.current(), carrier, setter);
w3cBaggagePropagator.inject(Context.current(), carrier, setter);
return carrier;
}

@Override
public void setCurrentContext(Object context) {
Map<String, String> contextMap = (Map<String, String>) context;
if (contextMap != null) {
for (Map.Entry<String, String> entry : contextMap.entrySet()) {
MDC.put(entry.getKey(), entry.getValue());
}
otelKeySet.set(contextMap.keySet());
}
}

@Override
@SuppressWarnings("MustBeClosedChecker")
public void setUp() {
Context context =
Baggage.fromContext(w3cBaggagePropagator.extract(Context.current(), null, getter))
.toBuilder()
.build()
.storeInContext(w3cTraceContextPropagator.extract(Context.current(), null, getter));

currentContextOtelScope.set(context.makeCurrent());

Span span =
GlobalOpenTelemetry.getTracer("cadence-client")
.spanBuilder("cadence.workflow")
.setParent(context)
.setSpanKind(SpanKind.CLIENT)
.startSpan();

Scope scope = span.makeCurrent();
currentOtelSpan.set(span);
currentOtelScope.set(scope);
}

@Override
public void finish() {
Scope scope = currentOtelScope.get();
if (scope != null) {
scope.close();
}
Span span = currentOtelSpan.get();
if (span != null) {
span.end();
}
Scope contextScope = currentContextOtelScope.get();
if (contextScope != null) {
contextScope.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This class holds the current set of context propagators */
/** This class holds the current set of context propagators. */
public class ContextThreadLocal {

private static final Logger log = LoggerFactory.getLogger(ContextThreadLocal.class);

private static WorkflowThreadLocal<List<ContextPropagator>> contextPropagators =
WorkflowThreadLocal.withInitial(
new Supplier<List<ContextPropagator>>() {
Expand All @@ -37,7 +41,7 @@ public List<ContextPropagator> get() {
}
});

/** Sets the list of context propagators for the thread */
/** Sets the list of context propagators for the thread. */
public static void setContextPropagators(List<ContextPropagator> propagators) {
if (propagators == null || propagators.isEmpty()) {
return;
Expand All @@ -57,6 +61,11 @@ public static Map<String, Object> getCurrentContextForPropagation() {
return contextData;
}

/**
* Injects the context data into the thread for each configured context propagator.
*
* @param contextData The context data received from the server.
*/
public static void propagateContextToCurrentThread(Map<String, Object> contextData) {
if (contextData == null || contextData.isEmpty()) {
return;
Expand All @@ -67,4 +76,44 @@ public static void propagateContextToCurrentThread(Map<String, Object> contextDa
}
}
}

/** Calls {@link ContextPropagator#setUp()} for each propagator. */
public static void setUpContextPropagators() {
for (ContextPropagator propagator : contextPropagators.get()) {
try {
propagator.setUp();
} catch (Throwable t) {
// Don't let an error in one propagator block the others
log.error("Error calling setUp() on a contextpropagator", t);
}
}
}

/**
* Calls {@link ContextPropagator#onError(Throwable)} for each propagator.
*
* @param t The Throwable that caused the workflow/activity to finish.
*/
public static void onErrorContextPropagators(Throwable t) {
for (ContextPropagator propagator : contextPropagators.get()) {
try {
propagator.onError(t);
} catch (Throwable t1) {
// Don't let an error in one propagator block the others
log.error("Error calling onError() on a contextpropagator", t1);
}
}
}

/** Calls {@link ContextPropagator#finish()} for each propagator. */
public static void finishContextPropagators() {
for (ContextPropagator propagator : contextPropagators.get()) {
try {
propagator.finish();
} catch (Throwable t) {
// Don't let an error in one propagator block the others
log.error("Error calling finish() on a contextpropagator", t);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class WorkflowContext {

Expand Down Expand Up @@ -166,7 +167,18 @@ Map<String, Object> getPropagatedContexts() {

Map<String, Object> contextData = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
// Only send the context propagator the fields that belong to them
// Change the map from MyPropagator:foo -> bar to foo -> bar
Map<String, byte[]> filteredData =
headerData
.entrySet()
.stream()
.filter(e -> e.getKey().startsWith(propagator.getName()))
.collect(
Collectors.toMap(
e -> e.getKey().substring(propagator.getName().length() + 1),
Map.Entry::getValue));
contextData.put(propagator.getName(), propagator.deserializeContext(filteredData));
}

return contextData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -453,7 +454,18 @@ private Map<String, byte[]> extractContextsAndConvertToBytes(
}
Map<String, byte[]> result = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
// Get the serialized context from the propagator
Map<String, byte[]> serializedContext =
propagator.serializeContext(propagator.getCurrentContext());
// Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar
Map<String, byte[]> namespacedSerializedContext =
serializedContext
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> propagator.getName() + ":" + e.getKey(), Map.Entry::getValue));
result.putAll(namespacedSerializedContext);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

class WorkflowStubImpl implements WorkflowStub {

Expand Down Expand Up @@ -204,7 +205,18 @@ private Map<String, byte[]> extractContextsAndConvertToBytes(
}
Map<String, byte[]> result = new HashMap<>();
for (ContextPropagator propagator : contextPropagators) {
result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
// Get the serialized context from the propagator
Map<String, byte[]> serializedContext =
propagator.serializeContext(propagator.getCurrentContext());
// Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar
Map<String, byte[]> namespacedSerializedContext =
serializedContext
.entrySet()
.stream()
.collect(
Collectors.toMap(
k -> propagator.getName() + ":" + k.getKey(), Map.Entry::getValue));
result.putAll(namespacedSerializedContext);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void run() {
// Repopulate the context(s)
ContextThreadLocal.setContextPropagators(this.contextPropagators);
ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
ContextThreadLocal.setUpContextPropagators();

try {
// initialYield blocks thread until the first runUntilBlocked is called.
Expand All @@ -99,6 +100,7 @@ public void run() {
cancellationScope.run();
} catch (DestroyWorkflowThreadError e) {
if (!threadContext.isDestroyRequested()) {
ContextThreadLocal.onErrorContextPropagators(e);
threadContext.setUnhandledException(e);
}
} catch (Error e) {
Expand All @@ -111,9 +113,11 @@ public void run() {
log.error(
String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
}
ContextThreadLocal.onErrorContextPropagators(e);
threadContext.setUnhandledException(e);
} catch (CancellationException e) {
if (!isCancelRequested()) {
ContextThreadLocal.onErrorContextPropagators(e);
threadContext.setUnhandledException(e);
}
if (log.isDebugEnabled()) {
Expand All @@ -130,8 +134,10 @@ public void run() {
"Workflow thread \"%s\" run failed with unhandled exception:\n%s",
name, stackTrace));
}
ContextThreadLocal.onErrorContextPropagators(e);
threadContext.setUnhandledException(e);
} finally {
ContextThreadLocal.finishContextPropagators();
DeterministicRunnerImpl.setCurrentThreadInternal(null);
threadContext.setStatus(Status.DONE);
thread.setName(originalName);
Expand Down
Loading

0 comments on commit 2156317

Please sign in to comment.