diff --git a/tests/conftest.py b/tests/conftest.py index 27e53842411..82638f4f43f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -558,6 +558,24 @@ def get_events(self, event_type=None, filter_heartbeats=True): requests = self.get_requests(event_type, filter_heartbeats) return [req["body"] for req in requests] + def get_metrics(self, name=None): + metrics = [] + for event in self.get_events("generate-metrics"): + for series in event["payload"]["series"]: + if name is None or series["metric"] == name: + metrics.append(series) + metrics.sort(key=lambda x: (x["metric"], x["tags"]), reverse=False) + return metrics + + def get_dependencies(self, name=None): + deps = [] + for event in self.get_events("app-dependencies-loaded"): + for dep in event["payload"]["dependencies"]: + if name is None or dep["name"] == name: + deps.append(dep) + deps.sort(key=lambda x: x["name"], reverse=False) + return deps + @pytest.fixture def test_agent_session(telemetry_writer, request): diff --git a/tests/telemetry/test_data.py b/tests/telemetry/test_data.py index 0b6cf354233..7c607287d26 100644 --- a/tests/telemetry/test_data.py +++ b/tests/telemetry/test_data.py @@ -13,7 +13,6 @@ from ddtrace.internal.telemetry.data import get_application from ddtrace.internal.telemetry.data import get_host_info from ddtrace.internal.telemetry.data import get_hostname -from ddtrace.internal.telemetry.data import update_imported_dependencies def test_get_application(): @@ -173,7 +172,10 @@ def test_get_container_id_when_container_does_not_exists(): assert _get_container_id() == "" +@pytest.mark.subprocess def test_update_imported_dependencies_both_empty(): + from ddtrace.internal.telemetry.data import update_imported_dependencies + already_imported = {} new_modules = [] res = update_imported_dependencies(already_imported, new_modules) @@ -182,9 +184,12 @@ def test_update_imported_dependencies_both_empty(): assert new_modules == [] +@pytest.mark.subprocess def test_update_imported_dependencies(): import xmltodict + from ddtrace.internal.telemetry.data import update_imported_dependencies + already_imported = {} res = update_imported_dependencies(already_imported, [xmltodict.__name__]) assert len(res) == 1 diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index 28cb4453466..509017203d7 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -21,34 +21,6 @@ def test_enable(test_agent_session, run_python_code_in_subprocess): assert stderr == b"" -@pytest.mark.snapshot -def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run_python_code_in_subprocess): - """assert telemetry events are generated after the first trace is flushed to the agent""" - - # Submit a trace to the agent in a subprocess - code = """ -from ddtrace import tracer - -span = tracer.trace("test-telemetry") -span.finish() - """ - env = os.environ.copy() - env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" - _, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env) - assert status == 0, stderr - assert stderr == b"" - # Ensure telemetry events were sent to the agent (snapshot ensures one trace was generated) - # Note event order is reversed e.g. event[0] is actually the last event - events = test_agent_session.get_events() - - assert len(events) == 5 - assert events[0]["request_type"] == "app-closing" - assert events[1]["request_type"] == "app-dependencies-loaded" - assert events[2]["request_type"] == "app-integrations-change" - assert events[3]["request_type"] == "generate-metrics" - assert events[4]["request_type"] == "app-started" - - def test_enable_fork(test_agent_session, run_python_code_in_subprocess): """assert app-started/app-closing events are only sent in parent process""" code = """ @@ -183,9 +155,6 @@ def test_app_started_error_handled_exception(test_agent_session, run_python_code from ddtrace import tracer from ddtrace.filters import TraceFilter -from ddtrace.settings import _config - -_config._telemetry_dependency_collection = False class FailingFilture(TraceFilter): def process_trace(self, trace): @@ -197,10 +166,10 @@ def process_trace(self, trace): } ) -# generate and encode span +# generate and encode span to trigger sampling failure tracer.trace("hello").finish() -# force app_started call instead of waiting for periodic() +# force app_started event (instead of waiting for 10 seconds) from ddtrace.internal.telemetry import telemetry_writer telemetry_writer._app_started() """ @@ -273,9 +242,6 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro from ddtrace import patch, tracer patch(raise_errors=False, sqlite3=True) - -# Create a span to start the telemetry writer -tracer.trace("hi").finish() """ env = os.environ.copy() @@ -294,19 +260,12 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro ) # Get metric containing the integration error - integration_error = {} - metric_events = test_agent_session.get_events("generate-metrics") - for event in metric_events: - for metric in event["payload"]["series"]: - if metric["metric"] == "integration_errors": - integration_error = metric - break - + integration_error = test_agent_session.get_metrics("integration_errors") # assert the integration metric has the correct type, count, and tags - assert integration_error - assert integration_error["type"] == "count" - assert integration_error["points"][0][1] == 1 - assert integration_error["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"] + assert len(integration_error) == 1 + assert integration_error[0]["type"] == "count" + assert integration_error[0]["points"][0][1] == 1 + assert integration_error[0]["tags"] == ["integration_name:sqlite3", "error_type:attributeerror"] def test_unhandled_integration_error(test_agent_session, ddtrace_run_python_code_in_subprocess): @@ -350,16 +309,13 @@ def test_unhandled_integration_error(test_agent_session, ddtrace_run_python_code assert "ddtrace/contrib/internal/flask/patch.py:" in flask_integration["error"] assert "not enough values to unpack (expected 2, got 0)" in flask_integration["error"] - metric_events = [event for event in events if event["request_type"] == "generate-metrics"] - - assert len(metric_events) == 1 - assert metric_events[0]["payload"]["namespace"] == "tracers" - assert len(metric_events[0]["payload"]["series"]) == 1 - assert metric_events[0]["payload"]["series"][0]["metric"] == "integration_errors" - assert metric_events[0]["payload"]["series"][0]["type"] == "count" - assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1 - assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1 - assert metric_events[0]["payload"]["series"][0]["tags"] == ["integration_name:flask", "error_type:valueerror"] + error_metrics = test_agent_session.get_metrics("integration_errors") + assert len(error_metrics) == 1 + error_metric = error_metrics[0] + assert error_metric["type"] == "count" + assert len(error_metric["points"]) == 1 + assert error_metric["points"][0][1] == 1 + assert error_metric["tags"] == ["integration_name:flask", "error_type:valueerror"] def test_app_started_with_install_metrics(test_agent_session, run_python_code_in_subprocess): @@ -373,7 +329,7 @@ def test_app_started_with_install_metrics(test_agent_session, run_python_code_in } ) # Generate a trace to trigger app-started event - _, stderr, status, _ = run_python_code_in_subprocess("import ddtrace; ddtrace.tracer.trace('s1').finish()", env=env) + _, stderr, status, _ = run_python_code_in_subprocess("import ddtrace", env=env) assert status == 0, stderr app_started_event = test_agent_session.get_events("app-started") @@ -392,8 +348,6 @@ def test_instrumentation_telemetry_disabled(test_agent_session, run_python_code_ code = """ from ddtrace import tracer -# Create a span to start the telemetry writer -tracer.trace("hi").finish() # We want to import the telemetry module even when telemetry is disabled. import sys diff --git a/tests/telemetry/test_telemetry_metrics_e2e.py b/tests/telemetry/test_telemetry_metrics_e2e.py index 6e8b833e310..77b10c508ef 100644 --- a/tests/telemetry/test_telemetry_metrics_e2e.py +++ b/tests/telemetry/test_telemetry_metrics_e2e.py @@ -74,7 +74,6 @@ def parse_payload(data): def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session): token = "tests.telemetry.test_telemetry_metrics_e2e.test_telemetry_metrics_enabled_on_gunicorn_child_process" - initial_event_count = len(test_agent_session.get_events("generate-metrics")) with gunicorn_server(telemetry_metrics_enabled="true", token=token) as context: _, gunicorn_client = context @@ -86,11 +85,12 @@ def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session) response = gunicorn_client.get("/count_metric") assert response.status_code == 200 - metrics = test_agent_session.get_events("generate-metrics") - assert len(metrics) > initial_event_count - assert len(metrics) == 1 - assert metrics[0]["payload"]["series"][0]["metric"] == "test_metric" - assert metrics[0]["payload"]["series"][0]["points"][0][1] == 5 + # Ensure /count_metric was called 5 times (these counts could be sent in different payloads) + metrics = test_agent_session.get_metrics("test_metric") + count = 0 + for metric in metrics: + count += metric["points"][0][1] + assert count == 5 def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_run_python_code_in_subprocess): @@ -104,14 +104,13 @@ def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_ env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" _, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env) assert status == 0, stderr - metrics_events = test_agent_session.get_events("generate-metrics") - metrics_sc = get_metrics_from_events("spans_created", metrics_events) + metrics_sc = test_agent_session.get_metrics("spans_created") assert len(metrics_sc) == 1 assert metrics_sc[0]["metric"] == "spans_created" assert metrics_sc[0]["tags"] == ["integration_name:datadog"] assert metrics_sc[0]["points"][0][1] == 10 - metrics_sf = get_metrics_from_events("spans_finished", metrics_events) + metrics_sf = test_agent_session.get_metrics("spans_finished") assert len(metrics_sf) == 1 assert metrics_sf[0]["metric"] == "spans_finished" assert metrics_sf[0]["tags"] == ["integration_name:datadog"] @@ -133,15 +132,13 @@ def test_span_creation_and_finished_metrics_otel(test_agent_session, ddtrace_run _, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env) assert status == 0, stderr - metrics_events = test_agent_session.get_events("generate-metrics") - - metrics_sc = get_metrics_from_events("spans_created", metrics_events) + metrics_sc = test_agent_session.get_metrics("spans_created") assert len(metrics_sc) == 1 assert metrics_sc[0]["metric"] == "spans_created" assert metrics_sc[0]["tags"] == ["integration_name:otel"] assert metrics_sc[0]["points"][0][1] == 9 - metrics_sf = get_metrics_from_events("spans_finished", metrics_events) + metrics_sf = test_agent_session.get_metrics("spans_finished") assert len(metrics_sf) == 1 assert metrics_sf[0]["metric"] == "spans_finished" assert metrics_sf[0]["tags"] == ["integration_name:otel"] @@ -163,15 +160,13 @@ def test_span_creation_and_finished_metrics_opentracing(test_agent_session, ddtr _, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env) assert status == 0, stderr - metrics_events = test_agent_session.get_events("generate-metrics") - - metrics_sc = get_metrics_from_events("spans_created", metrics_events) + metrics_sc = test_agent_session.get_metrics("spans_created") assert len(metrics_sc) == 1 assert metrics_sc[0]["metric"] == "spans_created" assert metrics_sc[0]["tags"] == ["integration_name:opentracing"] assert metrics_sc[0]["points"][0][1] == 2 - metrics_sf = get_metrics_from_events("spans_finished", metrics_events) + metrics_sf = test_agent_session.get_metrics("spans_finished") assert len(metrics_sf) == 1 assert metrics_sf[0]["metric"] == "spans_finished" assert metrics_sf[0]["tags"] == ["integration_name:opentracing"] @@ -202,8 +197,7 @@ def test_span_creation_no_finish(test_agent_session, ddtrace_run_python_code_in_ _, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env) assert status == 0, stderr - metrics_events = test_agent_session.get_events("generate-metrics") - metrics = get_metrics_from_events("spans_created", metrics_events) + metrics = test_agent_session.get_metrics("spans_created") assert len(metrics) == 3 assert metrics[0]["metric"] == "spans_created" @@ -215,13 +209,3 @@ def test_span_creation_no_finish(test_agent_session, ddtrace_run_python_code_in_ assert metrics[2]["metric"] == "spans_created" assert metrics[2]["tags"] == ["integration_name:otel"] assert metrics[2]["points"][0][1] == 4 - - -def get_metrics_from_events(name, events): - metrics = [] - for event in events: - for series in event["payload"]["series"]: - if series["metric"] == name: - metrics.append(series) - metrics.sort(key=lambda x: (x["metric"], x["tags"]), reverse=False) - return metrics diff --git a/tests/telemetry/test_writer.py b/tests/telemetry/test_writer.py index 73cdc98c00e..058d01f1c46 100644 --- a/tests/telemetry/test_writer.py +++ b/tests/telemetry/test_writer.py @@ -10,7 +10,6 @@ import pytest import ddtrace.internal.telemetry -from ddtrace.internal.telemetry import modules from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT from ddtrace.internal.telemetry.data import get_application from ddtrace.internal.telemetry.data import get_host_info @@ -182,7 +181,6 @@ def test_app_started_event(telemetry_writer, test_agent_session, mock_time): ("DD_APPSEC_SCA_ENABLED", "0", "false"), ], ) -@pytest.mark.skip(reason="FIXME: This test needs to be updated.") def test_app_started_event_configuration_override( test_agent_session, run_python_code_in_subprocess, tmpdir, env_var, value, expected_value ): @@ -196,6 +194,12 @@ def test_app_started_event_configuration_override( logging.basicConfig() import ddtrace.auto + +# By default telemetry collection is enabled after 10 seconds, so we either need to +# to sleep for 10 seconds or manually call _app_started() to generate the app started event. +# This delay allows us to collect start up errors and dynamic configurations +import ddtrace +ddtrace.internal.telemetry.telemetry_writer._app_started() """ env = os.environ.copy() @@ -253,10 +257,8 @@ def test_app_started_event_configuration_override( env["DD_SPAN_SAMPLING_RULES_FILE"] = str(file) env["DD_TRACE_PARTIAL_FLUSH_ENABLED"] = "false" env["DD_TRACE_PARTIAL_FLUSH_MIN_SPANS"] = "3" - env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" _, stderr, status, _ = run_python_code_in_subprocess(code, env=env) - assert status == 0, stderr app_started_events = test_agent_session.get_events("app-started") @@ -349,74 +351,54 @@ def test_app_started_event_configuration_override( assert result == expected -def test_update_dependencies_event(telemetry_writer, test_agent_session, mock_time): - import xmltodict - - new_deps = [xmltodict.__name__] - telemetry_writer._app_dependencies_loaded_event(new_deps) - # force a flush - telemetry_writer.periodic(force_flush=True) - events = test_agent_session.get_events("app-dependencies-loaded") - assert len(events) >= 1 - xmltodict_events = [e for e in events if e["payload"]["dependencies"][0]["name"] == "xmltodict"] - assert len(xmltodict_events) == 1 - assert "xmltodict" in telemetry_writer._imported_dependencies - assert telemetry_writer._imported_dependencies["xmltodict"] - - -def test_update_dependencies_event_when_disabled(telemetry_writer, test_agent_session, mock_time): - with override_global_config(dict(_telemetry_dependency_collection=False)): - # Fetch modules to reset the state of seen modules - modules.get_newly_imported_modules() - - import xmltodict - - new_deps = [xmltodict.__name__] - telemetry_writer._app_dependencies_loaded_event(new_deps) - # force a flush - telemetry_writer.periodic(force_flush=True) - events = test_agent_session.get_events() - for event in events: - assert event["request_type"] != "app-dependencies-loaded" +def test_update_dependencies_event(test_agent_session, ddtrace_run_python_code_in_subprocess): + env = os.environ.copy() + # app-started events are sent 10 seconds after ddtrace imported, this configuration overrides this + # behavior to force the app-started event to be queued immediately + env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" + # Import httppretty after ddtrace is imported, this ensures that the module is sent in a dependencies event + # Imports httpretty twice and ensures only one dependency entry is sent + _, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import xmltodict", env=env) + assert status == 0, stderr + deps = test_agent_session.get_dependencies("xmltodict") + assert len(deps) == 1, deps -@pytest.mark.skip(reason="FIXME: This test does not generate a dependencies event") -def test_update_dependencies_event_not_stdlib(telemetry_writer, test_agent_session, mock_time): - # Fetch modules to reset the state of seen modules - modules.get_newly_imported_modules() - import string +def test_update_dependencies_event_when_disabled(test_agent_session, ddtrace_run_python_code_in_subprocess): + env = os.environ.copy() + # app-started events are sent 10 seconds after ddtrace imported, this configuration overrides this + # behavior to force the app-started event to be queued immediately + env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" + env["DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED"] = "false" - new_deps = [string.__name__] - telemetry_writer._app_dependencies_loaded_event(new_deps) - # force a flush - telemetry_writer.periodic(force_flush=True) + # Import httppretty after ddtrace is imported, this ensures that the module is sent in a dependencies event + # Imports httpretty twice and ensures only one dependency entry is sent + _, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import xmltodict") events = test_agent_session.get_events("app-dependencies-loaded") - assert len(events) == 1 - + assert len(events) == 0, events -def test_update_dependencies_event_not_duplicated(telemetry_writer, test_agent_session, mock_time): - # Fetch modules to reset the state of seen modules - modules.get_newly_imported_modules() - import xmltodict - - new_deps = [xmltodict.__name__] - telemetry_writer._app_dependencies_loaded_event(new_deps) - # force a flush - telemetry_writer.periodic(force_flush=True) - events = test_agent_session.get_events("app-dependencies-loaded") - assert events[0]["payload"]["dependencies"][0]["name"] == "xmltodict" - - telemetry_writer._app_dependencies_loaded_event(new_deps) - # force a flush - telemetry_writer.periodic(force_flush=True) - events = test_agent_session.get_events("app-dependencies-loaded") +def test_update_dependencies_event_not_stdlib(test_agent_session, ddtrace_run_python_code_in_subprocess): + env = os.environ.copy() + # app-started events are sent 10 seconds after ddtrace imported, this configuration overrides this + # behavior to force the app-started event to be queued immediately + env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" - assert events[0]["seq_id"] == 1 - # only one event must be sent with a non empty payload - # flaky - # assert sum(e["payload"] != {} for e in events) == 1 + # Import httppretty after ddtrace is imported, this ensures that the module is sent in a dependencies event + # Imports httpretty twice and ensures only one dependency entry is sent + _, stderr, status, _ = ddtrace_run_python_code_in_subprocess( + """ +import sys +import httpretty +del sys.modules["httpretty"] +import httpretty +""", + env=env, + ) + assert status == 0, stderr + deps = test_agent_session.get_dependencies("httpretty") + assert len(deps) == 1, deps def test_app_closing_event(telemetry_writer, test_agent_session, mock_time): @@ -476,8 +458,7 @@ def test_app_client_configuration_changed_event(telemetry_writer, test_agent_ses # force periodic call to flush the first app_started call telemetry_writer.periodic(force_flush=True) """asserts that queuing a configuration sends a valid telemetry request""" - with override_global_config(dict(_telemetry_dependency_collection=False)): - initial_event_count = len(test_agent_session.get_events("app-client-configuration-change")) + with override_global_config(dict()): telemetry_writer.add_configuration("appsec_enabled", True) telemetry_writer.add_configuration("DD_TRACE_PROPAGATION_STYLE_EXTRACT", "datadog") telemetry_writer.add_configuration("appsec_enabled", False, "env_var") @@ -485,12 +466,8 @@ def test_app_client_configuration_changed_event(telemetry_writer, test_agent_ses telemetry_writer.periodic(force_flush=True) events = test_agent_session.get_events("app-client-configuration-change") - assert len(events) >= initial_event_count + 1 - assert events[0]["request_type"] == "app-client-configuration-change" - received_configurations = events[0]["payload"]["configuration"] - # Sort the configuration list by name + received_configurations = [c for event in events for c in event["payload"]["configuration"]] received_configurations.sort(key=lambda c: c["name"]) - # assert the latest configuration value is send to the agent assert received_configurations == [ { @@ -533,8 +510,6 @@ def test_send_failing_request(mock_status, telemetry_writer): telemetry_writer._client.url, mock_status, ) - # ensure one failing request was sent - assert len(httpretty.latest_requests()) == 1 def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_session): diff --git a/tests/tracer/test_processors.py b/tests/tracer/test_processors.py index a2587223a52..37ed707c63f 100644 --- a/tests/tracer/test_processors.py +++ b/tests/tracer/test_processors.py @@ -371,21 +371,6 @@ def test_span_creation_metrics(): ) -def test_span_creation_metrics_disabled_telemetry(): - """Test that telemetry metrics are not queued when telemetry is disabled""" - aggr = SpanAggregator( - partial_flush_enabled=False, partial_flush_min_spans=0, trace_processors=[], writer=DummyWriter() - ) - - with override_global_config(dict(_telemetry_enabled=False)): - with mock.patch("ddtrace.internal.telemetry.telemetry_writer.add_count_metric") as mock_tm: - for _ in range(300): - span = Span("span", on_finish=[aggr.on_span_finish]) - aggr.on_span_start(span) - span.finish() - mock_tm.assert_not_called() - - def test_changing_tracer_sampler_changes_tracesamplingprocessor_sampler(): """Changing the tracer sampler should change the sampling processor's sampler""" tracer = Tracer()