Skip to content

Commit

Permalink
Add support for concurrent exports (#790)
Browse files Browse the repository at this point in the history
* Add support for concurrent exports

* Remove cancellation support from `::await()`
  • Loading branch information
Nevay authored Aug 10, 2022
1 parent 0013e47 commit d0a600f
Show file tree
Hide file tree
Showing 18 changed files with 114 additions and 58 deletions.
28 changes: 28 additions & 0 deletions src/SDK/Common/Future/CompletedFuture.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\SDK\Common\Future;

/**
* @template T
* @template-implements FutureInterface<T>
*/
final class CompletedFuture implements FutureInterface
{
/** @var T */
private $value;

/**
* @param T $value
*/
public function __construct($value)
{
$this->value = $value;
}

public function await()
{
return $this->value;
}
}
16 changes: 16 additions & 0 deletions src/SDK/Common/Future/FutureInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\SDK\Common\Future;

/**
* @template T
*/
interface FutureInterface
{
/**
* @psalm-return T
*/
public function await();
}
8 changes: 4 additions & 4 deletions src/SDK/Trace/Behavior/SpanExporterDecoratorTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace OpenTelemetry\SDK\Trace\Behavior;

use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use OpenTelemetry\SDK\Trace\SpanDataInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;

Expand All @@ -14,16 +15,15 @@ trait SpanExporterDecoratorTrait

/**
* @param iterable<SpanDataInterface> $spans
* @return int
* @psalm-return SpanExporterInterface::STATUS_*
* @return FutureInterface<int>
*/
public function export(iterable $spans, ?CancellationInterface $cancellation = null): int
public function export(iterable $spans, ?CancellationInterface $cancellation = null): FutureInterface
{
$response = $this->decorated->export(
$this->beforeExport($spans),
$cancellation,
);
$this->afterExport($spans, $response);
$this->afterExport($spans, $response->await());

return $response;
}
Expand Down
15 changes: 7 additions & 8 deletions src/SDK/Trace/Behavior/SpanExporterTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace OpenTelemetry\SDK\Trace\Behavior;

use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\CompletedFuture;
use OpenTelemetry\SDK\Common\Future\FutureInterface;
use OpenTelemetry\SDK\Trace\SpanDataInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;

Expand All @@ -29,19 +31,16 @@ public function forceFlush(?CancellationInterface $cancellation = null): bool
abstract public static function fromConnectionString(string $endpointUrl, string $name, string $args);

/**
* @param iterable<SpanDataInterface> $spans Batch of spans to export
*
* @see https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/specification/trace/sdk.md#exportbatch
*
* @psalm-return SpanExporterInterface::STATUS_*
* @param iterable<SpanDataInterface> $spans
* @return FutureInterface<int>
*/
public function export(iterable $spans, ?CancellationInterface $cancellation = null): int
public function export(iterable $spans, ?CancellationInterface $cancellation = null): FutureInterface
{
if (!$this->running) {
return SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE;
return new CompletedFuture(SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE);
}

return $this->doExport($spans); /** @phpstan-ignore-line */
return new CompletedFuture($this->doExport($spans)); /** @phpstan-ignore-line */
}

/**
Expand Down
5 changes: 3 additions & 2 deletions src/SDK/Trace/SpanExporterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace OpenTelemetry\SDK\Trace;

use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Common\Future\FutureInterface;

/**
* @see https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/specification/trace/sdk.md#span-exporter
Expand All @@ -25,9 +26,9 @@ public static function fromConnectionString(string $endpointUrl, string $name, s
*
* @see https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/specification/trace/sdk.md#exportbatch
*
* @psalm-return SpanExporterInterface::STATUS_*
* @psalm-return FutureInterface<int>
*/
public function export(iterable $spans, ?CancellationInterface $cancellation = null): int;
public function export(iterable $spans, ?CancellationInterface $cancellation = null): FutureInterface;

/** @see https://github.com/open-telemetry/opentelemetry-specification/blob/v1.7.0/specification/trace/sdk.md#shutdown-2 */
public function shutdown(?CancellationInterface $cancellation = null): bool;
Expand Down
2 changes: 1 addition & 1 deletion src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public function forceFlush(?CancellationInterface $cancellation = null): bool
return true;
}

$this->exporter->export($this->queue);
$this->exporter->export($this->queue)->await();
$this->queue = [];
$this->stopwatch->reset();
$this->exporter->forceFlush();
Expand Down
2 changes: 1 addition & 1 deletion src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public function onEnd(ReadableSpanInterface $span): void
}

if (null !== $this->exporter) {
$this->exporter->export([$span->toSpanData()]);
$this->exporter->export([$span->toSpanData()])->await();
}
}

Expand Down
4 changes: 2 additions & 2 deletions tests/Unit/Contrib/AbstractHttpExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public function test_exporter_response_status($responseStatus, $expected): void
$expected,
$this->createExporter()->export([
$this->createMock(SpanData::class),
])
])->await(),
);
}

Expand Down Expand Up @@ -107,7 +107,7 @@ public function test_client_exception_decides_return_code($exception, $expected)
$expected,
$this->createExporter()->export([
$this->createMock(SpanData::class),
])
])->await(),
);
}

Expand Down
12 changes: 6 additions & 6 deletions tests/Unit/Contrib/AgentExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

declare(strict_types=1);

namespace OpenTelemetry\Tests\Contrib\Unit;
namespace OpenTelemetry\Tests\Unit\Contrib;

use OpenTelemetry\Contrib\Jaeger\AgentExporter;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
use OpenTelemetry\Tests\Unit\SDK\Util\SpanData;
use PHPUnit\Framework\TestCase;

/**
* @covers OpenTelemetry\Contrib\Jaeger\AgentExporter
* @covers OpenTelemetry\Contrib\Jaeger\JaegerTransport
* @covers OpenTelemetry\Contrib\Jaeger\ThriftUdpTransport
* @covers OpenTelemetry\Contrib\Jaeger\ParsedEndpointUrl
* @covers \OpenTelemetry\Contrib\Jaeger\AgentExporter
* @covers \OpenTelemetry\Contrib\Jaeger\JaegerTransport
* @covers \OpenTelemetry\Contrib\Jaeger\ThriftUdpTransport
* @covers \OpenTelemetry\Contrib\Jaeger\ParsedEndpointUrl
*/
class AgentExporterTest extends TestCase
{
Expand All @@ -24,7 +24,7 @@ public function test_happy_path()
'someServiceName',
);

$status = $exporter->export([new SpanData()]);
$status = $exporter->export([new SpanData()])->await();

$this->assertSame(SpanExporterInterface::STATUS_SUCCESS, $status);

Expand Down
14 changes: 7 additions & 7 deletions tests/Unit/Contrib/JaegerHttpCollectorExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
use PHPUnit\Framework\TestCase;

/**
* @covers OpenTelemetry\Contrib\Jaeger\HttpCollectorExporter
* @covers OpenTelemetry\Contrib\Jaeger\HttpSender
* @covers OpenTelemetry\Contrib\Jaeger\ThriftHttpTransport
* @covers OpenTelemetry\Contrib\Jaeger\ParsedEndpointUrl
* @covers OpenTelemetry\Contrib\Jaeger\BatchAdapter\BatchAdapter
* @covers OpenTelemetry\Contrib\Jaeger\BatchAdapter\BatchAdapterFactory
* @covers \OpenTelemetry\Contrib\Jaeger\HttpCollectorExporter
* @covers \OpenTelemetry\Contrib\Jaeger\HttpSender
* @covers \OpenTelemetry\Contrib\Jaeger\ThriftHttpTransport
* @covers \OpenTelemetry\Contrib\Jaeger\ParsedEndpointUrl
* @covers \OpenTelemetry\Contrib\Jaeger\BatchAdapter\BatchAdapter
* @covers \OpenTelemetry\Contrib\Jaeger\BatchAdapter\BatchAdapterFactory
*
*/
class JaegerHttpCollectorExporterTest extends TestCase
Expand All @@ -35,7 +35,7 @@ public function test_happy_path()
$this->getStreamFactoryInterfaceMock()
);

$status = $exporter->export([new SpanData()]);
$status = $exporter->export([new SpanData()])->await();

$this->assertSame(SpanExporterInterface::STATUS_SUCCESS, $status);
}
Expand Down
8 changes: 4 additions & 4 deletions tests/Unit/Contrib/OTLPGrpcExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use org\bovigo\vfs\vfsStream;

/**
* @covers OpenTelemetry\Contrib\OtlpGrpc\Exporter
* @covers \OpenTelemetry\Contrib\OtlpGrpc\Exporter
*/
class OTLPGrpcExporterTest extends AbstractExporterTest
{
Expand Down Expand Up @@ -55,7 +55,7 @@ public function test_exporter_happy_path(): void
])
);

$exporterStatusCode = $exporter->export([new SpanData()]);
$exporterStatusCode = $exporter->export([new SpanData()])->await();

$this->assertSame(SpanExporterInterface::STATUS_SUCCESS, $exporterStatusCode);
}
Expand All @@ -80,14 +80,14 @@ public function test_exporter_unexpected_grpc_response_status(): void
])
);

$exporterStatusCode = $exporter->export([new SpanData()]);
$exporterStatusCode = $exporter->export([new SpanData()])->await();

$this->assertSame(SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE, $exporterStatusCode);
}

public function test_exporter_grpc_responds_as_unavailable(): void
{
$this->assertEquals(SpanExporterInterface::STATUS_FAILED_RETRYABLE, (new Exporter())->export([new SpanData()]));
$this->assertEquals(SpanExporterInterface::STATUS_FAILED_RETRYABLE, (new Exporter())->export([new SpanData()])->await());
}

public function test_set_headers_with_environment_variables(): void
Expand Down
10 changes: 5 additions & 5 deletions tests/Unit/Contrib/OTLPHttpExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use Psr\Http\Client\NetworkExceptionInterface;

/**
* @covers OpenTelemetry\Contrib\OtlpHttp\Exporter
* @covers \OpenTelemetry\Contrib\OtlpHttp\Exporter
*/
class OTLPHttpExporterTest extends AbstractExporterTest
{
Expand Down Expand Up @@ -59,7 +59,7 @@ public function test_exporter_response_status($responseStatus, $expected): void

$this->assertEquals(
$expected,
$exporter->export([new SpanData()])
$exporter->export([new SpanData()])->await(),
);
}

Expand Down Expand Up @@ -90,7 +90,7 @@ public function test_client_exceptions_should_decide_return_code($exception, $ex

$this->assertEquals(
$expected,
$exporter->export([new SpanData()])
$exporter->export([new SpanData()])->await(),
);
}

Expand Down Expand Up @@ -129,7 +129,7 @@ public function test_exporter_with_config_via_env_vars(?string $endpoint, string
$client = new Client(['handler' => $stack]);
$exporter = new Exporter($client, new HttpFactory(), new HttpFactory());

$exporter->export([new SpanData()]);
$exporter->export([new SpanData()])->await();

$request = $container[0]['request'];

Expand Down Expand Up @@ -164,7 +164,7 @@ public function test_should_be_ok_to_exporter_empty_spans_collection(): void
$this->getClientInterfaceMock(),
$this->getRequestFactoryInterfaceMock(),
$this->getStreamFactoryInterfaceMock()
))->export([])
))->export([])->await(),
);
}

Expand Down
2 changes: 1 addition & 1 deletion tests/Unit/SDK/Trace/SpanExporter/AbstractExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ public function test_fails_if_not_test_running(): void
$span = $this->createMock(SpanData::class);
$exporter->shutdown();

$this->assertSame(SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE, $exporter->export([$span]));
$this->assertSame(SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE, $exporter->export([$span])->await());
}
}
8 changes: 4 additions & 4 deletions tests/Unit/SDK/Trace/SpanExporter/ConsoleSpanExporterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use OpenTelemetry\SDK\Trace\SpanExporterInterface;

/**
* @covers OpenTelemetry\SDK\Trace\SpanExporter\ConsoleSpanExporter
* @covers \OpenTelemetry\SDK\Trace\SpanExporter\ConsoleSpanExporter
*/
class ConsoleSpanExporterTest extends AbstractExporterTest
{
Expand Down Expand Up @@ -64,7 +64,7 @@ public function test_export_success(): void
SpanExporterInterface::STATUS_SUCCESS,
(new ConsoleSpanExporter($converter))->export([
$this->createMock(SpanDataInterface::class),
])
])->await(),
);

ob_end_clean();
Expand All @@ -84,7 +84,7 @@ public function test_export_failed(): void
SpanExporterInterface::STATUS_FAILED_NOT_RETRYABLE,
(new ConsoleSpanExporter($converter))->export([
$this->createMock(SpanDataInterface::class),
])
])->await(),
);

ob_end_clean();
Expand All @@ -108,7 +108,7 @@ public function test_export_output(): void

(new ConsoleSpanExporter($converter))->export([
$this->createMock(SpanDataInterface::class),
]);
])->await();
}

public function test_from_connection_string(): void
Expand Down
Loading

0 comments on commit d0a600f

Please sign in to comment.