From dbe4d6ae32fc39c2ab1526194381bdc8a4ebf16f Mon Sep 17 00:00:00 2001 From: Lars Michelsen Date: Sun, 14 Jul 2024 20:22:32 +0200 Subject: [PATCH] Use RetryingExporter in HTTP log exporter This unifies the implementation of the OTLP exporters and the HTTP log exporter. Next step is to consolidate the remaining HTTP exporters. Fixes #4043. --- .../otlp/proto/http/_log_exporter/__init__.py | 55 ++++++++----------- .../tests/test_proto_log_exporter.py | 10 ++-- 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 597b012a49a..d36588130d3 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -18,12 +18,12 @@ from io import BytesIO from os import environ from typing import Dict, Optional, Sequence -from time import sleep import requests -from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, + RetryingExporter, ) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.sdk.environment_variables import ( @@ -124,6 +124,7 @@ def __init__( {"Content-Encoding": self._compression.value} ) self._shutdown = False + self._exporter = RetryingExporter(self._export, LogExportResult) def _export(self, serialized_data: bytes): data = serialized_data @@ -135,7 +136,7 @@ def _export(self, serialized_data: bytes): elif self._compression == Compression.Deflate: data = zlib.compress(serialized_data) - return self._session.post( + resp = self._session.post( url=self._endpoint, data=data, verify=self._certificate_file, @@ -143,6 +144,23 @@ def _export(self, serialized_data: bytes): cert=self._client_cert, ) + if resp.ok: + return LogExportResult.SUCCESS + + if self._retryable(resp): + _logger.warning( + "Transient error %s encountered while exporting logs batch.", + resp.reason, + ) + raise RetryableExportError(None) + + _logger.error( + "Failed to export logs batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return LogExportResult.FAILURE + @staticmethod def _retryable(resp: requests.Response) -> bool: if resp.status_code == 408: @@ -159,34 +177,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult: return LogExportResult.FAILURE serialized_data = encode_logs(batch).SerializeToString() - - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - - if delay == self._MAX_RETRY_TIMEOUT: - return LogExportResult.FAILURE - - resp = self._export(serialized_data) - # pylint: disable=no-else-return - if resp.ok: - return LogExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting logs batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: - _logger.error( - "Failed to export logs batch code: %s, reason: %s", - resp.status_code, - resp.text, - ) - return LogExportResult.FAILURE - return LogExportResult.FAILURE + return self._exporter.export_with_retry(serialized_data) def force_flush(self, timeout_millis: float = 10_000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index f5606794620..d2e443580aa 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -270,7 +270,7 @@ def test_exported_log_without_span_id(self): self.fail("No log records found") @responses.activate - @patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep") + @patch("opentelemetry.exporter.otlp.proto.common.exporter.sleep") def test_exponential_backoff(self, mock_sleep): # return a retryable error responses.add( @@ -358,12 +358,14 @@ def _get_sdk_log_data() -> List[LogData]: return [log1, log2, log3, log4] - @patch.object(OTLPLogExporter, "_export", return_value=Mock(ok=True)) - def test_2xx_status_code(self, mock_otlp_metric_exporter): + def test_2xx_status_code(self): """ Test that any HTTP 2XX code returns a successful result """ self.assertEqual( - OTLPLogExporter().export(MagicMock()), LogExportResult.SUCCESS + OTLPLogExporter( + session=Mock(**{"post.return_value": Mock(ok=True)}) + ).export(MagicMock()), + LogExportResult.SUCCESS, )