From c3655c26351112d08b99f88e8bfd087712ac3cc7 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 22 Sep 2020 17:31:59 +0200 Subject: [PATCH 1/5] wip --- sentry_sdk/integrations/celery.py | 20 ++++++++++++-------- tests/conftest.py | 7 ++----- tests/integrations/celery/test_celery.py | 17 ++++++++++++++--- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index 1a11d4a745..04f72bfebd 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -61,7 +61,6 @@ def sentry_build_tracer(name, task, *args, **kwargs): # short-circuits to task.run if it thinks it's safe. task.__call__ = _wrap_task_call(task, task.__call__) task.run = _wrap_task_call(task, task.run) - task.apply_async = _wrap_apply_async(task, task.apply_async) # `build_tracer` is apparently called for every task # invocation. Can't wrap every celery task for every invocation @@ -72,6 +71,10 @@ def sentry_build_tracer(name, task, *args, **kwargs): trace.build_tracer = sentry_build_tracer + from celery.app.task import Task + + Task.apply_async = _wrap_apply_async(Task.apply_async) + _patch_worker_exit() # This logger logs every status of every task that ran on the worker. @@ -85,19 +88,22 @@ def sentry_build_tracer(name, task, *args, **kwargs): ignore_logger("celery.redirected") -def _wrap_apply_async(task, f): - # type: (Any, F) -> F +def _wrap_apply_async(f): + # type: (F) -> F @wraps(f) def apply_async(*args, **kwargs): # type: (*Any, **Any) -> Any hub = Hub.current integration = hub.get_integration(CeleryIntegration) if integration is not None and integration.propagate_traces: - with hub.start_span(op="celery.submit", description=task.name): + with hub.start_span(op="celery.submit", description=args[0].name) as span: with capture_internal_exceptions(): headers = dict(hub.iter_trace_propagation_headers()) + if headers: - kwarg_headers = kwargs.setdefault("headers", {}) + # Note: kwargs can contain headers=None, so no setdefault! + # Unsure which backend though. + kwarg_headers = kwargs.get("headers") or {} kwarg_headers.update(headers) # https://github.com/celery/celery/issues/4875 @@ -105,10 +111,8 @@ def apply_async(*args, **kwargs): # Need to setdefault the inner headers too since other # tracing tools (dd-trace-py) also employ this exact # workaround and we don't want to break them. - # - # This is not reproducible outside of AMQP, therefore no - # tests! kwarg_headers.setdefault("headers", {}).update(headers) + kwargs['headers'] = kwarg_headers return f(*args, **kwargs) else: diff --git a/tests/conftest.py b/tests/conftest.py index 0a17d135fc..682498c51d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import os import json +import traceback import pytest import jsonschema @@ -235,11 +236,7 @@ def append_envelope(envelope): @pytest.fixture def capture_events_forksafe(monkeypatch, capture_events, request): def inner(): - in_process_events = capture_events() - - @request.addfinalizer - def _(): - assert not in_process_events + capture_events() events_r, events_w = os.pipe() events_r = os.fdopen(events_r, "rb", 0) diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 13c7c4dd46..0763f3e3cf 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -42,6 +42,7 @@ def inner(propagate_traces=True, backend="always_eager", **kwargs): # this backend requires capture_events_forksafe celery.conf.worker_max_tasks_per_child = 1 + celery.conf.worker_concurrency = 1 celery.conf.broker_url = "redis://127.0.0.1:6379" celery.conf.result_backend = "redis://127.0.0.1:6379" celery.conf.task_always_eager = False @@ -297,7 +298,7 @@ def dummy_task(self): @pytest.mark.forked -def test_redis_backend(init_celery, capture_events_forksafe, tmpdir): +def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe, tmpdir): celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True) events = capture_events_forksafe() @@ -309,8 +310,10 @@ def dummy_task(self): runs.append(1) 1 / 0 - # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes - res = dummy_task.apply_async() + + with start_transaction(name='submit_celery'): + # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes + res = dummy_task.apply_async() with pytest.raises(Exception): # Celery 4.1 raises a gibberish exception @@ -319,6 +322,13 @@ def dummy_task(self): # if this is nonempty, the worker never really forked assert not runs + submit_transaction = events.read_event() + assert submit_transaction['type'] == 'transaction' + assert submit_transaction['transaction'] == 'submit_celery' + span, = submit_transaction['spans'] + assert span['op'] == 'celery.submit' + assert span['description'] == 'dummy_task' + event = events.read_event() (exception,) = event["exception"]["values"] assert exception["type"] == "ZeroDivisionError" @@ -327,6 +337,7 @@ def dummy_task(self): assert ( transaction["contexts"]["trace"]["trace_id"] == event["contexts"]["trace"]["trace_id"] + == submit_transaction["contexts"]["trace"]["trace_id"] ) events.read_flush() From f380004c4bcd38c20593368ffc7f78d62e2670cb Mon Sep 17 00:00:00 2001 From: sentry-bot Date: Tue, 22 Sep 2020 18:20:45 +0000 Subject: [PATCH 2/5] fix: Formatting --- sentry_sdk/integrations/celery.py | 2 +- tests/integrations/celery/test_celery.py | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index 04f72bfebd..9a5f4c8b2e 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -112,7 +112,7 @@ def apply_async(*args, **kwargs): # tracing tools (dd-trace-py) also employ this exact # workaround and we don't want to break them. kwarg_headers.setdefault("headers", {}).update(headers) - kwargs['headers'] = kwarg_headers + kwargs["headers"] = kwarg_headers return f(*args, **kwargs) else: diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 0763f3e3cf..6ef50bc093 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -310,8 +310,7 @@ def dummy_task(self): runs.append(1) 1 / 0 - - with start_transaction(name='submit_celery'): + with start_transaction(name="submit_celery"): # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes res = dummy_task.apply_async() @@ -323,11 +322,11 @@ def dummy_task(self): assert not runs submit_transaction = events.read_event() - assert submit_transaction['type'] == 'transaction' - assert submit_transaction['transaction'] == 'submit_celery' - span, = submit_transaction['spans'] - assert span['op'] == 'celery.submit' - assert span['description'] == 'dummy_task' + assert submit_transaction["type"] == "transaction" + assert submit_transaction["transaction"] == "submit_celery" + (span,) = submit_transaction["spans"] + assert span["op"] == "celery.submit" + assert span["description"] == "dummy_task" event = events.read_event() (exception,) = event["exception"]["values"] From 036be0cca589688baeae8cd8b0c487e813a7afe5 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 22 Sep 2020 21:07:19 +0200 Subject: [PATCH 3/5] remove unused import --- tests/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 682498c51d..1c368a5b14 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,5 @@ import os import json -import traceback import pytest import jsonschema From 7818ae93e6a06dba907a115a6fd5fd39c7af983d Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 23 Sep 2020 09:46:14 +0200 Subject: [PATCH 4/5] fix linters --- sentry_sdk/integrations/celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index 9a5f4c8b2e..a63e80eb47 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -96,7 +96,7 @@ def apply_async(*args, **kwargs): hub = Hub.current integration = hub.get_integration(CeleryIntegration) if integration is not None and integration.propagate_traces: - with hub.start_span(op="celery.submit", description=args[0].name) as span: + with hub.start_span(op="celery.submit", description=args[0].name): with capture_internal_exceptions(): headers = dict(hub.iter_trace_propagation_headers()) From 5c99cfe238e371d856a48312dfef0d2252d786e2 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 23 Sep 2020 10:36:48 +0200 Subject: [PATCH 5/5] fix linters --- sentry_sdk/integrations/celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index a63e80eb47..2b51fe1f00 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -71,7 +71,7 @@ def sentry_build_tracer(name, task, *args, **kwargs): trace.build_tracer = sentry_build_tracer - from celery.app.task import Task + from celery.app.task import Task # type: ignore Task.apply_async = _wrap_apply_async(Task.apply_async)