Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(telemetry): fix flaky tests #10721

Merged
merged 13 commits into from
Oct 1, 2024
18 changes: 18 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,24 @@ def get_events(self, event_type=None, filter_heartbeats=True):
requests = self.get_requests(event_type, filter_heartbeats)
return [req["body"] for req in requests]

def get_metrics(self, name=None):
    """Return the metric series reported in "generate-metrics" telemetry events.

    Every entry of ``event["payload"]["series"]`` is collected from each
    event returned by ``self.get_events("generate-metrics")``.

    :param name: when given, keep only series whose ``"metric"`` equals it;
        ``None`` (the default) keeps every series.
    :returns: list of series dicts sorted ascending by ``(metric, tags)``.
    """
    collected = []
    for event in self.get_events("generate-metrics"):
        collected.extend(
            series for series in event["payload"]["series"] if name is None or series["metric"] == name
        )
    # A single metric can be split across several payloads; sorting gives
    # callers an order that does not depend on payload arrival order.
    return sorted(collected, key=lambda series: (series["metric"], series["tags"]))

def get_dependencies(self, name=None):
    """Return the dependencies reported in "app-dependencies-loaded" telemetry events.

    Every entry of ``event["payload"]["dependencies"]`` is collected from
    each event returned by ``self.get_events("app-dependencies-loaded")``.

    :param name: when given, keep only dependencies whose ``"name"`` equals
        it; ``None`` (the default) keeps every dependency.
    :returns: list of dependency dicts sorted ascending by ``"name"``.
    """
    collected = []
    for event in self.get_events("app-dependencies-loaded"):
        collected.extend(
            dep for dep in event["payload"]["dependencies"] if name is None or dep["name"] == name
        )
    # Sort so the result does not depend on the order events were received.
    return sorted(collected, key=lambda dep: dep["name"])


@pytest.fixture
def test_agent_session(telemetry_writer, request):
Expand Down
7 changes: 6 additions & 1 deletion tests/telemetry/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from ddtrace.internal.telemetry.data import get_application
from ddtrace.internal.telemetry.data import get_host_info
from ddtrace.internal.telemetry.data import get_hostname
from ddtrace.internal.telemetry.data import update_imported_dependencies


def test_get_application():
Expand Down Expand Up @@ -173,7 +172,10 @@ def test_get_container_id_when_container_does_not_exists():
assert _get_container_id() == ""


@pytest.mark.subprocess
def test_update_imported_dependencies_both_empty():
from ddtrace.internal.telemetry.data import update_imported_dependencies

already_imported = {}
new_modules = []
res = update_imported_dependencies(already_imported, new_modules)
Expand All @@ -182,9 +184,12 @@ def test_update_imported_dependencies_both_empty():
assert new_modules == []


@pytest.mark.subprocess
def test_update_imported_dependencies():
import xmltodict

from ddtrace.internal.telemetry.data import update_imported_dependencies

already_imported = {}
res = update_imported_dependencies(already_imported, [xmltodict.__name__])
assert len(res) == 1
Expand Down
76 changes: 15 additions & 61 deletions tests/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,6 @@ def test_enable(test_agent_session, run_python_code_in_subprocess):
assert stderr == b""


@pytest.mark.snapshot
def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run_python_code_in_subprocess):
    """assert telemetry events are generated after the first trace is flushed to the agent"""

    # Submit a trace to the agent in a subprocess
    code = """
from ddtrace import tracer

span = tracer.trace("test-telemetry")
span.finish()
"""
    env = os.environ.copy()
    env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true"
    _, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
    assert status == 0, stderr
    assert stderr == b""
    # Ensure telemetry events were sent to the agent (snapshot ensures one trace was generated)
    # Note event order is reversed e.g. event[0] is actually the last event
    events = test_agent_session.get_events()

    # One ordered comparison checks both the number of events and each
    # request type, and shows the whole sequence when the assertion fails.
    assert [event["request_type"] for event in events] == [
        "app-closing",
        "app-dependencies-loaded",
        "app-integrations-change",
        "generate-metrics",
        "app-started",
    ]


def test_enable_fork(test_agent_session, run_python_code_in_subprocess):
"""assert app-started/app-closing events are only sent in parent process"""
code = """
Expand Down Expand Up @@ -183,9 +155,6 @@ def test_app_started_error_handled_exception(test_agent_session, run_python_code

from ddtrace import tracer
from ddtrace.filters import TraceFilter
from ddtrace.settings import _config

_config._telemetry_dependency_collection = False

class FailingFilture(TraceFilter):
def process_trace(self, trace):
Expand All @@ -197,10 +166,10 @@ def process_trace(self, trace):
}
)

# generate and encode span
# generate and encode span to trigger sampling failure
tracer.trace("hello").finish()

# force app_started call instead of waiting for periodic()
# force app_started event (instead of waiting for 10 seconds)
from ddtrace.internal.telemetry import telemetry_writer
telemetry_writer._app_started()
"""
Expand Down Expand Up @@ -273,9 +242,6 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro

from ddtrace import patch, tracer
patch(raise_errors=False, sqlite3=True)

# Create a span to start the telemetry writer
tracer.trace("hi").finish()
"""

env = os.environ.copy()
Expand All @@ -294,19 +260,12 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
)

# Get metric containing the integration error
integration_error = {}
metric_events = test_agent_session.get_events("generate-metrics")
for event in metric_events:
for metric in event["payload"]["series"]:
if metric["metric"] == "integration_errors":
integration_error = metric
break

integration_error = test_agent_session.get_metrics("integration_errors")
# assert the integration metric has the correct type, count, and tags
assert integration_error
assert integration_error["type"] == "count"
assert integration_error["points"][0][1] == 1
assert integration_error["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"]
assert len(integration_error) == 1
assert integration_error[0]["type"] == "count"
assert integration_error[0]["points"][0][1] == 1
assert integration_error[0]["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"]


def test_unhandled_integration_error(test_agent_session, ddtrace_run_python_code_in_subprocess):
Expand Down Expand Up @@ -350,16 +309,13 @@ def test_unhandled_integration_error(test_agent_session, ddtrace_run_python_code
assert "ddtrace/contrib/internal/flask/patch.py:" in flask_integration["error"]
assert "not enough values to unpack (expected 2, got 0)" in flask_integration["error"]

metric_events = [event for event in events if event["request_type"] == "generate-metrics"]

assert len(metric_events) == 1
assert metric_events[0]["payload"]["namespace"] == "tracers"
assert len(metric_events[0]["payload"]["series"]) == 1
assert metric_events[0]["payload"]["series"][0]["metric"] == "integration_errors"
assert metric_events[0]["payload"]["series"][0]["type"] == "count"
assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1
assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1
assert metric_events[0]["payload"]["series"][0]["tags"] == ["integration_name:flask", "error_type:valueerror"]
error_metrics = test_agent_session.get_metrics("integration_errors")
assert len(error_metrics) == 1
error_metric = error_metrics[0]
assert error_metric["type"] == "count"
assert len(error_metric["points"]) == 1
assert error_metric["points"][0][1] == 1
assert error_metric["tags"] == ["integration_name:flask", "error_type:valueerror"]


def test_app_started_with_install_metrics(test_agent_session, run_python_code_in_subprocess):
Expand All @@ -373,7 +329,7 @@ def test_app_started_with_install_metrics(test_agent_session, run_python_code_in
}
)
# Generate a trace to trigger app-started event
_, stderr, status, _ = run_python_code_in_subprocess("import ddtrace; ddtrace.tracer.trace('s1').finish()", env=env)
_, stderr, status, _ = run_python_code_in_subprocess("import ddtrace", env=env)
assert status == 0, stderr

app_started_event = test_agent_session.get_events("app-started")
Expand All @@ -392,8 +348,6 @@ def test_instrumentation_telemetry_disabled(test_agent_session, run_python_code_

code = """
from ddtrace import tracer
# Create a span to start the telemetry writer
tracer.trace("hi").finish()

# We want to import the telemetry module even when telemetry is disabled.
import sys
Expand Down
42 changes: 13 additions & 29 deletions tests/telemetry/test_telemetry_metrics_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def parse_payload(data):

def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session):
token = "tests.telemetry.test_telemetry_metrics_e2e.test_telemetry_metrics_enabled_on_gunicorn_child_process"
initial_event_count = len(test_agent_session.get_events("generate-metrics"))
with gunicorn_server(telemetry_metrics_enabled="true", token=token) as context:
_, gunicorn_client = context

Expand All @@ -86,11 +85,12 @@ def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session)
response = gunicorn_client.get("/count_metric")
assert response.status_code == 200

metrics = test_agent_session.get_events("generate-metrics")
assert len(metrics) > initial_event_count
assert len(metrics) == 1
assert metrics[0]["payload"]["series"][0]["metric"] == "test_metric"
assert metrics[0]["payload"]["series"][0]["points"][0][1] == 5
# Ensure /count_metric was called 5 times (these counts could be sent in different payloads)
metrics = test_agent_session.get_metrics("test_metric")
count = 0
for metric in metrics:
count += metric["points"][0][1]
assert count == 5


def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_run_python_code_in_subprocess):
Expand All @@ -104,14 +104,13 @@ def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_
env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true"
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr
metrics_events = test_agent_session.get_events("generate-metrics")
metrics_sc = get_metrics_from_events("spans_created", metrics_events)
metrics_sc = test_agent_session.get_metrics("spans_created")
assert len(metrics_sc) == 1
assert metrics_sc[0]["metric"] == "spans_created"
assert metrics_sc[0]["tags"] == ["integration_name:datadog"]
assert metrics_sc[0]["points"][0][1] == 10

metrics_sf = get_metrics_from_events("spans_finished", metrics_events)
metrics_sf = test_agent_session.get_metrics("spans_finished")
assert len(metrics_sf) == 1
assert metrics_sf[0]["metric"] == "spans_finished"
assert metrics_sf[0]["tags"] == ["integration_name:datadog"]
Expand All @@ -133,15 +132,13 @@ def test_span_creation_and_finished_metrics_otel(test_agent_session, ddtrace_run
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr

metrics_events = test_agent_session.get_events("generate-metrics")

metrics_sc = get_metrics_from_events("spans_created", metrics_events)
metrics_sc = test_agent_session.get_metrics("spans_created")
assert len(metrics_sc) == 1
assert metrics_sc[0]["metric"] == "spans_created"
assert metrics_sc[0]["tags"] == ["integration_name:otel"]
assert metrics_sc[0]["points"][0][1] == 9

metrics_sf = get_metrics_from_events("spans_finished", metrics_events)
metrics_sf = test_agent_session.get_metrics("spans_finished")
assert len(metrics_sf) == 1
assert metrics_sf[0]["metric"] == "spans_finished"
assert metrics_sf[0]["tags"] == ["integration_name:otel"]
Expand All @@ -163,15 +160,13 @@ def test_span_creation_and_finished_metrics_opentracing(test_agent_session, ddtr
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr

metrics_events = test_agent_session.get_events("generate-metrics")

metrics_sc = get_metrics_from_events("spans_created", metrics_events)
metrics_sc = test_agent_session.get_metrics("spans_created")
assert len(metrics_sc) == 1
assert metrics_sc[0]["metric"] == "spans_created"
assert metrics_sc[0]["tags"] == ["integration_name:opentracing"]
assert metrics_sc[0]["points"][0][1] == 2

metrics_sf = get_metrics_from_events("spans_finished", metrics_events)
metrics_sf = test_agent_session.get_metrics("spans_finished")
assert len(metrics_sf) == 1
assert metrics_sf[0]["metric"] == "spans_finished"
assert metrics_sf[0]["tags"] == ["integration_name:opentracing"]
Expand Down Expand Up @@ -202,8 +197,7 @@ def test_span_creation_no_finish(test_agent_session, ddtrace_run_python_code_in_
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr

metrics_events = test_agent_session.get_events("generate-metrics")
metrics = get_metrics_from_events("spans_created", metrics_events)
metrics = test_agent_session.get_metrics("spans_created")
assert len(metrics) == 3

assert metrics[0]["metric"] == "spans_created"
Expand All @@ -215,13 +209,3 @@ def test_span_creation_no_finish(test_agent_session, ddtrace_run_python_code_in_
assert metrics[2]["metric"] == "spans_created"
assert metrics[2]["tags"] == ["integration_name:otel"]
assert metrics[2]["points"][0][1] == 4


def get_metrics_from_events(name, events):
    """Return the series named ``name`` found in the given telemetry events.

    :param name: value a series' ``"metric"`` key must equal to be kept.
    :param events: iterable of event dicts, each exposing its series under
        ``event["payload"]["series"]``.
    :returns: list of matching series dicts sorted ascending by
        ``(metric, tags)``.
    """
    matching = [
        series
        for event in events
        for series in event["payload"]["series"]
        if series["metric"] == name
    ]
    # Sort so the result does not depend on which payload carried a series.
    return sorted(matching, key=lambda series: (series["metric"], series["tags"]))
Loading
Loading