Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ def sentry_build_tracer(name, task, *args, **kwargs):
# short-circuits to task.run if it thinks it's safe.
task.__call__ = _wrap_task_call(task, task.__call__)
task.run = _wrap_task_call(task, task.run)
task.apply_async = _wrap_apply_async(task, task.apply_async)

# `build_tracer` is apparently called for every task
# invocation. Can't wrap every celery task for every invocation
Expand All @@ -72,6 +71,10 @@ def sentry_build_tracer(name, task, *args, **kwargs):

trace.build_tracer = sentry_build_tracer

from celery.app.task import Task # type: ignore

Task.apply_async = _wrap_apply_async(Task.apply_async)

_patch_worker_exit()

# This logger logs every status of every task that ran on the worker.
Expand All @@ -85,30 +88,31 @@ def sentry_build_tracer(name, task, *args, **kwargs):
ignore_logger("celery.redirected")


def _wrap_apply_async(task, f):
# type: (Any, F) -> F
def _wrap_apply_async(f):
# type: (F) -> F
@wraps(f)
def apply_async(*args, **kwargs):
# type: (*Any, **Any) -> Any
hub = Hub.current
integration = hub.get_integration(CeleryIntegration)
if integration is not None and integration.propagate_traces:
with hub.start_span(op="celery.submit", description=task.name):
with hub.start_span(op="celery.submit", description=args[0].name):
with capture_internal_exceptions():
headers = dict(hub.iter_trace_propagation_headers())

if headers:
kwarg_headers = kwargs.setdefault("headers", {})
# Note: kwargs can contain headers=None, so no setdefault!
# Unsure which backend though.
kwarg_headers = kwargs.get("headers") or {}
kwarg_headers.update(headers)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is currently crashing all the time (in capture_internal_exceptions) because the old code could not deal with kwargs == {"headers": None}, only kwargs == {}.


# https://github.com/celery/celery/issues/4875
#
# Need to setdefault the inner headers too since other
# tracing tools (dd-trace-py) also employ this exact
# workaround and we don't want to break them.
#
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out it is perfectly reproducible. The bug we're working around lives in celery.app.amqp, but it seems that module is used with the Redis backend too — to be honest, I no longer understand how Celery separates concerns.

# This is not reproducible outside of AMQP, therefore no
# tests!
kwarg_headers.setdefault("headers", {}).update(headers)
kwargs["headers"] = kwarg_headers

return f(*args, **kwargs)
else:
Expand Down
6 changes: 1 addition & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,7 @@ def append_envelope(envelope):
@pytest.fixture
def capture_events_forksafe(monkeypatch, capture_events, request):
def inner():
in_process_events = capture_events()

@request.addfinalizer
def _():
assert not in_process_events
capture_events()
Copy link
Member Author

@untitaker untitaker Sep 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to remove this assertion because we actually do have events in the same process now.

We need to call capture_events or otherwise our transport will raise an error (this fixture setup is overdue for refactor...)


events_r, events_w = os.pipe()
events_r = os.fdopen(events_r, "rb", 0)
Expand Down
16 changes: 13 additions & 3 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def inner(propagate_traces=True, backend="always_eager", **kwargs):

# this backend requires capture_events_forksafe
celery.conf.worker_max_tasks_per_child = 1
celery.conf.worker_concurrency = 1
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This speeds up tests, otherwise celery forks 20 times.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good from the perspective of speeding up tests -- won't this have an effect on bugs and code paths surfaced by tests, though?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We haven't had any bugs related to this so far, and my gut feeling tells me no. There is no direct communication between the forked processes, so I'd say it's unlikely — and the tests will never send high event volumes anyway.

celery.conf.broker_url = "redis://127.0.0.1:6379"
celery.conf.result_backend = "redis://127.0.0.1:6379"
celery.conf.task_always_eager = False
Expand Down Expand Up @@ -297,7 +298,7 @@ def dummy_task(self):


@pytest.mark.forked
def test_redis_backend(init_celery, capture_events_forksafe, tmpdir):
def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe, tmpdir):
celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True)

events = capture_events_forksafe()
Expand All @@ -309,8 +310,9 @@ def dummy_task(self):
runs.append(1)
1 / 0

# Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
res = dummy_task.apply_async()
with start_transaction(name="submit_celery"):
# Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
res = dummy_task.apply_async()

with pytest.raises(Exception):
# Celery 4.1 raises a gibberish exception
Expand All @@ -319,6 +321,13 @@ def dummy_task(self):
# if this is nonempty, the worker never really forked
assert not runs

submit_transaction = events.read_event()
assert submit_transaction["type"] == "transaction"
assert submit_transaction["transaction"] == "submit_celery"
(span,) = submit_transaction["spans"]
assert span["op"] == "celery.submit"
assert span["description"] == "dummy_task"

event = events.read_event()
(exception,) = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"
Expand All @@ -327,6 +336,7 @@ def dummy_task(self):
assert (
transaction["contexts"]["trace"]["trace_id"]
== event["contexts"]["trace"]["trace_id"]
== submit_transaction["contexts"]["trace"]["trace_id"]
)

events.read_flush()
Expand Down