diff --git a/azure-kusto-data/azure/kusto/data/_cloud_settings.py b/azure-kusto-data/azure/kusto/data/_cloud_settings.py index 6976111a..4c983e62 100644 --- a/azure-kusto-data/azure/kusto/data/_cloud_settings.py +++ b/azure-kusto-data/azure/kusto/data/_cloud_settings.py @@ -8,7 +8,7 @@ from azure.core.tracing.decorator import distributed_trace from azure.core.tracing import SpanKind -from ._telemetry import KustoTracingAttributes, KustoTracing +from ._telemetry import Span, MonitoredActivity from .exceptions import KustoServiceError METADATA_ENDPOINT = "v1/rest/auth/metadata" @@ -78,7 +78,7 @@ def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, kusto_uri = cls._normalize_uri(kusto_uri) # tracing attributes for cloud info - KustoTracingAttributes.set_cloud_info_attributes(kusto_uri) + Span.set_cloud_info_attributes(kusto_uri) if kusto_uri in cls._cloud_cache: # Double-checked locking to avoid unnecessary lock access return cls._cloud_cache[kusto_uri] @@ -89,10 +89,10 @@ def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str, url = urljoin(kusto_uri, METADATA_ENDPOINT) # trace http get call for result - http_trace_attributes = KustoTracingAttributes.create_http_attributes(url=url, method="GET") - - result = KustoTracing.call_func_tracing( - requests.get, url, proxies=proxies, allow_redirects=False, name_of_span="CloudSettings.http_get", tracing_attributes=http_trace_attributes + result = MonitoredActivity.invoke( + lambda: requests.get(url, proxies=proxies, allow_redirects=False), + name_of_span="CloudSettings.http_get", + tracing_attributes=Span.create_http_attributes(url=url, method="GET"), ) if result.status_code == 200: diff --git a/azure-kusto-data/azure/kusto/data/_telemetry.py b/azure-kusto-data/azure/kusto/data/_telemetry.py index 551a61db..70201c0e 100644 --- a/azure-kusto-data/azure/kusto/data/_telemetry.py +++ b/azure-kusto-data/azure/kusto/data/_telemetry.py @@ -1,4 +1,4 @@ -from typing import Callable, Optional +from typing import Callable, Optional, TypeVar from azure.core.settings import settings from azure.core.tracing.decorator import distributed_trace @@ -8,7 +8,7 @@ from .client_request_properties import ClientRequestProperties -class KustoTracingAttributes: +class Span: """ Additional ADX attributes for telemetry spans """ @@ -97,58 +97,31 @@ def create_cluster_attributes(cls, cluster_uri: str) -> dict: return cluster_attributes -class KustoTracing: - @staticmethod - def call_func_tracing(func: Callable, *args, **kwargs): - """ - Prepares function for tracing and calls it - :param func: function to trace - :type func: Callable - :key str name_of_span: name of the trace span - :key dict tracing_attributes: key/value dictionary of attributes to include in span of trace - :key str kind: the type of span - :param kwargs: function arguments - """ - name_of_span: str = kwargs.pop("name_of_span", None) - tracing_attributes: dict = kwargs.pop("tracing_attributes", {}) - kind: str = kwargs.pop("kind", SpanKind.CLIENT) +class MonitoredActivity: + """ + Invoker class for telemetry + """ - kusto_trace: Callable = distributed_trace(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind) - kusto_func: Callable = kusto_trace(func) - return kusto_func(*args, **kwargs) + T = TypeVar("T") @staticmethod - async def call_func_tracing_async(func: Callable, *args, **kwargs): + def invoke(invoker: Callable[[], T], name_of_span: str = None, tracing_attributes=None, kind: str = SpanKind.INTERNAL) -> T: """ - Prepares function for tracing and calls it - :param func: function to trace - :type func: Callable - :key str name_of_span: name of the trace span - :key dict tracing_attributes: key/value dictionary of attributes to include in span of trace - :key str kind: the type of span - :param kwargs: function arguments + Runs the span on given function """ - name_of_span: str = kwargs.pop("name_of_span", None) - tracing_attributes: dict = kwargs.pop("tracing_attributes", {}) - kind: str = kwargs.pop("kind", SpanKind.CLIENT) - - kusto_trace: Callable = distributed_trace_async(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind) - kusto_func: Callable = kusto_trace(func) - return await kusto_func(*args, **kwargs) + if tracing_attributes is None: + tracing_attributes = {} + span_shell: Callable = distributed_trace(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind) + span = span_shell(invoker) + return span() @staticmethod - def prepare_func_tracing(func: Callable, **kwargs): + async def invoke_async(invoker: Callable[[], T], name_of_span: str = None, tracing_attributes=None, kind: str = SpanKind.INTERNAL) -> T: """ - Prepares function for tracing - :param func: function to trace - :type func: Callable - :key str name_of_span: name of the trace span - :key dict tracing_attributes: key/value dictionary of attributes to include in span of trace - :key str kind: the type of span + Runs a span on given function """ - name_of_span: str = kwargs.pop("name_of_span", None) - tracing_attributes: dict = kwargs.pop("tracing_attributes", {}) - kind: str = kwargs.pop("kind", SpanKind.CLIENT) - - kusto_trace: Callable = distributed_trace(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind) - return kusto_trace(func) + if tracing_attributes is None: + tracing_attributes = {} + span_shell: Callable = distributed_trace_async(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind) + span = span_shell(invoker) + return await span() diff --git a/azure-kusto-data/azure/kusto/data/_token_providers.py b/azure-kusto-data/azure/kusto/data/_token_providers.py index f342dcff..687dca47 100644 --- a/azure-kusto-data/azure/kusto/data/_token_providers.py +++ b/azure-kusto-data/azure/kusto/data/_token_providers.py @@ -16,7 +16,7 @@ from msal import ConfidentialClientApplication, PublicClientApplication from ._cloud_settings import CloudInfo, CloudSettings -from ._telemetry import KustoTracing +from ._telemetry import MonitoredActivity from .exceptions import KustoAioSyntaxError, KustoAsyncUsageError, KustoClientError DeviceCallbackType = Callable[[str, str, datetime], None] @@ -153,10 +153,7 @@ def _get_token(): token = self._get_token_from_cache_impl() if token is None: with self._lock: - token = KustoTracing.call_func_tracing( - self._get_token_impl, name_of_span=f"{self.name()}.get_token_impl", tracing_attributes=self.context() - ) - + token = MonitoredActivity.invoke(self._get_token_impl, name_of_span=f"{self.name()}.get_token_impl", tracing_attributes=self.context()) return self._valid_token_or_throw(token) return _get_token() @@ -190,7 +187,7 @@ async def _get_token_async(): if token is None: async with self._async_lock: - token = await KustoTracing.call_func_tracing_async( + token = await MonitoredActivity.invoke_async( self._get_token_impl_async, name_of_span=f"{self.name()}.get_token_impl_async", tracing_attributes=context ) diff --git a/azure-kusto-data/azure/kusto/data/aio/client.py b/azure-kusto-data/azure/kusto/data/aio/client.py index 7bed051c..3f0eebd0 100644 --- a/azure-kusto-data/azure/kusto/data/aio/client.py +++ b/azure-kusto-data/azure/kusto/data/aio/client.py @@ -7,7 +7,7 @@ from .response import KustoStreamingResponseDataSet from .._decorators import aio_documented_by, documented_by -from .._telemetry import KustoTracing, KustoTracingAttributes +from .._telemetry import MonitoredActivity, Span from ..aio.streaming_response import JsonTokenReader, StreamingDataSetEnumerator from ..client import KustoClient as KustoClientSync from ..client_base import ExecuteRequestParams, _KustoClientBase @@ -51,17 +51,17 @@ async def execute(self, database: Optional[str], query: str, properties: ClientR @distributed_trace_async(name_of_span="KustoClient.query_cmd", kind=SpanKind.CLIENT) @aio_documented_by(KustoClientSync.execute_query) - async def execute_query(self, database: Optional[str], query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet: + async def execute_query(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet: database = self._get_database_or_default(database) - KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties) + Span.set_query_attributes(self._kusto_cluster, database, properties) return await self._execute(self._query_endpoint, database, query, None, KustoClient._query_default_timeout, properties) @distributed_trace_async(name_of_span="KustoClient.control_cmd", kind=SpanKind.CLIENT) @aio_documented_by(KustoClientSync.execute_mgmt) - async def execute_mgmt(self, database: Optional[str], query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet: + async def execute_mgmt(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet: database = self._get_database_or_default(database) - KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties) + Span.set_query_attributes(self._kusto_cluster, database, properties) return await self._execute(self._mgmt_endpoint, database, query, None, KustoClient._mgmt_default_timeout, properties) @@ -77,7 +77,7 @@ async def execute_streaming_ingest( mapping_name: str = None, ): database = self._get_database_or_default(database) - KustoTracingAttributes.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties) + Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties) stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format @@ -107,7 +107,7 @@ async def execute_streaming_query( properties: Optional[ClientRequestProperties] = None, ) -> KustoStreamingResponseDataSet: database = self._get_database_or_default(database) - KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties) + Span.set_query_attributes(self._kusto_cluster, database, properties) response = await self._execute_streaming_query_parsed(database, query, timeout, properties) return KustoStreamingResponseDataSet(response) @@ -144,18 +144,11 @@ async def _execute( if self._aad_helper: request_headers["Authorization"] = await self._aad_helper.acquire_authorization_header_async() - http_trace_attributes = KustoTracingAttributes.create_http_attributes(url=endpoint, method="POST", headers=request_headers) - response = await KustoTracing.call_func_tracing_async( - self._session.post, - endpoint, - headers=request_headers, - json=json_payload, - data=payload, - timeout=timeout.seconds, - proxy=self._proxy_url, - name_of_span="KustoClient.http_post", - tracing_attributes=http_trace_attributes, - allow_redirects=False, + invoker = lambda: self._session.post( + endpoint, headers=request_headers, json=json_payload, data=payload, timeout=timeout.seconds, proxy=self._proxy_url, allow_redirects=False + ) + response = await MonitoredActivity.invoke_async( + invoker, name_of_span="KustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers) ) if stream_response: @@ -188,5 +181,4 @@ async def _execute( except Exception: response_text = None raise self._handle_http_error(e, endpoint, payload, response, response.status, response_json, response_text) - - return KustoTracing.call_func_tracing(self._kusto_parse_by_endpoint, endpoint, response_json, name_of_span="KustoClient.processing_response") + return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="KustoClient.processing_response") diff --git a/azure-kusto-data/azure/kusto/data/client.py b/azure-kusto-data/azure/kusto/data/client.py index 6b476136..d7395d79 100644 --- a/azure-kusto-data/azure/kusto/data/client.py +++ b/azure-kusto-data/azure/kusto/data/client.py @@ -13,7 +13,7 @@ from azure.core.tracing.decorator import distributed_trace from azure.core.tracing import SpanKind -from azure.kusto.data._telemetry import KustoTracingAttributes, KustoTracing +from azure.kusto.data._telemetry import Span, MonitoredActivity from .client_base import ExecuteRequestParams, _KustoClientBase from .client_request_properties import ClientRequestProperties @@ -171,7 +171,7 @@ def execute_query(self, database: Optional[str], query: str, properties: Optiona :rtype: azure.kusto.data.response.KustoResponseDataSet """ database = self._get_database_or_default(database) - KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties) + Span.set_query_attributes(self._kusto_cluster, database, properties) return self._execute(self._query_endpoint, database, query, None, self._query_default_timeout, properties) @@ -187,7 +187,7 @@ def execute_mgmt(self, database: Optional[str], query: str, properties: Optional :rtype: azure.kusto.data.response.KustoResponseDataSet """ database = self._get_database_or_default(database) - KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties) + Span.set_query_attributes(self._kusto_cluster, database, properties) return self._execute(self._mgmt_endpoint, database, query, None, self._mgmt_default_timeout, properties) @@ -214,7 +214,7 @@ def execute_streaming_ingest( :param str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro. """ database = self._get_database_or_default(database) - KustoTracingAttributes.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties) + Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties) stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format @@ -252,7 +252,7 @@ def execute_streaming_query( :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties. :return KustoStreamingResponseDataSet: """ - KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties) + Span.set_query_attributes(self._kusto_cluster, database, properties) return KustoStreamingResponseDataSet(self._execute_streaming_query_parsed(database, query, timeout, properties)) @@ -288,9 +288,7 @@ def _execute( request_headers["Authorization"] = self._aad_helper.acquire_authorization_header() # trace http post call for response - http_trace_attributes = KustoTracingAttributes.create_http_attributes(url=endpoint, method="POST", headers=request_headers) - response = KustoTracing.call_func_tracing( - self._session.post, + invoker = lambda: self._session.post( endpoint, headers=request_headers, json=json_payload, @@ -298,8 +296,9 @@ def _execute( timeout=timeout.seconds, stream=stream_response, allow_redirects=False, - name_of_span="KustoClient.http_post", - tracing_attributes=http_trace_attributes, + ) + response = MonitoredActivity.invoke( + invoker, name_of_span="KustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers) ) if stream_response: @@ -320,4 +319,4 @@ def _execute( except Exception as e: raise self._handle_http_error(e, endpoint, payload, response, response.status_code, response_json, response.text) # trace response processing - return KustoTracing.call_func_tracing(self._kusto_parse_by_endpoint, endpoint, response_json, name_of_span="KustoClient.processing_response") + return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="KustoClient.processing_response") diff --git a/azure-kusto-data/tests/aio/test_async_token_providers.py b/azure-kusto-data/tests/aio/test_async_token_providers.py index c9e39021..03ec6b46 100644 --- a/azure-kusto-data/tests/aio/test_async_token_providers.py +++ b/azure-kusto-data/tests/aio/test_async_token_providers.py @@ -247,7 +247,7 @@ async def test_app_key_provider(self): @pytest.mark.asyncio async def test_app_cert_provider(self): # default details are for kusto-client-e2e-test-app - # to run the test download the certs from Azure Portal + # to invoke the test download the certs from Azure Portal cert_app_id = os.environ.get("CERT_APP_ID", "b699d721-4f6f-4320-bc9a-88d578dfe68f") cert_auth = os.environ.get("CERT_AUTH", "72f988bf-86f1-41af-91ab-2d7cd011db47") thumbprint = os.environ.get("CERT_THUMBPRINT") diff --git a/azure-kusto-data/tests/test_telemetry.py b/azure-kusto-data/tests/test_telemetry.py index c42a2715..f00d4135 100644 --- a/azure-kusto-data/tests/test_telemetry.py +++ b/azure-kusto-data/tests/test_telemetry.py @@ -1,89 +1,118 @@ import pytest -from azure.kusto.data._telemetry import KustoTracing, KustoTracingAttributes +from azure.kusto.data._telemetry import MonitoredActivity, Span from azure.kusto.data.client_request_properties import ClientRequestProperties -class TestTelemetry: - """ - Tests for telemetry class to make sure adding tracing doesn't impact functionality of original code - """ - - @staticmethod - def plus_one(num): - KustoTracingAttributes.add_attributes(tracing_attributes={"foo": "bar"}) - return num + 1 - - @staticmethod - async def plus_one_async(num): - KustoTracingAttributes.add_attributes(tracing_attributes={"foo": "bar"}) - return num + 1 - - @staticmethod - def test_call_func_tracing(): - res = KustoTracing.call_func_tracing(TestTelemetry.plus_one, 1, name_of_span="plus_one") - assert res == 2 - - @staticmethod - def test_prepare_func_tracing(): - res = KustoTracing.prepare_func_tracing(TestTelemetry.plus_one, name_of_span="plus_one") - assert res(1) == 2 - - @staticmethod - @pytest.mark.asyncio - async def test_call_func_tracing_async(): - res = KustoTracing.call_func_tracing_async(TestTelemetry.plus_one_async, 1, name_of_span="plus_one") - assert await res == 2 - - @staticmethod - def test_get_client_request_properties_attributes(): - attributes = ClientRequestProperties().get_tracing_attributes() - keynames = {"client_request_id"} - assert isinstance(attributes, dict) - for key, val in attributes.items(): - assert key in keynames - assert isinstance(val, str) - for key in keynames: - assert key in attributes.keys() - - @staticmethod - def test_create_query_attributes(): - attributes = KustoTracingAttributes.create_query_attributes("cluster_test", "database_test", ClientRequestProperties()) - keynames = {"kusto_cluster", "database", "client_request_id"} - assert isinstance(attributes, dict) - for key, val in attributes.items(): - assert isinstance(val, str) - for key in keynames: - assert key in attributes.keys() - attributes = KustoTracingAttributes.create_query_attributes("cluster_test", "database_test") - keynames = {"kusto_cluster", "database"} - assert isinstance(attributes, dict) - for key, val in attributes.items(): - assert isinstance(val, str) - for key in keynames: - assert key in attributes.keys() - - @staticmethod - def test_create_ingest_attributes(): - attributes = KustoTracingAttributes.create_streaming_ingest_attributes("cluster_test", "database_test", "table", ClientRequestProperties()) - keynames = {"kusto_cluster", "database", "table", "client_request_id"} - assert isinstance(attributes, dict) - for key, val in attributes.items(): - assert isinstance(val, str) - for key in keynames: - assert key in attributes.keys() - attributes = KustoTracingAttributes.create_streaming_ingest_attributes("cluster_test", "database_test", "table") - keynames = {"kusto_cluster", "database", "table"} - assert isinstance(attributes, dict) - for key, val in attributes.items(): - assert isinstance(val, str) - for key in keynames: - assert key in attributes.keys() - - @staticmethod - def test_create_http_attributes(): - attributes = KustoTracingAttributes.create_http_attributes("method_test", "url_test") - assert attributes == {"component": "http", "http.method": "method_test", "http.url": "url_test"} - headers = {"User-Agent": "user_agent_test"} - attributes = KustoTracingAttributes.create_http_attributes("method_test", "url_test", headers) - assert attributes == {"component": "http", "http.method": "method_test", "http.url": "url_test", "http.user_agent": "user_agent_test"} +def test_run_none_invoker(): + # Edge case test for invoke method with None invoker function + with pytest.raises(TypeError): + MonitoredActivity.invoke(None, "test_span") + + +@pytest.mark.asyncio +async def test_run_async_valid_invoker(): + # Happy path test for invoke_async method with valid invoker function and name of span + async def invoker(): + return "Hello World" + + span = await MonitoredActivity.invoke_async(invoker, "test_span") + assert span == "Hello World" + + +def test_run_valid_invoker(): + # Happy path test for invoke method with valid invoker function and name of span + + def invoker(): + return "Hello World" + + span = MonitoredActivity.invoke(invoker, "test_span") + assert span == "Hello World" + + +@pytest.mark.asyncio +async def test_run_async_none_invoker(): + # Edge case test for invoke_async method with None invoker function + with pytest.raises(TypeError): + await MonitoredActivity.invoke_async(None, "test_span") + + +def test_run_sync_behavior(): + # General behavior test for invoke method running the span synchronously + def invoker(): + return "Hello World" + + span = MonitoredActivity.invoke(invoker, "test_span") + assert span == "Hello World" + + +@pytest.mark.asyncio +async def test_run_async_behavior(): + # General behavior test for invoke_async method running the span asynchronously + async def invoker(): + return "Hello World" + + span = await MonitoredActivity.invoke_async(invoker, "test_span") + assert span == "Hello World" + + +def test_tracing_attributes_parameter(): + def invoker(): + return "Hello World" + + tracing_attributes = {"key": "value"} + result = MonitoredActivity.invoke(invoker, tracing_attributes=tracing_attributes) + assert result == "Hello World" + + +def test_get_client_request_properties_attributes(): + attributes = ClientRequestProperties().get_tracing_attributes() + keynames = {"client_request_id"} + assert isinstance(attributes, dict) + for key, val in attributes.items(): + assert key in keynames + assert isinstance(val, str) + for key in keynames: + assert key in attributes.keys() + + +def test_create_query_attributes(): + attributes = Span.create_query_attributes("cluster_test", "database_test", ClientRequestProperties()) + keynames = {"kusto_cluster", "database", "client_request_id"} + assert isinstance(attributes, dict) + for key, val in attributes.items(): + assert isinstance(val, str) + for key in keynames: + assert key in attributes.keys() + attributes = Span.create_query_attributes("cluster_test", "database_test") + keynames = {"kusto_cluster", "database"} + assert isinstance(attributes, dict) + for key, val in attributes.items(): + assert isinstance(val, str) + for key in keynames: + assert key in attributes.keys() + + +def test_create_ingest_attributes(): + attributes = Span.create_streaming_ingest_attributes("cluster_test", "database_test", "table", ClientRequestProperties()) + keynames = {"kusto_cluster", "database", "table", "client_request_id"} + assert isinstance(attributes, dict) + for key, val in attributes.items(): + assert isinstance(val, str) + for key in keynames: + assert key in attributes.keys() + attributes = Span.create_streaming_ingest_attributes("cluster_test", "database_test", "table") + keynames = {"kusto_cluster", "database", "table"} + assert isinstance(attributes, dict) + for key, val in attributes.items(): + assert isinstance(val, str) + for key in keynames: + assert key in attributes.keys() + + +def test_create_http_attributes(): + attributes = Span.create_http_attributes("method_test", "url_test") + assert attributes == {"component": "http", "http.method": "method_test", "http.url": "url_test"} + headers = {"User-Agent": "user_agent_test"} + attributes = Span.create_http_attributes("method_test", "url_test", headers) + assert attributes == {"component": "http", "http.method": "method_test", "http.url": "url_test", "http.user_agent": "user_agent_test"} diff --git a/azure-kusto-ingest/azure/kusto/ingest/_ingest_telemetry.py b/azure-kusto-ingest/azure/kusto/ingest/_ingest_telemetry.py index f3514aa4..93e537e7 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/_ingest_telemetry.py +++ b/azure-kusto-ingest/azure/kusto/ingest/_ingest_telemetry.py @@ -1,6 +1,6 @@ import uuid -from azure.kusto.data._telemetry import KustoTracingAttributes +from azure.kusto.data._telemetry import Span from .descriptors import DescriptorBase from .ingestion_properties import IngestionProperties @@ -16,7 +16,7 @@ class IngestTracingAttributes: @classmethod def set_ingest_descriptor_attributes(cls, descriptor: DescriptorBase, ingestion_properties: IngestionProperties) -> None: - KustoTracingAttributes.add_attributes(tracing_attributes={**ingestion_properties.get_tracing_attributes(), **descriptor.get_tracing_attributes()}) + Span.add_attributes(tracing_attributes={**ingestion_properties.get_tracing_attributes(), **descriptor.get_tracing_attributes()}) @classmethod def create_enqueue_request_attributes(cls, queue_name: str, source_id: uuid.UUID) -> dict: diff --git a/azure-kusto-ingest/azure/kusto/ingest/_resource_manager.py b/azure-kusto-ingest/azure/kusto/ingest/_resource_manager.py index 5f8a7653..b9dc5878 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/_resource_manager.py +++ b/azure-kusto-ingest/azure/kusto/ingest/_resource_manager.py @@ -8,7 +8,7 @@ from azure.kusto.data import KustoClient from azure.kusto.data._models import KustoResultTable -from azure.kusto.data._telemetry import KustoTracing, KustoTracingAttributes +from azure.kusto.data._telemetry import MonitoredActivity, Span from azure.kusto.data.exceptions import KustoThrottlingError _SHOW_VERSION = ".show version" @@ -93,12 +93,14 @@ def _get_resource_by_name(self, table: KustoResultTable, resource_name: str): def _get_ingest_client_resources_from_service(self): # trace all calls to get ingestion resources - trace_get_ingestion_resources = KustoTracing.prepare_func_tracing( - self._kusto_client.execute, - name_of_span="_ResourceManager.get_ingestion_resources", - tracing_attributes=KustoTracingAttributes.create_cluster_attributes(self._kusto_client._kusto_cluster), - ) - result = self._retryer(trace_get_ingestion_resources, "NetDefaultDB", ".get ingestion resources") + def invoker(): + return MonitoredActivity.invoke( + lambda: self._kusto_client.execute("NetDefaultDB", ".get ingestion resources"), + name_of_span="_ResourceManager.get_ingestion_resources", + tracing_attributes=Span.create_cluster_attributes(self._kusto_client._kusto_cluster), + ) + + result = self._retryer(invoker) table = result.primary_results[0] secured_ready_for_aggregation_queues = self._get_resource_by_name(table, "SecuredReadyForAggregationQueue") @@ -120,12 +122,14 @@ def _refresh_authorization_context(self): def _get_authorization_context_from_service(self): # trace all calls to get identity token - trace_get_identity_token = KustoTracing.prepare_func_tracing( - self._kusto_client.execute, - name_of_span="_ResourceManager.get_identity_token", - tracing_attributes=KustoTracingAttributes.create_cluster_attributes(self._kusto_client._kusto_cluster), - ) - result = self._retryer(trace_get_identity_token, "NetDefaultDB", ".get kusto identity token") + def invoker(): + return MonitoredActivity.invoke( + lambda: self._kusto_client.execute("NetDefaultDB", ".get kusto identity token"), + name_of_span="_ResourceManager.get_identity_token", + tracing_attributes=Span.create_cluster_attributes(self._kusto_client._kusto_cluster), + ) + + result = self._retryer(invoker) return result.primary_results[0][0]["AuthorizationContext"] def get_ingestion_queues(self) -> List[_ResourceUri]: diff --git a/azure-kusto-ingest/azure/kusto/ingest/ingest_client.py b/azure-kusto-ingest/azure/kusto/ingest/ingest_client.py index 3b2b17e6..bcc8b031 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/ingest_client.py +++ b/azure-kusto-ingest/azure/kusto/ingest/ingest_client.py @@ -9,7 +9,7 @@ from azure.storage.queue import QueueServiceClient, TextBase64EncodePolicy from azure.kusto.data import KustoClient, KustoConnectionStringBuilder -from azure.kusto.data._telemetry import KustoTracing +from azure.kusto.data._telemetry import MonitoredActivity from azure.kusto.data.exceptions import KustoClosedError, KustoServiceError from ._ingest_telemetry import IngestTracingAttributes @@ -132,14 +132,9 @@ def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties ingestion_blob_info_json = ingestion_blob_info.to_json() with queue_service.get_queue_client(queue=random_queue.object_name, message_encode_policy=TextBase64EncodePolicy()) as queue_client: # trace enqueuing of blob for ingestion + invoker = lambda: queue_client.send_message(content=ingestion_blob_info_json, timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS) enqueue_trace_attributes = IngestTracingAttributes.create_enqueue_request_attributes(queue_client.queue_name, blob_descriptor.source_id) - KustoTracing.call_func_tracing( - queue_client.send_message, - content=ingestion_blob_info_json, - timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS, - name_of_span="QueuedIngestClient.enqueue_request", - tracing_attributes=enqueue_trace_attributes, - ) + MonitoredActivity.invoke(invoker, name_of_span="QueuedIngestClient.enqueue_request", tracing_attributes=enqueue_trace_attributes) return IngestionResult( IngestionStatus.QUEUED, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id, blob_descriptor.path diff --git a/azure-kusto-ingest/azure/kusto/ingest/managed_streaming_ingest_client.py b/azure-kusto-ingest/azure/kusto/ingest/managed_streaming_ingest_client.py index 897ae43e..37a99546 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/managed_streaming_ingest_client.py +++ b/azure-kusto-ingest/azure/kusto/ingest/managed_streaming_ingest_client.py @@ -9,7 +9,7 @@ from azure.kusto.data import KustoConnectionStringBuilder from azure.kusto.data.exceptions import KustoApiError, KustoClosedError -from azure.kusto.data._telemetry import KustoTracing +from azure.kusto.data._telemetry import MonitoredActivity from . import BlobDescriptor, FileDescriptor, IngestionProperties, StreamDescriptor @@ -123,13 +123,11 @@ def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnySt stream.seek(0, SEEK_SET) client_request_id = ManagedStreamingIngestClient._get_request_id(stream_descriptor.source_id, attempt.retry_state.attempt_number - 1) # trace attempt to ingest from stream - return KustoTracing.call_func_tracing( - self.streaming_client._ingest_from_stream_with_client_request_id, - stream_descriptor, - ingestion_properties, - client_request_id, - name_of_span="ManagedStreamingIngestClient.ingest_from_stream_attempt", + invoker = lambda: self.streaming_client._ingest_from_stream_with_client_request_id( + stream_descriptor, ingestion_properties, client_request_id ) + return MonitoredActivity.invoke(invoker, name_of_span="ManagedStreamingIngestClient.ingest_from_stream_attempt") + except KustoApiError as ex: error = ex.get_api_error() if error.permanent: