Skip to content

Commit

Permalink
internalize protobuf serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-jopel committed Sep 6, 2024
1 parent 2736fad commit 5f36abd
Show file tree
Hide file tree
Showing 30 changed files with 2,782 additions and 0 deletions.
542 changes: 542 additions & 0 deletions src/betterproto/__init__.py

Large diffs are not rendered by default.

Empty file added src/grpclib/__init__.py
Empty file.
18 changes: 18 additions & 0 deletions src/opentelemetry/exporter/otlp/proto/common/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from opentelemetry.exporter.otlp.proto.common.version import __version__

__all__ = ["__version__"]
178 changes: 178 additions & 0 deletions src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging
from collections.abc import Sequence
from itertools import count
from typing import (
Any,
Mapping,
Optional,
List,
Callable,
TypeVar,
Dict,
Iterator,
)

from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.proto.common.v1.common_pb2 import (
InstrumentationScope as PB2InstrumentationScope,
)
from opentelemetry.proto.resource.v1.resource_pb2 import (
Resource as PB2Resource,
)
from opentelemetry.proto.common.v1.common_pb2 import AnyValue as PB2AnyValue
from opentelemetry.proto.common.v1.common_pb2 import KeyValue as PB2KeyValue
from opentelemetry.proto.common.v1.common_pb2 import (
KeyValueList as PB2KeyValueList,
)
from opentelemetry.proto.common.v1.common_pb2 import (
ArrayValue as PB2ArrayValue,
)
from opentelemetry.sdk.trace import Resource
from opentelemetry.util.types import Attributes

_logger = logging.getLogger(__name__)

_TypingResourceT = TypeVar("_TypingResourceT")
_ResourceDataT = TypeVar("_ResourceDataT")


def _encode_instrumentation_scope(
instrumentation_scope: InstrumentationScope,
) -> PB2InstrumentationScope:
if instrumentation_scope is None:
return PB2InstrumentationScope()
return PB2InstrumentationScope(
name=instrumentation_scope.name,
version=instrumentation_scope.version,
)


def _encode_resource(resource: Resource) -> PB2Resource:
return PB2Resource(attributes=_encode_attributes(resource.attributes))


def _encode_value(value: Any) -> PB2AnyValue:
if isinstance(value, bool):
return PB2AnyValue(bool_value=value)
if isinstance(value, str):
return PB2AnyValue(string_value=value)
if isinstance(value, int):
return PB2AnyValue(int_value=value)
if isinstance(value, float):
return PB2AnyValue(double_value=value)
if isinstance(value, bytes):
return PB2AnyValue(bytes_value=value)
if isinstance(value, Sequence):
return PB2AnyValue(
array_value=PB2ArrayValue(values=[_encode_value(v) for v in value])
)
elif isinstance(value, Mapping):
return PB2AnyValue(
kvlist_value=PB2KeyValueList(
values=[_encode_key_value(str(k), v) for k, v in value.items()]
)
)
raise Exception(f"Invalid type {type(value)} of value {value}")


def _encode_key_value(key: str, value: Any) -> PB2KeyValue:
return PB2KeyValue(key=key, value=_encode_value(value))


def _encode_span_id(span_id: int) -> bytes:
return span_id.to_bytes(length=8, byteorder="big", signed=False)


def _encode_trace_id(trace_id: int) -> bytes:
return trace_id.to_bytes(length=16, byteorder="big", signed=False)


def _encode_attributes(
attributes: Attributes,
) -> Optional[List[PB2KeyValue]]:
if attributes:
pb2_attributes = []
for key, value in attributes.items():
# pylint: disable=broad-exception-caught
try:
pb2_attributes.append(_encode_key_value(key, value))
except Exception as error:
_logger.exception("Failed to encode key %s: %s", key, error)
else:
pb2_attributes = None
return pb2_attributes


def _get_resource_data(
sdk_resource_scope_data: Dict[Resource, _ResourceDataT],
resource_class: Callable[..., _TypingResourceT],
name: str,
) -> List[_TypingResourceT]:
resource_data = []

for (
sdk_resource,
scope_data,
) in sdk_resource_scope_data.items():
collector_resource = PB2Resource(
attributes=_encode_attributes(sdk_resource.attributes)
)
resource_data.append(
resource_class(
**{
"resource": collector_resource,
"scope_{}".format(name): scope_data.values(),
}
)
)
return resource_data


def _create_exp_backoff_generator(max_value: int = 0) -> Iterator[int]:
"""
Generates an infinite sequence of exponential backoff values. The sequence starts
from 1 (2^0) and doubles each time (2^1, 2^2, 2^3, ...). If a max_value is specified
and non-zero, the generated values will not exceed this maximum, capping at max_value
instead of growing indefinitely.
Parameters:
- max_value (int, optional): The maximum value to yield. If 0 or not provided, the
sequence grows without bound.
Returns:
Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or
capped at max_value.
Example:
```
gen = _create_exp_backoff_generator(max_value=10)
for _ in range(5):
print(next(gen))
```
This will print:
1
2
4
8
10
Note: this functionality used to be handled by the 'backoff' package.
"""
for i in count(0):
out = 2**i
yield min(out, max_value) if max_value else out
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
from typing import Sequence, List

from opentelemetry.exporter.otlp.proto.common._internal import (
_encode_instrumentation_scope,
_encode_resource,
_encode_span_id,
_encode_trace_id,
_encode_value,
_encode_attributes,
)
from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import (
ExportLogsServiceRequest,
)
from opentelemetry.proto.logs.v1.logs_pb2 import (
ScopeLogs,
ResourceLogs,
)
from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord as PB2LogRecord

from opentelemetry.sdk._logs import LogData


def encode_logs(batch: Sequence[LogData]) -> ExportLogsServiceRequest:
return ExportLogsServiceRequest(resource_logs=_encode_resource_logs(batch))


def _encode_log(log_data: LogData) -> PB2LogRecord:
span_id = (
None
if log_data.log_record.span_id == 0
else _encode_span_id(log_data.log_record.span_id)
)
trace_id = (
None
if log_data.log_record.trace_id == 0
else _encode_trace_id(log_data.log_record.trace_id)
)
return PB2LogRecord(
time_unix_nano=log_data.log_record.timestamp,
observed_time_unix_nano=log_data.log_record.observed_timestamp,
span_id=span_id,
trace_id=trace_id,
flags=int(log_data.log_record.trace_flags),
body=_encode_value(log_data.log_record.body),
severity_text=log_data.log_record.severity_text,
attributes=_encode_attributes(log_data.log_record.attributes),
dropped_attributes_count=log_data.log_record.dropped_attributes,
severity_number=log_data.log_record.severity_number.value,
)


def _encode_resource_logs(batch: Sequence[LogData]) -> List[ResourceLogs]:
sdk_resource_logs = defaultdict(lambda: defaultdict(list))

for sdk_log in batch:
sdk_resource = sdk_log.log_record.resource
sdk_instrumentation = sdk_log.instrumentation_scope or None
pb2_log = _encode_log(sdk_log)

sdk_resource_logs[sdk_resource][sdk_instrumentation].append(pb2_log)

pb2_resource_logs = []

for sdk_resource, sdk_instrumentations in sdk_resource_logs.items():
scope_logs = []
for sdk_instrumentation, pb2_logs in sdk_instrumentations.items():
scope_logs.append(
ScopeLogs(
scope=(_encode_instrumentation_scope(sdk_instrumentation)),
log_records=pb2_logs,
)
)
pb2_resource_logs.append(
ResourceLogs(
resource=_encode_resource(sdk_resource),
scope_logs=scope_logs,
schema_url=sdk_resource.schema_url,
)
)

return pb2_resource_logs
Loading

0 comments on commit 5f36abd

Please sign in to comment.