fix: Second attempt at fixing trace propagation in Celery 4.2+ #831
```diff
@@ -61,7 +61,6 @@ def sentry_build_tracer(name, task, *args, **kwargs):
                 # short-circuits to task.run if it thinks it's safe.
                 task.__call__ = _wrap_task_call(task, task.__call__)
                 task.run = _wrap_task_call(task, task.run)
-                task.apply_async = _wrap_apply_async(task, task.apply_async)

                 # `build_tracer` is apparently called for every task
                 # invocation. Can't wrap every celery task for every invocation
@@ -72,6 +71,10 @@ def sentry_build_tracer(name, task, *args, **kwargs):
         trace.build_tracer = sentry_build_tracer

+        from celery.app.task import Task  # type: ignore
+
+        Task.apply_async = _wrap_apply_async(Task.apply_async)
+
         _patch_worker_exit()

         # This logger logs every status of every task that ran on the worker.
```
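One consequence of patching the unbound `Task.apply_async` once at class level, rather than per task instance inside `build_tracer`, is that the wrapper no longer closes over a `task` object; the task instance arrives as the first positional argument instead, which is why the hunk below switches the span description from `task.name` to `args[0].name`. A minimal sketch of that pattern (the `Task` class and `wrap_apply_async` here are illustrative stand-ins, not Celery's or the SDK's real code):

```python
from functools import wraps


class Task:
    """Stand-in for celery.app.task.Task, purely for illustration."""

    name = "dummy_task"

    def apply_async(self, *args, **kwargs):
        return "queued"


def wrap_apply_async(f):
    # f is the *unbound* Task.apply_async, so when the wrapper runs through
    # an instance call, that instance shows up as the first positional arg.
    @wraps(f)
    def apply_async(*args, **kwargs):
        task = args[0]  # equivalent to `self`
        print("submitting", task.name)
        return f(*args, **kwargs)

    return apply_async


Task.apply_async = wrap_apply_async(Task.apply_async)

print(Task().apply_async())  # prints "submitting dummy_task", then "queued"
```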
```diff
@@ -85,30 +88,31 @@ def sentry_build_tracer(name, task, *args, **kwargs):
         ignore_logger("celery.redirected")


-def _wrap_apply_async(task, f):
-    # type: (Any, F) -> F
+def _wrap_apply_async(f):
+    # type: (F) -> F
     @wraps(f)
     def apply_async(*args, **kwargs):
         # type: (*Any, **Any) -> Any
         hub = Hub.current
         integration = hub.get_integration(CeleryIntegration)
         if integration is not None and integration.propagate_traces:
-            with hub.start_span(op="celery.submit", description=task.name):
+            with hub.start_span(op="celery.submit", description=args[0].name):
                 with capture_internal_exceptions():
                     headers = dict(hub.iter_trace_propagation_headers())

                     if headers:
-                        kwarg_headers = kwargs.setdefault("headers", {})
+                        # Note: kwargs can contain headers=None, so no setdefault!
+                        # Unsure which backend though.
+                        kwarg_headers = kwargs.get("headers") or {}
                         kwarg_headers.update(headers)

                         # https://github.com/celery/celery/issues/4875
                         #
                         # Need to setdefault the inner headers too since other
                         # tracing tools (dd-trace-py) also employ this exact
                         # workaround and we don't want to break them.
                         #
-                        # This is not reproducible outside of AMQP, therefore no
-                        # tests!
                         kwarg_headers.setdefault("headers", {}).update(headers)
+                        kwargs["headers"] = kwarg_headers

                 return f(*args, **kwargs)
         else:
```

> **Member (author)**, on the removed `# This is not reproducible outside of AMQP` comment: Turns out it is perfectly reproducible. The bug we're working around lives in `celery.app.amqp`, but it seems that module is used with the Redis broker too. To be honest, I no longer understand how Celery separates concerns.
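For readers puzzling over the nested `setdefault("headers", {})`, here is a condensed, standalone sketch of the merging logic the new wrapper performs (`merge_trace_headers` is an illustrative name, not part of the SDK): the trace headers are written both to the outer `headers` kwarg and to the inner `headers` dict that the task protocol used by `celery.app.amqp` actually delivers to the worker.

```python
def merge_trace_headers(kwargs, trace_headers):
    # `kwargs` are the keyword arguments destined for apply_async(); they may
    # contain headers=None, so setdefault() on them is not safe.
    kwarg_headers = kwargs.get("headers") or {}
    kwarg_headers.update(trace_headers)

    # Mirror of the celery/celery#4875 workaround: user headers can end up
    # nested under an inner "headers" dict, so set the trace headers there as
    # well (dd-trace-py does the same, hence setdefault instead of overwrite).
    kwarg_headers.setdefault("headers", {}).update(trace_headers)
    kwargs["headers"] = kwarg_headers
    return kwargs


# Both shapes of kwargs end up with the trace headers present:
print(merge_trace_headers({"headers": None}, {"sentry-trace": "trace-id-1"}))
print(merge_trace_headers({}, {"sentry-trace": "trace-id-1"}))
```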
```diff
@@ -235,11 +235,7 @@ def append_envelope(envelope):
 @pytest.fixture
 def capture_events_forksafe(monkeypatch, capture_events, request):
     def inner():
-        in_process_events = capture_events()
-
-        @request.addfinalizer
-        def _():
-            assert not in_process_events
+        capture_events()

         events_r, events_w = os.pipe()
         events_r = os.fdopen(events_r, "rb", 0)
```

> **Member (author):** Need to remove this assertion because we actually do have events in the same process now. We still need to call `capture_events()`, otherwise our transport will raise an error (this fixture setup is overdue for a refactor).
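For context on why the in-process assertion had to go: the forksafe fixture works by shipping events from the (possibly forked) worker process back to the test process over an `os.pipe()`, and with this PR the submitting transaction is now also captured in the test process itself. A rough, Unix-only sketch of that pipe pattern under those assumptions (not the actual fixture code):

```python
import json
import os

# Create the pipe before forking so the child inherits the write end.
read_fd, write_fd = os.pipe()
reader = os.fdopen(read_fd, "rb", 0)
writer = os.fdopen(write_fd, "wb", 0)

pid = os.fork()
if pid == 0:
    # Child ("worker") process: serialize one event per line into the pipe.
    writer.write(json.dumps({"type": "transaction"}).encode("utf-8") + b"\n")
    os._exit(0)

# Parent (test) process: wait for the child, then read the event back.
os.waitpid(pid, 0)
event = json.loads(reader.readline())
assert event["type"] == "transaction"
```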
```diff
@@ -42,6 +42,7 @@ def inner(propagate_traces=True, backend="always_eager", **kwargs):

             # this backend requires capture_events_forksafe
             celery.conf.worker_max_tasks_per_child = 1
+            celery.conf.worker_concurrency = 1
             celery.conf.broker_url = "redis://127.0.0.1:6379"
             celery.conf.result_backend = "redis://127.0.0.1:6379"
             celery.conf.task_always_eager = False
```

> **Member (author):** This speeds up the tests; otherwise Celery forks 20 times.

> **Contributor:** Sounds good from the perspective of speeding up tests, but won't this affect which bugs and code paths the tests surface?

> **Member (author):** We haven't had any bugs related to this so far, and my gut feeling says no. There is no direct communication between the forked processes, so I'd say it's unlikely, and the tests never send high event volumes anyway.
```diff
@@ -297,7 +298,7 @@ def dummy_task(self):

 @pytest.mark.forked
-def test_redis_backend(init_celery, capture_events_forksafe, tmpdir):
+def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe, tmpdir):
     celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True)

     events = capture_events_forksafe()
@@ -309,8 +310,9 @@ def dummy_task(self):
         runs.append(1)
         1 / 0

-    # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
-    res = dummy_task.apply_async()
+    with start_transaction(name="submit_celery"):
+        # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
+        res = dummy_task.apply_async()

     with pytest.raises(Exception):
         # Celery 4.1 raises a gibberish exception
@@ -319,6 +321,13 @@ def dummy_task(self):
     # if this is nonempty, the worker never really forked
     assert not runs

+    submit_transaction = events.read_event()
+    assert submit_transaction["type"] == "transaction"
+    assert submit_transaction["transaction"] == "submit_celery"
+    (span,) = submit_transaction["spans"]
+    assert span["op"] == "celery.submit"
+    assert span["description"] == "dummy_task"
+
     event = events.read_event()
     (exception,) = event["exception"]["values"]
     assert exception["type"] == "ZeroDivisionError"
@@ -327,6 +336,7 @@ def dummy_task(self):
     assert (
         transaction["contexts"]["trace"]["trace_id"]
         == event["contexts"]["trace"]["trace_id"]
+        == submit_transaction["contexts"]["trace"]["trace_id"]
     )

     events.read_flush()
```
> On the replaced `kwarg_headers = kwargs.setdefault("headers", {})`: This line is currently crashing all the time (inside `capture_internal_exceptions`) because the old code could not deal with `kwargs == {"headers": None}`, only with `kwargs == {}`.
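To see the failure mode being described: `dict.setdefault` only supplies the default when the key is missing entirely, so an explicit `headers=None` sails through and the following `.update()` raises. A quick illustration:

```python
kwargs = {"headers": None}

# Old code: setdefault() returns the existing None, not the {} default,
# so the subsequent .update() would raise AttributeError.
kwarg_headers = kwargs.setdefault("headers", {})
print(kwarg_headers)  # None

# New code: treat an explicit None the same as "no headers given".
kwarg_headers = kwargs.get("headers") or {}
kwarg_headers.update({"sentry-trace": "trace-id-1"})
print(kwarg_headers)  # {'sentry-trace': 'trace-id-1'}
```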