1 change: 1 addition & 0 deletions .travis.yml
@@ -4,6 +4,7 @@ dist: xenial
 
 services:
   - postgresql
+  - redis-server
 
 language: python
47 changes: 29 additions & 18 deletions sentry_sdk/integrations/celery.py
@@ -93,15 +93,23 @@ def apply_async(*args, **kwargs):
         hub = Hub.current
         integration = hub.get_integration(CeleryIntegration)
         if integration is not None and integration.propagate_traces:
-            headers = None
-            for key, value in hub.iter_trace_propagation_headers():
-                if headers is None:
-                    headers = dict(kwargs.get("headers") or {})
-                headers[key] = value
-            if headers is not None:
-                kwargs["headers"] = headers
-
             with hub.start_span(op="celery.submit", description=task.name):
+                with capture_internal_exceptions():
+                    headers = dict(hub.iter_trace_propagation_headers())
+                    if headers:
+                        kwarg_headers = kwargs.setdefault("headers", {})
+                        kwarg_headers.update(headers)
+
+                        # https://github.com/celery/celery/issues/4875
+                        #
+                        # Need to setdefault the inner headers too since other
+                        # tracing tools (dd-trace-py) also employ this exact
+                        # workaround and we don't want to break them.
+                        #
+                        # This is not reproducible outside of AMQP, therefore no
+                        # tests!
+                        kwarg_headers.setdefault("headers", {}).update(headers)
+
                 return f(*args, **kwargs)
         else:
             return f(*args, **kwargs)
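
Why the inner setdefault: with AMQP (celery/celery#4875), the message headers that consumers actually read can sit one level deeper, under a second "headers" key, and dd-trace-py already relies on that workaround. A minimal, illustrative sketch (not SDK code; the header value is made up):

    kwargs = {"headers": {"retries": 0}}
    trace_headers = {"sentry-trace": "771a43a4192642f0b136d5159a501700-9816b67affa81e31-1"}

    kwarg_headers = kwargs.setdefault("headers", {})
    kwarg_headers.update(trace_headers)
    # Mirror the trace data into the nested dict as well.
    kwarg_headers.setdefault("headers", {}).update(trace_headers)

    # Both reader conventions now see the propagated header:
    assert kwargs["headers"]["sentry-trace"] == trace_headers["sentry-trace"]
    assert kwargs["headers"]["headers"]["sentry-trace"] == trace_headers["sentry-trace"]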
@@ -130,19 +138,22 @@ def _inner(*args, **kwargs):
                 scope.clear_breadcrumbs()
                 scope.add_event_processor(_make_event_processor(task, *args, **kwargs))
 
-            transaction = Transaction.continue_from_headers(
-                args[3].get("headers") or {},
-                op="celery.task",
-                name="unknown celery task",
-            )
-
-            # Could possibly use a better hook than this one
-            transaction.set_status("ok")
+            transaction = None
 
-            # Celery task objects are not a thing to be trusted. Even
-            # something such as attribute access can fail.
             with capture_internal_exceptions():
+                # Celery task objects are not a thing to be trusted. Even
+                # something such as attribute access can fail.
+                transaction = Transaction.continue_from_headers(
+                    args[3].get("headers") or {},
+                    op="celery.task",
+                    name="unknown celery task",
+                )
+
                 transaction.name = task.name
+                transaction.set_status("ok")
+
+            if transaction is None:
+                return f(*args, **kwargs)
 
             with hub.start_transaction(transaction):
                 return f(*args, **kwargs)
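
The resulting control flow, reduced to a standalone sketch: everything that can throw (untrusted task attributes, malformed headers) runs inside capture_internal_exceptions, so on failure transaction stays None and the task still executes, just untraced. swallow_errors below is a hypothetical stand-in for the SDK helper:

    from contextlib import contextmanager

    @contextmanager
    def swallow_errors():
        # Hypothetical stand-in for capture_internal_exceptions(), which
        # reports SDK-internal errors instead of letting them escape.
        try:
            yield
        except Exception:
            pass

    def run_task(task, message_headers):
        transaction = None
        with swallow_errors():
            # May raise on a malformed message; transaction then stays None.
            transaction = {"name": message_headers["task"], "status": "ok"}

        if transaction is None:
            return task()  # tracing setup failed: still run the task
        return task()      # the real code wraps this in hub.start_transaction(...)

    assert run_task(lambda: 42, {"task": "add"}) == 42  # traced path
    assert run_task(lambda: 42, {}) == 42               # KeyError swallowed, untraced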
2 changes: 1 addition & 1 deletion sentry_sdk/tracing.py
@@ -318,7 +318,7 @@ def set_status(self, value):
 
     def set_http_status(self, http_status):
         # type: (int) -> None
-        self.set_tag("http.status_code", http_status)
+        self.set_tag("http.status_code", str(http_status))
 
     if http_status < 400:
         self.set_status("ok")
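
Sentry tag values are strings, so the raw int produced a mixed-type tag; the str() coercion fixes that at the call site. A minimal before/after check with a plain dict standing in for span tags:

    tags = {}

    def set_tag(key, value):
        tags[key] = value

    set_tag("http.status_code", 500)        # before: an int sneaks into the tags
    assert tags["http.status_code"] == 500

    set_tag("http.status_code", str(500))   # after: always a string
    assert tags["http.status_code"] == "500"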
10 changes: 8 additions & 2 deletions tests/conftest.py
@@ -197,7 +197,7 @@ def append_event(event):
     def append_envelope(envelope):
         for item in envelope:
             if item.headers.get("type") in ("event", "transaction"):
-                events.append(item.payload.json)
+                test_client.transport.capture_event(item.payload.json)
         return old_capture_envelope(envelope)
 
     monkeypatch.setattr(test_client.transport, "capture_event", append_event)
@@ -233,8 +233,14 @@ def append_envelope(envelope):
 
 
 @pytest.fixture
-def capture_events_forksafe(monkeypatch):
+def capture_events_forksafe(monkeypatch, capture_events, request):
     def inner():
+        in_process_events = capture_events()
+
+        @request.addfinalizer
+        def _():
+            assert not in_process_events
+
         events_r, events_w = os.pipe()
         events_r = os.fdopen(events_r, "rb", 0)
         events_w = os.fdopen(events_w, "wb", 0)
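
Rough usage sketch (test body illustrative, compare test_redis_backend below): the fixture returns a reader whose read_event/read_flush calls block on the pipe, and the new finalizer asserts that nothing was captured in-process, i.e. every event really came from the forked worker:

    import pytest

    @pytest.mark.forked
    def test_worker_error(capture_events_forksafe):
        events = capture_events_forksafe()

        # ... trigger code that forks and raises in the child ...

        event = events.read_event()  # blocks until the child pipes an event over
        assert event["exception"]["values"][0]["type"] == "ZeroDivisionError"
        events.read_flush()          # then consume the transport's flush marker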
62 changes: 43 additions & 19 deletions tests/integrations/celery/test_celery.py
@@ -22,17 +22,41 @@ def inner(signal, f):
 
 
 @pytest.fixture
-def init_celery(sentry_init):
-    def inner(propagate_traces=True, **kwargs):
+def init_celery(sentry_init, request):
+    def inner(propagate_traces=True, backend="always_eager", **kwargs):
         sentry_init(
             integrations=[CeleryIntegration(propagate_traces=propagate_traces)],
             **kwargs
         )
         celery = Celery(__name__)
-        if VERSION < (4,):
-            celery.conf.CELERY_ALWAYS_EAGER = True
+
+        if backend == "always_eager":
+            if VERSION < (4,):
+                celery.conf.CELERY_ALWAYS_EAGER = True
+            else:
+                celery.conf.task_always_eager = True
+        elif backend == "redis":
+            # broken on celery 3
+            if VERSION < (4,):
+                pytest.skip("Redis backend broken for some reason")
+
+            # this backend requires capture_events_forksafe
+            celery.conf.worker_max_tasks_per_child = 1
+            celery.conf.broker_url = "redis://127.0.0.1:6379"
+            celery.conf.result_backend = "redis://127.0.0.1:6379"
+            celery.conf.task_always_eager = False
+
+            Hub.main.bind_client(Hub.current.client)
+            request.addfinalizer(lambda: Hub.main.bind_client(None))
+
+            # Once we drop celery 3 we can use the celery_worker fixture
+            w = worker.worker(app=celery)
+            t = threading.Thread(target=w.run)
+            t.daemon = True
+            t.start()
         else:
-            celery.conf.task_always_eager = True
+            raise ValueError(backend)
 
         return celery
 
     return inner
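
Call-site sketch for the new parameter (mirrors test_redis_backend below): the default stays eager and in-process, while backend="redis" assumes a Redis server on 127.0.0.1:6379, starts an in-thread worker, and binds the client to Hub.main so forked worker processes inherit it:

    def test_eager_path(init_celery):
        celery = init_celery()  # backend="always_eager" by default

    def test_redis_path(init_celery, capture_events_forksafe):
        celery = init_celery(traces_sample_rate=1.0, backend="redis")
        events = capture_events_forksafe()  # the redis backend requires this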
@@ -273,15 +297,10 @@ def dummy_task(self):
 
 
 @pytest.mark.forked
-@pytest.mark.skipif(VERSION < (4,), reason="in-memory backend broken")
-def test_transport_shutdown(request, celery, capture_events_forksafe, tmpdir):
-    events = capture_events_forksafe()
+def test_redis_backend(init_celery, capture_events_forksafe, tmpdir):
+    celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True)
 
-    celery.conf.worker_max_tasks_per_child = 1
-    celery.conf.broker_url = "memory://localhost/"
-    celery.conf.broker_backend = "memory"
-    celery.conf.result_backend = "file://{}".format(tmpdir.mkdir("celery-results"))
-    celery.conf.task_always_eager = False
+    events = capture_events_forksafe()
 
     runs = []
 
@@ -290,21 +309,26 @@ def dummy_task(self):
         runs.append(1)
         1 / 0
 
-    res = dummy_task.delay()
-
-    w = worker.worker(app=celery)
-    t = threading.Thread(target=w.run)
-    t.daemon = True
-    t.start()
+    # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
+    res = dummy_task.apply_async()
 
     with pytest.raises(Exception):
+        # Celery 4.1 raises a gibberish exception
        res.wait()
 
+    # if this is nonempty, the worker never really forked
+    assert not runs
+
     event = events.read_event()
     (exception,) = event["exception"]["values"]
     assert exception["type"] == "ZeroDivisionError"
 
+    transaction = events.read_event()
+    assert (
+        transaction["contexts"]["trace"]["trace_id"]
+        == event["contexts"]["trace"]["trace_id"]
+    )
+
     events.read_flush()
 
     # if this is nonempty, the worker never really forked
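
The new trace_id assertion is the end-to-end check that the propagation added in apply_async worked: the error event and the transaction event emitted by the worker must belong to the same trace. Assuming the usual three-part sentry-trace format, a toy parser shows what actually travels in the header:

    def parse_sentry_trace(header):
        # Assumed format: "<trace_id>-<parent_span_id>-<sampled>"
        trace_id, parent_span_id, sampled = header.split("-")
        return trace_id, parent_span_id, sampled == "1"

    trace_id, _, sampled = parse_sentry_trace(
        "771a43a4192642f0b136d5159a501700-9816b67affa81e31-1"
    )
    assert trace_id == "771a43a4192642f0b136d5159a501700" and sampled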
4 changes: 3 additions & 1 deletion tox.ini
@@ -38,7 +38,8 @@ envlist =
     {py3.6,py3.7}-sanic-19
 
     # TODO: Add py3.9
-    {pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-celery-{4.1,4.2,4.3,4.4}
+    {pypy,py2.7,py3.5,py3.6}-celery-{4.1,4.2}
+    {pypy,py2.7,py3.5,py3.6,py3.7,py3.8}-celery-{4.3,4.4}
     {pypy,py2.7}-celery-3
 
     {py2.7,py3.7}-beam-{2.12,2.13}
@@ -128,6 +129,7 @@ deps =
     beam-2.13: apache-beam>=2.13.0, <2.14.0
     beam-master: git+https://github.com/apache/beam#egg=apache-beam&subdirectory=sdks/python
 
+    celery: redis
     celery-3: Celery>=3.1,<4.0
     celery-4.1: Celery>=4.1,<4.2
     celery-4.2: Celery>=4.2,<4.3
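
With the envlist split, a single combination can be exercised locally, e.g. tox -e py3.6-celery-4.2 or tox -e py3.8-celery-4.4 (names per the envlist above), and the unconditional "celery: redis" dep installs the redis client into every celery env so the new Redis-backed test can run.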