Skip to content

Commit

Permalink
Remove pin from celery.app.task.Task and update patch and tests accor…
Browse files Browse the repository at this point in the history
…dingly.
  • Loading branch information
wantsui committed Sep 18, 2024
1 parent 64abe8f commit f0c2892
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
7 changes: 6 additions & 1 deletion ddtrace/contrib/internal/celery/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def patch_app(app, pin=None):

# Patch apply_async
trace_utils.wrap("celery.app.task", "Task.apply_async", _traced_apply_async_function(config.celery, "apply_async"))
pin.onto(celery.app.task.Task)

# connect to the Signal framework
signals.task_prerun.connect(trace_prerun, weak=False)
Expand All @@ -70,6 +69,7 @@ def unpatch_app(app):

trace_utils.unwrap(celery.beat.Scheduler, "apply_entry")
trace_utils.unwrap(celery.beat.Scheduler, "tick")
trace_utils.unwrap(celery.app.task.Task, "apply_async")

signals.task_prerun.disconnect(trace_prerun)
signals.task_postrun.disconnect(trace_postrun)
Expand Down Expand Up @@ -105,6 +105,7 @@ def _traced_beat_inner(func, instance, args, kwargs):

def _traced_apply_async_function(integration_config, fn_name, resource_fn=None):
def _traced_apply_async_inner(func, instance, args, kwargs):

with core.context_with_data("task_context"):
try:
return func(*args, **kwargs)
Expand All @@ -113,4 +114,8 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
if task_span:
task_span.finish()

prerun_span = core.get_item("prerun_span")
if prerun_span:
prerun_span.finish()

return _traced_apply_async_inner
5 changes: 4 additions & 1 deletion ddtrace/contrib/internal/celery/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def trace_prerun(*args, **kwargs):
service = config.celery["worker_service_name"]
span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)

# Store an item called "prerun span" in case task_postrun doesn't get called
core.set_item("prerun_span", span)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

Expand Down Expand Up @@ -112,7 +115,7 @@ def trace_before_publish(*args, **kwargs):
service = config.celery["producer_service_name"]
span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=service, resource=task_name)

# Store an item called "task span" in case AFTER_TASK_PUBLISH doesn't get called
# Store an item called "task span" in case after_task_publish doesn't get called
core.set_item("task_span", span)

span.set_tag_str(COMPONENT, config.celery.integration_name)
Expand Down
1 change: 0 additions & 1 deletion tests/contrib/celery/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def setUp(self):
# override pins to use our Dummy Tracer
Pin.override(self.app, tracer=self.tracer)
Pin.override(celery.beat.Scheduler, tracer=self.tracer)
Pin.override(celery.app.task.Task, tracer=self.tracer)

def tearDown(self):
self.app = None
Expand Down

0 comments on commit f0c2892

Please sign in to comment.