Skip to content

Commit

Permalink
fix(opentelemetry-sdk-trace-base): always wait on pending export in S…
Browse files Browse the repository at this point in the history
…impleSpanProcessor
  • Loading branch information
anuraaga committed Jan 7, 2025
1 parent 90afa28 commit 8ae7697
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ import { Resource } from '@opentelemetry/resources';
*/
export class SimpleSpanProcessor implements SpanProcessor {
private _shutdownOnce: BindOnceFuture<void>;
private _unresolvedExports: Set<Promise<void>>;
private _pendingExports: Set<Promise<void>>;

constructor(private readonly _exporter: SpanExporter) {
this._shutdownOnce = new BindOnceFuture(this._shutdown, this);
this._unresolvedExports = new Set<Promise<void>>();
this._pendingExports = new Set<Promise<void>>();
}

async forceFlush(): Promise<void> {
// await unresolved resources before resolving
await Promise.all(Array.from(this._unresolvedExports));
// await pending exports
await Promise.all(Array.from(this._pendingExports));
if (this._exporter.forceFlush) {
await this._exporter.forceFlush();
}
Expand All @@ -64,10 +64,15 @@ export class SimpleSpanProcessor implements SpanProcessor {
return;
}

let exportPromise: Promise<void> | undefined = undefined;

const doExport = () =>
internal
._export(this._exporter, [span])
.then((result: ExportResult) => {
if (exportPromise) {
this._pendingExports.delete(exportPromise);
}
if (result.code !== ExportResultCode.SUCCESS) {
globalErrorHandler(
result.error ??
Expand All @@ -81,26 +86,17 @@ export class SimpleSpanProcessor implements SpanProcessor {
globalErrorHandler(error);
});

// Avoid scheduling a promise to make the behavior more predictable and easier to test
if (span.resource.asyncAttributesPending) {
const exportPromise = (span.resource as Resource)
exportPromise = (span.resource as Resource)
.waitForAsyncAttributes?.()
.then(
() => {
if (exportPromise != null) {
this._unresolvedExports.delete(exportPromise);
}
return doExport();
},
err => globalErrorHandler(err)
);

// store the unresolved exports
if (exportPromise != null) {
this._unresolvedExports.add(exportPromise);
}
.then(doExport, err => globalErrorHandler(err));
} else {
void doExport();
exportPromise = doExport();
}

// store the unresolved exports
if (exportPromise) {
this._pendingExports.add(exportPromise);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,40 @@ describe('SimpleSpanProcessor', () => {
);
});

it('should await doExport() and delete from _unresolvedExports', async () => {
it('should await doExport() and delete from _pendingExports', async () => {
const testExporterWithDelay = new TestExporterWithDelay();
const processor = new SimpleSpanProcessor(testExporterWithDelay);
const spanContext: SpanContext = {
traceId: 'a3cda95b652f4a1592b449d5929fda1b',
spanId: '5e0c63257de34c92',
traceFlags: TraceFlags.SAMPLED,
};
const tracer = provider.getTracer('default');
const span = new SpanImpl({
scope: tracer.instrumentationLibrary,
resource: tracer['_resource'],
context: ROOT_CONTEXT,
spanContext,
name: 'span-name',
kind: SpanKind.CLIENT,
spanLimits: tracer.getSpanLimits(),
spanProcessor: tracer['_spanProcessor'],
});
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

assert.strictEqual(processor['_pendingExports'].size, 1);

await processor.forceFlush();

assert.strictEqual(processor['_pendingExports'].size, 0);

const exportedSpans = testExporterWithDelay.getFinishedSpans();

assert.strictEqual(exportedSpans.length, 1);
});

it('should await doExport() and delete from _pendingExports with async resource', async () => {
const testExporterWithDelay = new TestExporterWithDelay();
const processor = new SimpleSpanProcessor(testExporterWithDelay);

Expand Down Expand Up @@ -247,11 +280,11 @@ describe('SimpleSpanProcessor', () => {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

assert.strictEqual(processor['_unresolvedExports'].size, 1);
assert.strictEqual(processor['_pendingExports'].size, 1);

await processor.forceFlush();

assert.strictEqual(processor['_unresolvedExports'].size, 0);
assert.strictEqual(processor['_pendingExports'].size, 0);

const exportedSpans = testExporterWithDelay.getFinishedSpans();

Expand Down

0 comments on commit 8ae7697

Please sign in to comment.