memcpy approach fastest benchmark

sfc-gh-jopel committed Oct 18, 2024
1 parent 5d474c7 commit 265716f
Showing 13 changed files with 947 additions and 1,160 deletions.
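Note on the approach: the commit title refers to benchmarking how nested protobuf messages are embedded. Each generated message is now serialized to bytes as soon as it is built, and a parent embeds a child by writing a length prefix and then copying the child's already-encoded bytes into its own buffer, which is effectively a memcpy. A minimal sketch of that idea, with hypothetical names; the repository's actual ProtoSerializer internals are not shown in this diff:

# Illustrative sketch only; assumes non-negative varint values.
def encode_varint(value: int) -> bytes:
    out = bytearray()
    while True:
        bits = value & 0x7F
        value >>= 7
        if value:
            out.append(bits | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(bits)
            return bytes(out)

def write_len_field(buf: bytearray, key: bytes, payload: bytes) -> None:
    buf += key                          # precomputed field number + LEN wire type
    buf += encode_varint(len(payload))  # length prefix
    buf += payload                      # child bytes copied wholesale (the "memcpy")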
4 changes: 2 additions & 2 deletions compile/plugin.py
@@ -52,7 +52,7 @@ class ProtoTypeDescriptor:
FieldDescriptorProto.TYPE_DOUBLE: ProtoTypeDescriptor("double", WireType.I64, "float", "0.0"),
FieldDescriptorProto.TYPE_STRING: ProtoTypeDescriptor("string", WireType.LEN, "str", '""'),
FieldDescriptorProto.TYPE_BYTES: ProtoTypeDescriptor("bytes", WireType.LEN, "bytes", 'b""'),
- FieldDescriptorProto.TYPE_MESSAGE: ProtoTypeDescriptor("message", WireType.LEN, "MessageMarshaler", 'None'),
+ FieldDescriptorProto.TYPE_MESSAGE: ProtoTypeDescriptor("message", WireType.LEN, "bytes", 'None'),
}

@dataclass
@@ -179,7 +179,7 @@ def sort_key(field: Union[FieldTemplate, OneOfTemplate]):
if isinstance(field, FieldTemplate):
return field.number
return field.fields[0].number
- fields.sort(key=sort_key, reverse=True)
+ fields.sort(key=sort_key)

return MessageTemplate(
name=descriptor.name,
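The two plugin.py changes above work together: TYPE_MESSAGE fields are now typed as bytes in generated code, meaning a nested message is fully serialized before being handed to its parent, and fields are emitted in ascending field-number order instead of descending. A hypothetical snippet of what calling the generated code then looks like; the message and field names are invented for illustration:

# Hypothetical generated API: Status(...) returns the encoded bytes of the
# child message, which Span accepts as a plain bytes argument.
status_bytes = Status(code=1, message="OK")
span_bytes = Span(name="query", status=status_bytes)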
45 changes: 17 additions & 28 deletions compile/templates/template.py.jinja2
@@ -5,9 +5,8 @@
from snowflake.telemetry._internal.serialize import (
Enum,
ProtoSerializer,
- MessageMarshaler,
)
- from typing import List
+ from typing import List, Optional

{% for enum in file_template.enums %}
class {{ enum.name }}(Enum):
@@ -17,52 +16,42 @@ class {{ enum.name }}(Enum):
{% endfor %}

{% macro render_message(message) %}
- class {{ message.name }}(MessageMarshaler):
-     def __init__(
-         self,
-         {% for field in message.fields|reverse %}
-         {% if field.fields is defined %}
-         {% for oneof_field in field.fields|reverse %}
-         {{ oneof_field.name }}: {{ oneof_field.python_type }} = {{ oneof_field.default }},
-         {% endfor %}
-         {% else %}
-         {{ field.name }}: {{ field.python_type }} = {{ field.default }},
-         {% endif %}
-         {% endfor %}
-     ):
-         {%- for field in message.fields|reverse %}
+ def {{ message.name }}(
+     {%- for field in message.fields %}
{%- if field.fields is defined %}
-         {%- for oneof_field in field.fields|reverse %}
-         self.{{ oneof_field.name }} = {{ oneof_field.name }}
+     {%- for oneof_field in field.fields %}
+     {{ oneof_field.name }}: Optional[{{ oneof_field.python_type }}] = None,
{%- endfor %}
{%- else %}
-         self.{{ field.name }} = {{ field.name }}
+     {{ field.name }}: Optional[{{ field.python_type }}] = None,
{%- endif %}
{%- endfor %}
-     def write_to(self, proto_serializer: ProtoSerializer) -> None:
+ ) -> bytes:
+     proto_serializer = ProtoSerializer()
{%- for field in message.fields %}
{%- if field.fields is defined %}
-         # oneof group {{ field.name }}
+     # oneof group {{ field.name }}
{%- for oneof_field in field.fields %}
-         {% if loop.index != 1 %}el{% endif %}if self.{{ oneof_field.name }} is not None:
-             proto_serializer.serialize_{{ oneof_field.proto_type }}({{ oneof_field.tag }}, self.{{ oneof_field.name }})
+     {% if loop.index != 1 %}el{% endif %}if {{ oneof_field.name }} is not None:
+         proto_serializer.serialize_{{ oneof_field.proto_type }}({{ oneof_field.tag }}, {{ oneof_field.name }})
{%- endfor %}
{%- else %}
-         if self.{{ field.name }}: proto_serializer.serialize_{{ field.proto_type }}({{ field.tag }}, self.{{ field.name }})
+     if {{ field.name }}{% if field.proto_type == "message" %} is not None{% endif %}:
+         proto_serializer.serialize_{{ field.proto_type }}({{ field.tag }}, {{ field.name }})
{%- endif %}
{%- endfor %}
+     return proto_serializer.out

{% for nested_enum in message.enums %}
-     class {{ nested_enum.name }}(Enum):
+ class {{ nested_enum.name }}(Enum):
{%- for value in nested_enum.values %}
-         {{ value.name }} = {{ value.number }}
+     {{ value.name }} = {{ value.number }}
{%- endfor %}
{% endfor %}

{% for nested_message in message.messages %}
{%- set nested_message_result = render_message(nested_message) -%}
- {{ nested_message_result | indent(4) }}
+ {{ nested_message_result }}
{% endfor %}
{% endmacro %}

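To make the macro change concrete, here is roughly what it now renders for a hypothetical message with one plain string field and a two-member oneof; the message, field names, and precomputed tag bytes are all invented for illustration:

def NumberValue(
    name: Optional[str] = None,
    as_int: Optional[int] = None,
    as_double: Optional[float] = None,
) -> bytes:
    proto_serializer = ProtoSerializer()
    if name:
        proto_serializer.serialize_string(b"\x0a", name)  # field 1, LEN
    # oneof group value
    if as_int is not None:
        proto_serializer.serialize_int64(b"\x10", as_int)  # field 2, VARINT
    elif as_double is not None:
        proto_serializer.serialize_double(b"\x19", as_double)  # field 3, I64
    return proto_serializer.out

Each call returns finished bytes, so a message value can be embedded in a parent without any intermediate marshaler object.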
@@ -188,44 +188,37 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:

for scope_metrics in resource_metrics.scope_metrics:

- instrumentation_scope = scope_metrics.scope
-
- # The SDK groups metrics in instrumentation scopes already so
- # there is no need to check for existing instrumentation scopes
- # here.
- pb2_scope_metrics = pb2.ScopeMetrics(
-     scope=InstrumentationScope(
-         name=instrumentation_scope.name,
-         version=instrumentation_scope.version,
-     )
- )
-
- scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics
- pb2_scope_metrics.metrics = []
+ pb2_metrics = []
+ pb2_metric_gauge = None
+ pb2_metric_histogram = None
+ pb2_metric_sum = None
+ pb2_metric_exponential_histogram = None
for metric in scope_metrics.metrics:
- pb2_metric = pb2.Metric(
-     name=metric.name,
-     description=metric.description,
-     unit=metric.unit,
- )

if isinstance(metric.data, Gauge):
- pb2_metric.gauge = pb2.Gauge(data_points=[])
+ pb2_data_points = []
for data_point in metric.data.data_points:
+ as_int = None
+ as_double = None
+ if isinstance(data_point.value, int):
+     as_int = data_point.value
+ else:
+     as_double = data_point.value

pt = pb2.NumberDataPoint(
attributes=_encode_attributes(
data_point.attributes
),
time_unix_nano=data_point.time_unix_nano,
+ as_int=as_int,
+ as_double=as_double,
)
- if isinstance(data_point.value, int):
-     pt.as_int = data_point.value
- else:
-     pt.as_double = data_point.value
- pb2_metric.gauge.data_points.append(pt)
+ pb2_data_points.append(pt)

+ pb2_metric_gauge = pb2.Gauge(data_points=pb2_data_points)

elif isinstance(metric.data, HistogramType):
- pb2_metric.histogram = pb2.Histogram(data_points=[])
+ pb2_data_points = []
+ pb2_aggregation_temporality = None
for data_point in metric.data.data_points:
pt = pb2.HistogramDataPoint(
attributes=_encode_attributes(
@@ -242,14 +235,26 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
max=data_point.max,
min=data_point.min,
)
- pb2_metric.histogram.aggregation_temporality = (
+ pb2_aggregation_temporality = (
metric.data.aggregation_temporality
)
- pb2_metric.histogram.data_points.append(pt)
+ pb2_data_points.append(pt)
+ pb2_metric_histogram = pb2.Histogram(
+     data_points=pb2_data_points,
+     aggregation_temporality=pb2_aggregation_temporality
+ )

elif isinstance(metric.data, Sum):
- pb2_metric.sum = pb2.Sum(data_points=[])
+ pb2_data_points = []
+ pb2_is_monotonic = None
+ pb2_aggregation_temporality = None
for data_point in metric.data.data_points:
+ as_int = None
+ as_double = None
+ if isinstance(data_point.value, int):
+     as_int = data_point.value
+ else:
+     as_double = data_point.value
pt = pb2.NumberDataPoint(
attributes=_encode_attributes(
data_point.attributes
@@ -258,36 +263,38 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
data_point.start_time_unix_nano
),
time_unix_nano=data_point.time_unix_nano,
+ as_int=as_int,
+ as_double=as_double,
)
- if isinstance(data_point.value, int):
-     pt.as_int = data_point.value
- else:
-     pt.as_double = data_point.value
- # note that because sum is a message type, the
- # fields must be set individually rather than
- # instantiating a pb2.Sum and setting it once
- pb2_metric.sum.aggregation_temporality = (
+ pb2_aggregation_temporality = (
metric.data.aggregation_temporality
)
- pb2_metric.sum.is_monotonic = metric.data.is_monotonic
- pb2_metric.sum.data_points.append(pt)
+ pb2_is_monotonic = metric.data.is_monotonic
+ pb2_data_points.append(pt)
+ pb2_metric_sum = pb2.Sum(
+     data_points=pb2_data_points,
+     aggregation_temporality=pb2_aggregation_temporality,
+     is_monotonic=pb2_is_monotonic,
+ )

elif isinstance(metric.data, ExponentialHistogramType):
- pb2_metric.exponential_histogram = pb2.ExponentialHistogram(
-     data_points=[]
- )
+ pb2_data_points = []
+ pb2_aggregation_temporality = None
for data_point in metric.data.data_points:

if data_point.positive.bucket_counts:
- positive = pb2.ExponentialHistogramDataPoint.Buckets(
+ positive = pb2.Buckets(
offset=data_point.positive.offset,
bucket_counts=data_point.positive.bucket_counts,
)
else:
positive = None

if data_point.negative.bucket_counts:
- negative = pb2.ExponentialHistogramDataPoint.Buckets(
+ negative = pb2.Buckets(
offset=data_point.negative.offset,
bucket_counts=data_point.negative.bucket_counts,
)
@@ -312,10 +319,15 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
max=data_point.max,
min=data_point.min,
)
- pb2_metric.exponential_histogram.aggregation_temporality = (
+ pb2_aggregation_temporality = (
metric.data.aggregation_temporality
)
- pb2_metric.exponential_histogram.data_points.append(pt)
+ pb2_data_points.append(pt)

+ pb2_metric_exponential_histogram = pb2.ExponentialHistogram(
+     data_points=pb2_data_points,
+     aggregation_temporality=pb2_aggregation_temporality,
+ )

else:
_logger.warning(
@@ -324,7 +336,34 @@ def encode_metrics(data: MetricsData) -> ExportMetricsServiceRequest:
)
continue

- pb2_scope_metrics.metrics.append(pb2_metric)

+ pb2_metric = pb2.Metric(
+     name=metric.name,
+     description=metric.description,
+     unit=metric.unit,
+     gauge=pb2_metric_gauge,
+     histogram=pb2_metric_histogram,
+     sum=pb2_metric_sum,
+     exponential_histogram=pb2_metric_exponential_histogram,
+ )

+ pb2_metrics.append(pb2_metric)

+ instrumentation_scope = scope_metrics.scope

+ # The SDK groups metrics in instrumentation scopes already so
+ # there is no need to check for existing instrumentation scopes
+ # here.
+ pb2_scope_metrics = pb2.ScopeMetrics(
+     scope=InstrumentationScope(
+         name=instrumentation_scope.name,
+         version=instrumentation_scope.version,
+     ),
+     metrics=pb2_metrics,
+ )

+ scope_metrics_dict[instrumentation_scope] = pb2_scope_metrics

resource_data = []
for (
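The encoder restructuring above follows from the new generated API: a message is bytes once built and cannot be mutated afterwards, so data points are collected into plain lists and each parent message is constructed exactly once, bottom-up. The pattern, reduced to a runnable sketch with stand-in encoders; the real pb2.* functions return serialized bytes in the same way:

# Stand-ins: each "message" is a function returning bytes, as in the
# generated code. The encodings here are fake; only the construction
# order matters.
def NumberDataPoint(value: int) -> bytes:
    return bytes([value])

def Gauge(data_points: list) -> bytes:
    return b"".join(data_points)

def Metric(name: str, gauge: bytes) -> bytes:
    return name.encode("utf-8") + gauge

# Children first, parents last: there is no longer an equivalent of
# pb2_metric.gauge.data_points.append(pt).
pb2_data_points = [NumberDataPoint(v) for v in (1, 2, 3)]
pb2_metric = Metric(name="latency", gauge=Gauge(data_points=pb2_data_points))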
@@ -28,22 +28,25 @@
)
from snowflake.telemetry._internal.opentelemetry.proto.trace.v1.trace import (
ResourceSpans as PB2ResourceSpans,
+ ScopeSpans as PB2ScopeSpans,
+ Span as PB2Span,
+ SpanFlags as PB2SpanFlags,
+ Status as PB2Status,
+ SpanKind as PB2SpanKind,
+ Link as PB2SpanLink,
+ Event as PB2SpanEvent,
)
- from snowflake.telemetry._internal.opentelemetry.proto.trace.v1.trace import ScopeSpans as PB2ScopeSpans
- from snowflake.telemetry._internal.opentelemetry.proto.trace.v1.trace import Span as PB2SPan
- from snowflake.telemetry._internal.opentelemetry.proto.trace.v1.trace import SpanFlags as PB2SpanFlags
- from snowflake.telemetry._internal.opentelemetry.proto.trace.v1.trace import Status as PB2Status
from opentelemetry.sdk.trace import Event, ReadableSpan
from opentelemetry.trace import Link, SpanKind
from opentelemetry.trace.span import SpanContext, Status, TraceState

# pylint: disable=E1101
_SPAN_KIND_MAP = {
- SpanKind.INTERNAL: PB2SPan.SpanKind.SPAN_KIND_INTERNAL,
- SpanKind.SERVER: PB2SPan.SpanKind.SPAN_KIND_SERVER,
- SpanKind.CLIENT: PB2SPan.SpanKind.SPAN_KIND_CLIENT,
- SpanKind.PRODUCER: PB2SPan.SpanKind.SPAN_KIND_PRODUCER,
- SpanKind.CONSUMER: PB2SPan.SpanKind.SPAN_KIND_CONSUMER,
+ SpanKind.INTERNAL: PB2SpanKind.SPAN_KIND_INTERNAL,
+ SpanKind.SERVER: PB2SpanKind.SPAN_KIND_SERVER,
+ SpanKind.CLIENT: PB2SpanKind.SPAN_KIND_CLIENT,
+ SpanKind.PRODUCER: PB2SpanKind.SPAN_KIND_PRODUCER,
+ SpanKind.CONSUMER: PB2SpanKind.SPAN_KIND_CONSUMER,
}

_logger = logging.getLogger(__name__)
@@ -109,9 +112,9 @@ def _span_flags(parent_span_context: Optional[SpanContext]) -> int:
return flags


- def _encode_span(sdk_span: ReadableSpan) -> PB2SPan:
+ def _encode_span(sdk_span: ReadableSpan) -> PB2Span:
span_context = sdk_span.get_span_context()
- return PB2SPan(
+ return PB2Span(
trace_id=_encode_trace_id(span_context.trace_id),
span_id=_encode_span_id(span_context.span_id),
trace_state=_encode_trace_state(span_context.trace_state),
@@ -133,12 +136,12 @@ def _encode_span(sdk_span: ReadableSpan) -> PB2SPan:

def _encode_events(
events: Sequence[Event],
- ) -> Optional[List[PB2SPan.Event]]:
+ ) -> Optional[List[PB2SpanEvent]]:
pb2_events = None
if events:
pb2_events = []
for event in events:
- encoded_event = PB2SPan.Event(
+ encoded_event = PB2SpanEvent(
name=event.name,
time_unix_nano=event.timestamp,
attributes=_encode_attributes(event.attributes),
@@ -148,12 +151,12 @@ def _encode_events(
return pb2_events


- def _encode_links(links: Sequence[Link]) -> Sequence[PB2SPan.Link]:
+ def _encode_links(links: Sequence[Link]) -> Sequence[PB2SpanLink]:
pb2_links = None
if links:
pb2_links = []
for link in links:
- encoded_link = PB2SPan.Link(
+ encoded_link = PB2SpanLink(
trace_id=_encode_trace_id(link.context.trace_id),
span_id=_encode_span_id(link.context.span_id),
attributes=_encode_attributes(link.attributes),
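The renames in this file are a consequence of the same generated-code change: nested protobuf types such as Span.Event and Span.Link cannot hang off a class once messages are plain functions, so they are imported as flattened top-level names. Usage after the change, sketched with hypothetical argument values (assumes the package is installed):

from snowflake.telemetry._internal.opentelemetry.proto.trace.v1.trace import (
    Event as PB2SpanEvent,
)

# Event is now a top-level function returning bytes rather than a class
# nested under Span.
event_bytes = PB2SpanEvent(
    name="exception",
    time_unix_nano=1729000000000000000,  # hypothetical timestamp
)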
