Skip to content

Commit

Permalink
improved readability (#476)
Browse files Browse the repository at this point in the history
* improved readability

* formatted

* remove resource call

* remove with resource call

* condensed Span class

* added tests

* formatted

* formatted

* async fix

* formatted

* function signature

* comments

* merge

* inline

* spelling
  • Loading branch information
enmoed authored May 17, 2023
1 parent 0286495 commit fb0e671
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 207 deletions.
12 changes: 6 additions & 6 deletions azure-kusto-data/azure/kusto/data/_cloud_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing import SpanKind

from ._telemetry import KustoTracingAttributes, KustoTracing
from ._telemetry import Span, MonitoredActivity
from .exceptions import KustoServiceError

METADATA_ENDPOINT = "v1/rest/auth/metadata"
Expand Down Expand Up @@ -78,7 +78,7 @@ def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str,
kusto_uri = cls._normalize_uri(kusto_uri)

# tracing attributes for cloud info
KustoTracingAttributes.set_cloud_info_attributes(kusto_uri)
Span.set_cloud_info_attributes(kusto_uri)

if kusto_uri in cls._cloud_cache: # Double-checked locking to avoid unnecessary lock access
return cls._cloud_cache[kusto_uri]
Expand All @@ -89,10 +89,10 @@ def get_cloud_info_for_cluster(cls, kusto_uri: str, proxies: Optional[Dict[str,
url = urljoin(kusto_uri, METADATA_ENDPOINT)

# trace http get call for result
http_trace_attributes = KustoTracingAttributes.create_http_attributes(url=url, method="GET")

result = KustoTracing.call_func_tracing(
requests.get, url, proxies=proxies, allow_redirects=False, name_of_span="CloudSettings.http_get", tracing_attributes=http_trace_attributes
result = MonitoredActivity.invoke(
lambda: requests.get(url, proxies=proxies, allow_redirects=False),
name_of_span="CloudSettings.http_get",
tracing_attributes=Span.create_http_attributes(url=url, method="GET"),
)

if result.status_code == 200:
Expand Down
69 changes: 21 additions & 48 deletions azure-kusto-data/azure/kusto/data/_telemetry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Optional
from typing import Callable, Optional, TypeVar

from azure.core.settings import settings
from azure.core.tracing.decorator import distributed_trace
Expand All @@ -8,7 +8,7 @@
from .client_request_properties import ClientRequestProperties


class KustoTracingAttributes:
class Span:
"""
Additional ADX attributes for telemetry spans
"""
Expand Down Expand Up @@ -97,58 +97,31 @@ def create_cluster_attributes(cls, cluster_uri: str) -> dict:
return cluster_attributes


class KustoTracing:
@staticmethod
def call_func_tracing(func: Callable, *args, **kwargs):
"""
Prepares function for tracing and calls it
:param func: function to trace
:type func: Callable
:key str name_of_span: name of the trace span
:key dict tracing_attributes: key/value dictionary of attributes to include in span of trace
:key str kind: the type of span
:param kwargs: function arguments
"""
name_of_span: str = kwargs.pop("name_of_span", None)
tracing_attributes: dict = kwargs.pop("tracing_attributes", {})
kind: str = kwargs.pop("kind", SpanKind.CLIENT)
class MonitoredActivity:
"""
Invoker class for telemetry
"""

kusto_trace: Callable = distributed_trace(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind)
kusto_func: Callable = kusto_trace(func)
return kusto_func(*args, **kwargs)
T = TypeVar("T")

@staticmethod
async def call_func_tracing_async(func: Callable, *args, **kwargs):
def invoke(invoker: Callable[[], T], name_of_span: str = None, tracing_attributes=None, kind: str = SpanKind.INTERNAL) -> T:
"""
Prepares function for tracing and calls it
:param func: function to trace
:type func: Callable
:key str name_of_span: name of the trace span
:key dict tracing_attributes: key/value dictionary of attributes to include in span of trace
:key str kind: the type of span
:param kwargs: function arguments
Runs the span on given function
"""
name_of_span: str = kwargs.pop("name_of_span", None)
tracing_attributes: dict = kwargs.pop("tracing_attributes", {})
kind: str = kwargs.pop("kind", SpanKind.CLIENT)

kusto_trace: Callable = distributed_trace_async(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind)
kusto_func: Callable = kusto_trace(func)
return await kusto_func(*args, **kwargs)
if tracing_attributes is None:
tracing_attributes = {}
span_shell: Callable = distributed_trace(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind)
span = span_shell(invoker)
return span()

@staticmethod
def prepare_func_tracing(func: Callable, **kwargs):
async def invoke_async(invoker: Callable[[], T], name_of_span: str = None, tracing_attributes=None, kind: str = SpanKind.INTERNAL) -> T:
"""
Prepares function for tracing
:param func: function to trace
:type func: Callable
:key str name_of_span: name of the trace span
:key dict tracing_attributes: key/value dictionary of attributes to include in span of trace
:key str kind: the type of span
Runs a span on given function
"""
name_of_span: str = kwargs.pop("name_of_span", None)
tracing_attributes: dict = kwargs.pop("tracing_attributes", {})
kind: str = kwargs.pop("kind", SpanKind.CLIENT)

kusto_trace: Callable = distributed_trace(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind)
return kusto_trace(func)
if tracing_attributes is None:
tracing_attributes = {}
span_shell: Callable = distributed_trace_async(name_of_span=name_of_span, tracing_attributes=tracing_attributes, kind=kind)
span = span_shell(invoker)
return await span()
9 changes: 3 additions & 6 deletions azure-kusto-data/azure/kusto/data/_token_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from msal import ConfidentialClientApplication, PublicClientApplication

from ._cloud_settings import CloudInfo, CloudSettings
from ._telemetry import KustoTracing
from ._telemetry import MonitoredActivity
from .exceptions import KustoAioSyntaxError, KustoAsyncUsageError, KustoClientError

DeviceCallbackType = Callable[[str, str, datetime], None]
Expand Down Expand Up @@ -153,10 +153,7 @@ def _get_token():
token = self._get_token_from_cache_impl()
if token is None:
with self._lock:
token = KustoTracing.call_func_tracing(
self._get_token_impl, name_of_span=f"{self.name()}.get_token_impl", tracing_attributes=self.context()
)

token = MonitoredActivity.invoke(self._get_token_impl, name_of_span=f"{self.name()}.get_token_impl", tracing_attributes=self.context())
return self._valid_token_or_throw(token)

return _get_token()
Expand Down Expand Up @@ -190,7 +187,7 @@ async def _get_token_async():

if token is None:
async with self._async_lock:
token = await KustoTracing.call_func_tracing_async(
token = await MonitoredActivity.invoke_async(
self._get_token_impl_async, name_of_span=f"{self.name()}.get_token_impl_async", tracing_attributes=context
)

Expand Down
34 changes: 13 additions & 21 deletions azure-kusto-data/azure/kusto/data/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from .response import KustoStreamingResponseDataSet
from .._decorators import aio_documented_by, documented_by
from .._telemetry import KustoTracing, KustoTracingAttributes
from .._telemetry import MonitoredActivity, Span
from ..aio.streaming_response import JsonTokenReader, StreamingDataSetEnumerator
from ..client import KustoClient as KustoClientSync
from ..client_base import ExecuteRequestParams, _KustoClientBase
Expand Down Expand Up @@ -51,17 +51,17 @@ async def execute(self, database: Optional[str], query: str, properties: ClientR

@distributed_trace_async(name_of_span="KustoClient.query_cmd", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_query)
async def execute_query(self, database: Optional[str], query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet:
async def execute_query(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet:
database = self._get_database_or_default(database)
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
Span.set_query_attributes(self._kusto_cluster, database, properties)

return await self._execute(self._query_endpoint, database, query, None, KustoClient._query_default_timeout, properties)

@distributed_trace_async(name_of_span="KustoClient.control_cmd", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_mgmt)
async def execute_mgmt(self, database: Optional[str], query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet:
async def execute_mgmt(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet:
database = self._get_database_or_default(database)
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
Span.set_query_attributes(self._kusto_cluster, database, properties)

return await self._execute(self._mgmt_endpoint, database, query, None, KustoClient._mgmt_default_timeout, properties)

Expand All @@ -77,7 +77,7 @@ async def execute_streaming_ingest(
mapping_name: str = None,
):
database = self._get_database_or_default(database)
KustoTracingAttributes.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)

stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value
endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format
Expand Down Expand Up @@ -107,7 +107,7 @@ async def execute_streaming_query(
properties: Optional[ClientRequestProperties] = None,
) -> KustoStreamingResponseDataSet:
database = self._get_database_or_default(database)
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
Span.set_query_attributes(self._kusto_cluster, database, properties)

response = await self._execute_streaming_query_parsed(database, query, timeout, properties)
return KustoStreamingResponseDataSet(response)
Expand Down Expand Up @@ -144,18 +144,11 @@ async def _execute(
if self._aad_helper:
request_headers["Authorization"] = await self._aad_helper.acquire_authorization_header_async()

http_trace_attributes = KustoTracingAttributes.create_http_attributes(url=endpoint, method="POST", headers=request_headers)
response = await KustoTracing.call_func_tracing_async(
self._session.post,
endpoint,
headers=request_headers,
json=json_payload,
data=payload,
timeout=timeout.seconds,
proxy=self._proxy_url,
name_of_span="KustoClient.http_post",
tracing_attributes=http_trace_attributes,
allow_redirects=False,
invoker = lambda: self._session.post(
endpoint, headers=request_headers, json=json_payload, data=payload, timeout=timeout.seconds, proxy=self._proxy_url, allow_redirects=False
)
response = await MonitoredActivity.invoke_async(
invoker, name_of_span="KustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers)
)

if stream_response:
Expand Down Expand Up @@ -188,5 +181,4 @@ async def _execute(
except Exception:
response_text = None
raise self._handle_http_error(e, endpoint, payload, response, response.status, response_json, response_text)

return KustoTracing.call_func_tracing(self._kusto_parse_by_endpoint, endpoint, response_json, name_of_span="KustoClient.processing_response")
return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="KustoClient.processing_response")
21 changes: 10 additions & 11 deletions azure-kusto-data/azure/kusto/data/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing import SpanKind

from azure.kusto.data._telemetry import KustoTracingAttributes, KustoTracing
from azure.kusto.data._telemetry import Span, MonitoredActivity

from .client_base import ExecuteRequestParams, _KustoClientBase
from .client_request_properties import ClientRequestProperties
Expand Down Expand Up @@ -171,7 +171,7 @@ def execute_query(self, database: Optional[str], query: str, properties: Optiona
:rtype: azure.kusto.data.response.KustoResponseDataSet
"""
database = self._get_database_or_default(database)
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
Span.set_query_attributes(self._kusto_cluster, database, properties)

return self._execute(self._query_endpoint, database, query, None, self._query_default_timeout, properties)

Expand All @@ -187,7 +187,7 @@ def execute_mgmt(self, database: Optional[str], query: str, properties: Optional
:rtype: azure.kusto.data.response.KustoResponseDataSet
"""
database = self._get_database_or_default(database)
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
Span.set_query_attributes(self._kusto_cluster, database, properties)

return self._execute(self._mgmt_endpoint, database, query, None, self._mgmt_default_timeout, properties)

Expand All @@ -214,7 +214,7 @@ def execute_streaming_ingest(
:param str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro.
"""
database = self._get_database_or_default(database)
KustoTracingAttributes.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)

stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value
endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format
Expand Down Expand Up @@ -252,7 +252,7 @@ def execute_streaming_query(
:param azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
:return KustoStreamingResponseDataSet:
"""
KustoTracingAttributes.set_query_attributes(self._kusto_cluster, database, properties)
Span.set_query_attributes(self._kusto_cluster, database, properties)

return KustoStreamingResponseDataSet(self._execute_streaming_query_parsed(database, query, timeout, properties))

Expand Down Expand Up @@ -288,18 +288,17 @@ def _execute(
request_headers["Authorization"] = self._aad_helper.acquire_authorization_header()

# trace http post call for response
http_trace_attributes = KustoTracingAttributes.create_http_attributes(url=endpoint, method="POST", headers=request_headers)
response = KustoTracing.call_func_tracing(
self._session.post,
invoker = lambda: self._session.post(
endpoint,
headers=request_headers,
json=json_payload,
data=payload,
timeout=timeout.seconds,
stream=stream_response,
allow_redirects=False,
name_of_span="KustoClient.http_post",
tracing_attributes=http_trace_attributes,
)
response = MonitoredActivity.invoke(
invoker, name_of_span="KustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers)
)

if stream_response:
Expand All @@ -320,4 +319,4 @@ def _execute(
except Exception as e:
raise self._handle_http_error(e, endpoint, payload, response, response.status_code, response_json, response.text)
# trace response processing
return KustoTracing.call_func_tracing(self._kusto_parse_by_endpoint, endpoint, response_json, name_of_span="KustoClient.processing_response")
return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="KustoClient.processing_response")
2 changes: 1 addition & 1 deletion azure-kusto-data/tests/aio/test_async_token_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ async def test_app_key_provider(self):
@pytest.mark.asyncio
async def test_app_cert_provider(self):
# default details are for kusto-client-e2e-test-app
# to run the test download the certs from Azure Portal
# to invoke the test download the certs from Azure Portal
cert_app_id = os.environ.get("CERT_APP_ID", "b699d721-4f6f-4320-bc9a-88d578dfe68f")
cert_auth = os.environ.get("CERT_AUTH", "72f988bf-86f1-41af-91ab-2d7cd011db47")
thumbprint = os.environ.get("CERT_THUMBPRINT")
Expand Down
Loading

0 comments on commit fb0e671

Please sign in to comment.