Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dsm): implement protobuf schema tracking #10587

Merged
merged 55 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
ac0e4c2
initial commit
wconti27 Aug 29, 2024
982d502
schemas
wconti27 Aug 29, 2024
12be958
fix method signature
wconti27 Aug 29, 2024
7509d11
lint
wconti27 Aug 29, 2024
57b625a
iniital integration
wconti27 Aug 30, 2024
a4da27e
avro integration
wconti27 Aug 30, 2024
4e5b5d7
add tests
wconti27 Aug 30, 2024
f6a5f17
fix lint
wconti27 Aug 30, 2024
e724425
tests
wconti27 Aug 30, 2024
2dd8761
use active span instead of making new span for avro
wconti27 Sep 3, 2024
0389b45
fix avro reqs files
wconti27 Sep 3, 2024
36245cb
Merge branch 'main' into conti/implement-avro-schemas
wconti27 Sep 3, 2024
5254afc
Merge branch 'main' into conti/implement-dsm-schema-support
wconti27 Sep 3, 2024
d20c7f0
Merge branch 'main' into conti/implement-avro-schemas
wconti27 Sep 3, 2024
084a7da
Merge branch 'conti/implement-dsm-schema-support' into conti/implemen…
wconti27 Sep 3, 2024
0ad0dd9
initial commit
wconti27 Aug 29, 2024
f48cde3
schemas
wconti27 Aug 29, 2024
80052df
fix method signature
wconti27 Aug 29, 2024
ad5c015
lint
wconti27 Aug 29, 2024
50795f3
Merge branch 'conti/implement-dsm-schema-support' of github.com:DataD…
wconti27 Sep 3, 2024
3f4ed18
Merge branch 'conti/implement-dsm-schema-support' into conti/implemen…
wconti27 Sep 3, 2024
92639e7
use contrib job small for avro job base
wconti27 Sep 3, 2024
d1c5109
clean up avro code
wconti27 Sep 3, 2024
7b96ead
Merge branch 'main' into conti/implement-dsm-schema-support
wconti27 Sep 4, 2024
3c6d227
Merge branch 'conti/implement-dsm-schema-support' of github.com:DataD…
wconti27 Sep 4, 2024
25d102b
add release note
wconti27 Sep 4, 2024
40a49b8
Merge branch 'main' into conti/implement-dsm-schema-support
wconti27 Sep 4, 2024
edef06a
Merge branch 'main' into conti/implement-dsm-schema-support
wconti27 Sep 5, 2024
a4481d9
fix lint
wconti27 Sep 5, 2024
8374d48
Merge branch 'conti/implement-dsm-schema-support' into conti/implemen…
wconti27 Sep 5, 2024
e82c4d9
fix suitespec
wconti27 Sep 5, 2024
4012b12
Merge branch 'conti/implement-dsm-schema-support' into conti/implemen…
wconti27 Sep 5, 2024
2705343
add release note
wconti27 Sep 5, 2024
92c7b44
merge main
wconti27 Sep 5, 2024
9bc3232
ensure sampling is used
wconti27 Sep 5, 2024
49e6550
initial commit
wconti27 Sep 5, 2024
5ed2712
fix lint
wconti27 Sep 5, 2024
440553e
fix
wconti27 Sep 5, 2024
7148de2
fix sampling
wconti27 Sep 5, 2024
fc92fbb
Merge branch 'main' into conti/implement-avro-schemas
wconti27 Sep 5, 2024
494c065
merge with main
wconti27 Sep 5, 2024
53e3500
Update releasenotes/notes/implement-schema-extraction-for-avro-3f903a…
wconti27 Sep 9, 2024
39d4544
fixes
wconti27 Sep 9, 2024
7172923
more fixes
wconti27 Sep 9, 2024
8e0f227
Merge branch 'conti/implement-avro-schemas' into conti/implement-prot…
wconti27 Sep 10, 2024
9a3e851
more fixes
wconti27 Sep 10, 2024
a122333
fix suitespec
wconti27 Sep 10, 2024
b6dd28d
merge with main
wconti27 Sep 10, 2024
dd262a0
Merge branch 'main' into conti/implement-protobuf-schemas
wconti27 Sep 10, 2024
a05fd2c
more fixes
wconti27 Sep 10, 2024
ded4c30
add riotfile
wconti27 Sep 10, 2024
09532d6
Merge branch 'main' into conti/implement-protobuf-schemas
wconti27 Sep 10, 2024
ee1e4f8
fix tests
wconti27 Sep 10, 2024
48fd73b
fix protobuf reqs
wconti27 Sep 11, 2024
9f31466
use global for wrapped message classes
wconti27 Sep 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitlab/tests/contrib.yml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ molten:
variables:
SUITE_NAME: "molten"

protobuf:
extends: .test_base_riot_snapshot
variables:
SUITE_NAME: "protobuf"

opentracer:
extends: .test_base_riot
variables:
Expand Down
22 changes: 22 additions & 0 deletions .riot/requirements/1721018.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1721018.in
#
attrs==24.2.0
coverage[toml]==7.6.1
exceptiongroup==1.2.2
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.1
pluggy==1.5.0
protobuf==5.28.0
pytest==8.3.3
pytest-cov==5.0.0
pytest-mock==3.14.0
pytest-randomly==3.15.0
sortedcontainers==2.4.0
tomli==2.0.1
20 changes: 20 additions & 0 deletions .riot/requirements/1bf9721.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1bf9721.in
#
attrs==24.2.0
coverage[toml]==7.6.1
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.1
pluggy==1.5.0
protobuf==5.28.0
pytest==8.3.3
pytest-cov==5.0.0
pytest-mock==3.14.0
pytest-randomly==3.15.0
sortedcontainers==2.4.0
20 changes: 20 additions & 0 deletions .riot/requirements/26aada0.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/26aada0.in
#
attrs==24.2.0
coverage[toml]==7.6.1
hypothesis==6.45.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.1
pluggy==1.5.0
protobuf==5.28.0
pytest==8.3.3
pytest-cov==5.0.0
pytest-mock==3.14.0
pytest-randomly==3.15.0
sortedcontainers==2.4.0
24 changes: 24 additions & 0 deletions .riot/requirements/3b28562.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate --resolver=backtracking .riot/requirements/3b28562.in
#
attrs==24.2.0
coverage[toml]==7.6.1
exceptiongroup==1.2.2
hypothesis==6.45.0
importlib-metadata==8.4.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.1
pluggy==1.5.0
protobuf==5.28.0
pytest==8.3.3
pytest-cov==5.0.0
pytest-mock==3.14.0
pytest-randomly==3.15.0
sortedcontainers==2.4.0
tomli==2.0.1
zipp==3.20.1
24 changes: 24 additions & 0 deletions .riot/requirements/e222783.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate --resolver=backtracking .riot/requirements/e222783.in
#
attrs==24.2.0
coverage[toml]==7.6.1
exceptiongroup==1.2.2
hypothesis==6.45.0
importlib-metadata==8.4.0
iniconfig==2.0.0
mock==5.1.0
opentracing==2.4.0
packaging==24.1
pluggy==1.5.0
protobuf==5.28.0
pytest==8.3.3
pytest-cov==5.0.0
pytest-mock==3.14.0
pytest-randomly==3.15.0
sortedcontainers==2.4.0
tomli==2.0.1
zipp==3.20.1
Empty file.
120 changes: 120 additions & 0 deletions ddtrace/contrib/internal/protobuf/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from google import protobuf
from google.protobuf.internal import builder
import wrapt

from ddtrace import config
from ddtrace.internal.utils.wrappers import unwrap
from ddtrace.pin import Pin

from .schema_iterator import SchemaExtractor


config._add(
"protobuf",
dict(),
)


_WRAPPED_MESSAGE_CLASSES = []


def get_version():
# type: () -> str
return getattr(protobuf, "__version__", "")


def patch():
"""Patch the instrumented methods"""
if getattr(protobuf, "_datadog_patch", False):
return
protobuf._datadog_patch = True

_w = wrapt.wrap_function_wrapper

_w("google.protobuf.internal", "builder.BuildTopDescriptorsAndMessages", _traced_build)
Pin().onto(builder)


def unpatch():
if getattr(protobuf, "_datadog_patch", False):
protobuf._datadog_patch = False

unwrap(protobuf.internal.builder, "BuildTopDescriptorsAndMessages")

global _WRAPPED_MESSAGE_CLASSES
for wrapped_message_class in _WRAPPED_MESSAGE_CLASSES:
_unwrap_message(wrapped_message_class)

_WRAPPED_MESSAGE_CLASSES = []


def _unwrap_message(message_class):
unwrap(message_class, "SerializeToString")
unwrap(message_class, "ParseFromString")


def _wrap_message(message_descriptor, message_class):
def serialize_wrapper(wrapped, instance, args, kwargs):
return _traced_serialize_message(wrapped, instance, args, kwargs, msg_descriptor=message_descriptor)

def deserialize_wrapper(wrapped, instance, args, kwargs):
return _traced_deserialize_message(wrapped, instance, args, kwargs, msg_descriptor=message_descriptor)

_w = wrapt.wrap_function_wrapper
_w(message_class, "SerializeToString", serialize_wrapper)
_w(message_class, "ParseFromString", deserialize_wrapper)

global _WRAPPED_MESSAGE_CLASSES
_WRAPPED_MESSAGE_CLASSES.append(message_class)
Pin().onto(message_class)


#
# tracing functions
#
def _traced_build(func, instance, args, kwargs):
file_des = args[0]

pin = Pin.get_from(instance)
if not pin or not pin.enabled():
func(*args, **kwargs)

try:
func(*args, **kwargs)
finally:
if config._data_streams_enabled:
generated_message_classes = args[2]
message_descriptors = file_des.message_types_by_name.items()
for message_idx in range(len(message_descriptors)):
message_class_name = message_descriptors[message_idx][0]
message_descriptor = message_descriptors[message_idx][1]
message_class = generated_message_classes[message_class_name]
_wrap_message(message_descriptor=message_descriptor, message_class=message_class)


def _traced_deserialize_message(func, instance, args, kwargs, msg_descriptor):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
func(*args, **kwargs)

active = pin.tracer.current_span()

try:
func(*args, **kwargs)
finally:
if config._data_streams_enabled and active:
SchemaExtractor.attach_schema_on_span(msg_descriptor, active, SchemaExtractor.DESERIALIZATION)


def _traced_serialize_message(func, instance, args, kwargs, msg_descriptor):
pin = Pin.get_from(instance)
if not pin or not pin.enabled() or not msg_descriptor:
return func(*args, **kwargs)

active = pin.tracer.current_span()

try:
return func(*args, **kwargs)
finally:
if config._data_streams_enabled and active:
SchemaExtractor.attach_schema_on_span(msg_descriptor, active, SchemaExtractor.SERIALIZATION)
114 changes: 114 additions & 0 deletions ddtrace/contrib/internal/protobuf/schema_iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from google.protobuf.descriptor import FieldDescriptor

# from google._upb._message import Descriptor
from ddtrace._trace.span import Span
from ddtrace.ext import schema as SCHEMA_TAGS
from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.schemas.schema_builder import SchemaBuilder
from ddtrace.internal.datastreams.schemas.schema_iterator import SchemaIterator


class SchemaExtractor(SchemaIterator):
SERIALIZATION = "serialization"
DESERIALIZATION = "deserialization"
PROTOBUF = "protobuf"

def __init__(self, schema):
self.schema = schema

@staticmethod
def extract_property(
field: FieldDescriptor, schema_name: str, field_name: str, builder: SchemaBuilder, depth: int
) -> int:
array = False
type_ = None
format_ = None
description = None
ref = None
enum_values = None

type_format = SchemaExtractor.get_type_and_format(field.type)
type_ = type_format[0]
format_ = type_format[1]

if type_ is None and format_ == "Message type":
format_ = None
ref = "#/components/schemas/" + field.message_type.name
if not SchemaExtractor.extract_schema(field.message_type, builder, depth):
return False
elif format_ == "enum":
enum_values = [value.name for value in field.enum_type.values]
return builder.add_property(schema_name, field_name, array, type_, description, ref, format_, enum_values)

@staticmethod
def extract_schema(schema, builder: SchemaBuilder, depth: int) -> bool:
depth += 1
schema_name = schema.name
if not builder.should_extract_schema(schema_name, depth):
return False
try:
for field_name, field_desc in schema.fields_by_name.items():
if not SchemaExtractor.extract_property(field_desc, schema_name, field_name, builder, depth):
return False
except Exception:
return False
return True

@staticmethod
def extract_schemas(descriptor) -> bool:
return data_streams_processor().get_schema(descriptor.name, SchemaExtractor(descriptor))

def iterate_over_schema(self, builder: SchemaBuilder):
self.extract_schema(self.schema, builder, 0)

@staticmethod
def attach_schema_on_span(descriptor, span: Span, operation: str):
if descriptor is None or span is None:
return

span.set_tag(SCHEMA_TAGS.SCHEMA_TYPE, SchemaExtractor.PROTOBUF)
span.set_tag(SCHEMA_TAGS.SCHEMA_NAME, descriptor.name)
span.set_tag(SCHEMA_TAGS.SCHEMA_OPERATION, operation)

if not data_streams_processor().can_sample_schema(operation):
return

prio = span.context.sampling_priority
if prio is None or prio <= 0:
return

weight = data_streams_processor().try_sample_schema(operation)
if weight == 0:
return

schema_data = SchemaExtractor.extract_schemas(descriptor)

span.set_tag(SCHEMA_TAGS.SCHEMA_DEFINITION, schema_data.definition)
span.set_metric(SCHEMA_TAGS.SCHEMA_WEIGHT, weight)
span.set_tag(SCHEMA_TAGS.SCHEMA_ID, schema_data.id)

@staticmethod
def get_type_and_format(type_: int) -> tuple:
type_format_mapping = {
1: ("number", "double"),
2: ("number", "float"),
3: ("integer", "int64"),
4: ("integer", "uint64"),
5: ("integer", "int32"),
6: ("integer", "fixed64"),
7: ("integer", "fixed32"),
8: ("boolean", None),
9: ("string", None),
10: ("object", "Group type"), # Group types are deprecated
11: (None, "Message type"), # Placeholder for handling messages
12: ("string", "byte"),
13: ("integer", "uint32"),
14: ("string", "enum"),
15: ("integer", "sfixed32"),
16: ("integer", "sfixed64"),
17: ("integer", "int32"),
18: ("integer", "int64"),
}

# Default values for unknown types
return type_format_mapping.get(type_, ("string", None))
30 changes: 30 additions & 0 deletions ddtrace/contrib/protobuf/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
The Protobuf integration will trace all Protobuf read / write calls made with the ``google.protobuf``
library. This integration is enabled by default.

Enabling
~~~~~~~~

The protobuf integration is enabled by default. Use
:func:`patch()<ddtrace.patch>` to enable the integration::

from ddtrace import patch
patch(protobuf=True)

Configuration
~~~~~~~~~~~~~

"""
from ...internal.utils.importlib import require_modules


required_modules = ["protobuf"]

with require_modules(required_modules) as missing_modules:
if not missing_modules:
# Expose public methods
from ..internal.protobuf.patch import get_version
from ..internal.protobuf.patch import patch
from ..internal.protobuf.patch import unpatch

__all__ = ["patch", "unpatch", "get_version"]
Loading
Loading