From d9c3bb9045ca83f41ea2b086c655f6848063c70d Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 18 Sep 2020 15:04:55 +0200 Subject: [PATCH 1/6] wip --- sentry_sdk/integrations/celery.py | 47 +++++++++++++-------- tests/conftest.py | 10 ++++- tests/integrations/celery/test_celery.py | 52 ++++++++++++++++-------- tox.ini | 4 +- 4 files changed, 74 insertions(+), 39 deletions(-) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index 86714e2111..1a11d4a745 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -93,15 +93,23 @@ def apply_async(*args, **kwargs): hub = Hub.current integration = hub.get_integration(CeleryIntegration) if integration is not None and integration.propagate_traces: - headers = None - for key, value in hub.iter_trace_propagation_headers(): - if headers is None: - headers = dict(kwargs.get("headers") or {}) - headers[key] = value - if headers is not None: - kwargs["headers"] = headers - with hub.start_span(op="celery.submit", description=task.name): + with capture_internal_exceptions(): + headers = dict(hub.iter_trace_propagation_headers()) + if headers: + kwarg_headers = kwargs.setdefault("headers", {}) + kwarg_headers.update(headers) + + # https://github.com/celery/celery/issues/4875 + # + # Need to setdefault the inner headers too since other + # tracing tools (dd-trace-py) also employ this exact + # workaround and we don't want to break them. + # + # This is not reproducible outside of AMQP, therefore no + # tests! + kwarg_headers.setdefault("headers", {}).update(headers) + return f(*args, **kwargs) else: return f(*args, **kwargs) @@ -130,19 +138,22 @@ def _inner(*args, **kwargs): scope.clear_breadcrumbs() scope.add_event_processor(_make_event_processor(task, *args, **kwargs)) - transaction = Transaction.continue_from_headers( - args[3].get("headers") or {}, - op="celery.task", - name="unknown celery task", - ) - - # Could possibly use a better hook than this one - transaction.set_status("ok") + transaction = None + # Celery task objects are not a thing to be trusted. Even + # something such as attribute access can fail. with capture_internal_exceptions(): - # Celery task objects are not a thing to be trusted. Even - # something such as attribute access can fail. + transaction = Transaction.continue_from_headers( + args[3].get("headers") or {}, + op="celery.task", + name="unknown celery task", + ) + transaction.name = task.name + transaction.set_status("ok") + + if transaction is None: + return f(*args, **kwargs) with hub.start_transaction(transaction): return f(*args, **kwargs) diff --git a/tests/conftest.py b/tests/conftest.py index 36ab1d9159..0a17d135fc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -197,7 +197,7 @@ def append_event(event): def append_envelope(envelope): for item in envelope: if item.headers.get("type") in ("event", "transaction"): - events.append(item.payload.json) + test_client.transport.capture_event(item.payload.json) return old_capture_envelope(envelope) monkeypatch.setattr(test_client.transport, "capture_event", append_event) @@ -233,8 +233,14 @@ def append_envelope(envelope): @pytest.fixture -def capture_events_forksafe(monkeypatch): +def capture_events_forksafe(monkeypatch, capture_events, request): def inner(): + in_process_events = capture_events() + + @request.addfinalizer + def _(): + assert not in_process_events + events_r, events_w = os.pipe() events_r = os.fdopen(events_r, "rb", 0) events_w = os.fdopen(events_w, "wb", 0) diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index ed06e8f2b0..cb8e2d54f3 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -22,17 +22,37 @@ def inner(signal, f): @pytest.fixture -def init_celery(sentry_init): - def inner(propagate_traces=True, **kwargs): +def init_celery(sentry_init, request): + def inner(propagate_traces=True, backend='always_eager', **kwargs): sentry_init( integrations=[CeleryIntegration(propagate_traces=propagate_traces)], **kwargs ) celery = Celery(__name__) - if VERSION < (4,): - celery.conf.CELERY_ALWAYS_EAGER = True + + if backend == 'always_eager': + if VERSION < (4,): + celery.conf.CELERY_ALWAYS_EAGER = True + else: + celery.conf.task_always_eager = True + elif backend == 'redis': + # requires capture_events_forksafe + celery.conf.worker_max_tasks_per_child = 1 + celery.conf.broker_url = "redis://127.0.0.1:6379" + celery.conf.result_backend = "redis://127.0.0.1:6379" + celery.conf.task_always_eager = False + + Hub.main.bind_client(Hub.current.client) + request.addfinalizer(lambda: Hub.main.bind_client(None)) + + # Once we drop celery 3 we can use the celery_worker fixture + w = worker.worker(app=celery) + t = threading.Thread(target=w.run) + t.daemon = True + t.start() else: - celery.conf.task_always_eager = True + raise ValueError(backend) + return celery return inner @@ -273,15 +293,10 @@ def dummy_task(self): @pytest.mark.forked -@pytest.mark.skipif(VERSION < (4,), reason="in-memory backend broken") -def test_transport_shutdown(request, celery, capture_events_forksafe, tmpdir): - events = capture_events_forksafe() +def test_redis_backend(init_celery, capture_events_forksafe, tmpdir): + celery = init_celery(traces_sample_rate=1.0, backend='redis', debug=True) - celery.conf.worker_max_tasks_per_child = 1 - celery.conf.broker_url = "memory://localhost/" - celery.conf.broker_backend = "memory" - celery.conf.result_backend = "file://{}".format(tmpdir.mkdir("celery-results")) - celery.conf.task_always_eager = False + events = capture_events_forksafe() runs = [] @@ -292,19 +307,20 @@ def dummy_task(self): res = dummy_task.delay() - w = worker.worker(app=celery) - t = threading.Thread(target=w.run) - t.daemon = True - t.start() - with pytest.raises(Exception): # Celery 4.1 raises a gibberish exception res.wait() + # if this is nonempty, the worker never really forked + assert not runs + event = events.read_event() (exception,) = event["exception"]["values"] assert exception["type"] == "ZeroDivisionError" + transaction = events.read_event() + assert transaction['contexts']['trace']['trace_id'] == event['contexts']['trace']['trace_id'] + events.read_flush() # if this is nonempty, the worker never really forked diff --git a/tox.ini b/tox.ini index 78d73a14aa..eb85a4b654 100644 --- a/tox.ini +++ b/tox.ini @@ -38,7 +38,8 @@ envlist = {py3.6,py3.7}-sanic-19 # TODO: Add py3.9 - {pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-celery-{4.1,4.2,4.3,4.4} + {pypy,py2.7,py3.5,py3.6}-celery-{4.1,4.2} + {pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-celery-{4.3,4.4} {pypy,py2.7}-celery-3 {py2.7,py3.7}-beam-{2.12,2.13} @@ -128,6 +129,7 @@ deps = beam-2.13: apache-beam>=2.13.0, <2.14.0 beam-master: git+https://github.com/apache/beam#egg=apache-beam&subdirectory=sdks/python + celery: redis celery-3: Celery>=3.1,<4.0 celery-4.1: Celery>=4.1,<4.2 celery-4.2: Celery>=4.2,<4.3 From 2580956c01870bcd298449e632ec0e3d29b4a417 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 21 Sep 2020 14:42:37 +0200 Subject: [PATCH 2/6] work around another celery bug? --- tests/integrations/celery/test_celery.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index cb8e2d54f3..e87d2439c0 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -305,7 +305,8 @@ def dummy_task(self): runs.append(1) 1 / 0 - res = dummy_task.delay() + # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes + res = dummy_task.apply_async() with pytest.raises(Exception): # Celery 4.1 raises a gibberish exception From b30f2141909c4531e9e930a8b4619d94d220fb4c Mon Sep 17 00:00:00 2001 From: sentry-bot Date: Mon, 21 Sep 2020 12:43:15 +0000 Subject: [PATCH 3/6] fix: Formatting --- tests/integrations/celery/test_celery.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index e87d2439c0..9491c9361f 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -23,19 +23,19 @@ def inner(signal, f): @pytest.fixture def init_celery(sentry_init, request): - def inner(propagate_traces=True, backend='always_eager', **kwargs): + def inner(propagate_traces=True, backend="always_eager", **kwargs): sentry_init( integrations=[CeleryIntegration(propagate_traces=propagate_traces)], **kwargs ) celery = Celery(__name__) - if backend == 'always_eager': + if backend == "always_eager": if VERSION < (4,): celery.conf.CELERY_ALWAYS_EAGER = True else: celery.conf.task_always_eager = True - elif backend == 'redis': + elif backend == "redis": # requires capture_events_forksafe celery.conf.worker_max_tasks_per_child = 1 celery.conf.broker_url = "redis://127.0.0.1:6379" @@ -294,7 +294,7 @@ def dummy_task(self): @pytest.mark.forked def test_redis_backend(init_celery, capture_events_forksafe, tmpdir): - celery = init_celery(traces_sample_rate=1.0, backend='redis', debug=True) + celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True) events = capture_events_forksafe() @@ -320,7 +320,10 @@ def dummy_task(self): assert exception["type"] == "ZeroDivisionError" transaction = events.read_event() - assert transaction['contexts']['trace']['trace_id'] == event['contexts']['trace']['trace_id'] + assert ( + transaction["contexts"]["trace"]["trace_id"] + == event["contexts"]["trace"]["trace_id"] + ) events.read_flush() From a323853ca5708e62430cc59ac80368cc077addf7 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 21 Sep 2020 15:29:13 +0200 Subject: [PATCH 4/6] fix schema violation --- sentry_sdk/tracing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/tracing.py b/sentry_sdk/tracing.py index 9064a96805..3028284ac3 100644 --- a/sentry_sdk/tracing.py +++ b/sentry_sdk/tracing.py @@ -318,7 +318,7 @@ def set_status(self, value): def set_http_status(self, http_status): # type: (int) -> None - self.set_tag("http.status_code", http_status) + self.set_tag("http.status_code", str(http_status)) if http_status < 400: self.set_status("ok") From 5dabecd07d35e5f09521510603fe7d62c8a80b91 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 21 Sep 2020 17:33:16 +0200 Subject: [PATCH 5/6] add redis --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index ef24eed4ce..5bf138a656 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,6 +4,7 @@ dist: xenial services: - postgresql + - redis-server language: python From 8dd68a67f9a61758c565aeec12bbb2cd6b21b68c Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 21 Sep 2020 21:31:28 +0200 Subject: [PATCH 6/6] I really tried! --- tests/integrations/celery/test_celery.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index 9491c9361f..13c7c4dd46 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -36,7 +36,11 @@ def inner(propagate_traces=True, backend="always_eager", **kwargs): else: celery.conf.task_always_eager = True elif backend == "redis": - # requires capture_events_forksafe + # broken on celery 3 + if VERSION < (4,): + pytest.skip("Redis backend broken for some reason") + + # this backend requires capture_events_forksafe celery.conf.worker_max_tasks_per_child = 1 celery.conf.broker_url = "redis://127.0.0.1:6379" celery.conf.result_backend = "redis://127.0.0.1:6379"