diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index c59aa8f72e..7368b1055a 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -2,7 +2,6 @@ import uuid import random from datetime import datetime -from itertools import islice import socket from sentry_sdk._compat import string_types, text_type, iteritems @@ -30,12 +29,11 @@ from typing import Any from typing import Callable from typing import Dict - from typing import List from typing import Optional from sentry_sdk.scope import Scope from sentry_sdk._types import Event, Hint - from sentry_sdk.sessions import Session + from sentry_sdk.session import Session _client_init_debug = ContextVar("client_init_debug") @@ -99,24 +97,20 @@ def _init_impl(self): # type: () -> None old_debug = _client_init_debug.get(False) - def _send_sessions(sessions): - # type: (List[Any]) -> None - transport = self.transport - if not transport or not sessions: - return - sessions_iter = iter(sessions) - while True: - envelope = Envelope() - for session in islice(sessions_iter, 100): - envelope.add_session(session) - if not envelope.items: - break - transport.capture_envelope(envelope) + def _capture_envelope(envelope): + # type: (Envelope) -> None + if self.transport is not None: + self.transport.capture_envelope(envelope) try: _client_init_debug.set(self.options["debug"]) self.transport = make_transport(self.options) - self.session_flusher = SessionFlusher(flush_func=_send_sessions) + session_mode = self.options["_experiments"].get( + "session_mode", "application" + ) + self.session_flusher = SessionFlusher( + capture_func=_capture_envelope, session_mode=session_mode + ) request_bodies = ("always", "never", "small", "medium") if self.options["request_bodies"] not in request_bodies: diff --git a/sentry_sdk/envelope.py b/sentry_sdk/envelope.py index 119abf810f..5645eb8a12 100644 --- a/sentry_sdk/envelope.py +++ b/sentry_sdk/envelope.py @@ -4,7 +4,7 @@ from sentry_sdk._compat import text_type from sentry_sdk._types import MYPY -from sentry_sdk.sessions import Session +from sentry_sdk.session import Session from sentry_sdk.utils import json_dumps, capture_internal_exceptions if MYPY: @@ -62,6 +62,12 @@ def add_session( session = session.to_json() self.add_item(Item(payload=PayloadRef(json=session), type="session")) + def add_sessions( + self, sessions # type: Any + ): + # type: (...) -> None + self.add_item(Item(payload=PayloadRef(json=sessions), type="sessions")) + def add_item( self, item # type: Item ): diff --git a/sentry_sdk/hub.py b/sentry_sdk/hub.py index 1d8883970b..8afa4938a2 100644 --- a/sentry_sdk/hub.py +++ b/sentry_sdk/hub.py @@ -8,7 +8,7 @@ from sentry_sdk.scope import Scope from sentry_sdk.client import Client from sentry_sdk.tracing import Span, Transaction -from sentry_sdk.sessions import Session +from sentry_sdk.session import Session from sentry_sdk.utils import ( exc_info_from_error, event_from_exception, @@ -639,11 +639,12 @@ def end_session(self): """Ends the current session if there is one.""" client, scope = self._stack[-1] session = scope._session + self.scope._session = None + if session is not None: session.close() if client is not None: client.capture_session(session) - self.scope._session = None def stop_auto_session_tracking(self): # type: (...) -> None diff --git a/sentry_sdk/scope.py b/sentry_sdk/scope.py index f471cda3d4..b8e8901c5b 100644 --- a/sentry_sdk/scope.py +++ b/sentry_sdk/scope.py @@ -28,7 +28,7 @@ ) from sentry_sdk.tracing import Span - from sentry_sdk.sessions import Session + from sentry_sdk.session import Session F = TypeVar("F", bound=Callable[..., Any]) T = TypeVar("T") diff --git a/sentry_sdk/session.py b/sentry_sdk/session.py new file mode 100644 index 0000000000..d22c0e70be --- /dev/null +++ b/sentry_sdk/session.py @@ -0,0 +1,172 @@ +import uuid +from datetime import datetime + +from sentry_sdk._types import MYPY +from sentry_sdk.utils import format_timestamp + +if MYPY: + from typing import Optional + from typing import Union + from typing import Any + from typing import Dict + + from sentry_sdk._types import SessionStatus + + +def _minute_trunc(ts): + # type: (datetime) -> datetime + return ts.replace(second=0, microsecond=0) + + +def _make_uuid( + val, # type: Union[str, uuid.UUID] +): + # type: (...) -> uuid.UUID + if isinstance(val, uuid.UUID): + return val + return uuid.UUID(val) + + +class Session(object): + def __init__( + self, + sid=None, # type: Optional[Union[str, uuid.UUID]] + did=None, # type: Optional[str] + timestamp=None, # type: Optional[datetime] + started=None, # type: Optional[datetime] + duration=None, # type: Optional[float] + status=None, # type: Optional[SessionStatus] + release=None, # type: Optional[str] + environment=None, # type: Optional[str] + user_agent=None, # type: Optional[str] + ip_address=None, # type: Optional[str] + errors=None, # type: Optional[int] + user=None, # type: Optional[Any] + ): + # type: (...) -> None + if sid is None: + sid = uuid.uuid4() + if started is None: + started = datetime.utcnow() + if status is None: + status = "ok" + self.status = status + self.did = None # type: Optional[str] + self.started = started + self.release = None # type: Optional[str] + self.environment = None # type: Optional[str] + self.duration = None # type: Optional[float] + self.user_agent = None # type: Optional[str] + self.ip_address = None # type: Optional[str] + self.errors = 0 + + self.update( + sid=sid, + did=did, + timestamp=timestamp, + duration=duration, + release=release, + environment=environment, + user_agent=user_agent, + ip_address=ip_address, + errors=errors, + user=user, + ) + + @property + def truncated_started(self): + # type: (...) -> datetime + return _minute_trunc(self.started) + + def update( + self, + sid=None, # type: Optional[Union[str, uuid.UUID]] + did=None, # type: Optional[str] + timestamp=None, # type: Optional[datetime] + started=None, # type: Optional[datetime] + duration=None, # type: Optional[float] + status=None, # type: Optional[SessionStatus] + release=None, # type: Optional[str] + environment=None, # type: Optional[str] + user_agent=None, # type: Optional[str] + ip_address=None, # type: Optional[str] + errors=None, # type: Optional[int] + user=None, # type: Optional[Any] + ): + # type: (...) -> None + # If a user is supplied we pull some data form it + if user: + if ip_address is None: + ip_address = user.get("ip_address") + if did is None: + did = user.get("id") or user.get("email") or user.get("username") + + if sid is not None: + self.sid = _make_uuid(sid) + if did is not None: + self.did = str(did) + if timestamp is None: + timestamp = datetime.utcnow() + self.timestamp = timestamp + if started is not None: + self.started = started + if duration is not None: + self.duration = duration + if release is not None: + self.release = release + if environment is not None: + self.environment = environment + if ip_address is not None: + self.ip_address = ip_address + if user_agent is not None: + self.user_agent = user_agent + if errors is not None: + self.errors = errors + + if status is not None: + self.status = status + + def close( + self, status=None # type: Optional[SessionStatus] + ): + # type: (...) -> Any + if status is None and self.status == "ok": + status = "exited" + if status is not None: + self.update(status=status) + + def get_json_attrs( + self, with_user_info=True # type: Optional[bool] + ): + # type: (...) -> Any + attrs = {} + if self.release is not None: + attrs["release"] = self.release + if self.environment is not None: + attrs["environment"] = self.environment + if with_user_info: + if self.ip_address is not None: + attrs["ip_address"] = self.ip_address + if self.user_agent is not None: + attrs["user_agent"] = self.user_agent + return attrs + + def to_json(self): + # type: (...) -> Any + rv = { + "sid": str(self.sid), + "init": True, + "started": format_timestamp(self.started), + "timestamp": format_timestamp(self.timestamp), + "status": self.status, + } # type: Dict[str, Any] + if self.errors: + rv["errors"] = self.errors + if self.did is not None: + rv["did"] = self.did + if self.duration is not None: + rv["duration"] = self.duration + attrs = self.get_json_attrs() + if attrs: + rv["attrs"] = attrs + return rv diff --git a/sentry_sdk/sessions.py b/sentry_sdk/sessions.py index b8ef201e2a..a8321685d0 100644 --- a/sentry_sdk/sessions.py +++ b/sentry_sdk/sessions.py @@ -1,24 +1,22 @@ import os -import uuid import time -from datetime import datetime from threading import Thread, Lock from contextlib import contextmanager +import sentry_sdk +from sentry_sdk.envelope import Envelope +from sentry_sdk.session import Session from sentry_sdk._types import MYPY from sentry_sdk.utils import format_timestamp if MYPY: - import sentry_sdk - + from typing import Callable from typing import Optional - from typing import Union from typing import Any from typing import Dict + from typing import List from typing import Generator - from sentry_sdk._types import SessionStatus - def is_auto_session_tracking_enabled(hub=None): # type: (Optional[sentry_sdk.Hub]) -> bool @@ -48,38 +46,60 @@ def auto_session_tracking(hub=None): hub.end_session() -def _make_uuid( - val, # type: Union[str, uuid.UUID] -): - # type: (...) -> uuid.UUID - if isinstance(val, uuid.UUID): - return val - return uuid.UUID(val) +TERMINAL_SESSION_STATES = ("exited", "abnormal", "crashed") +MAX_ENVELOPE_ITEMS = 100 -TERMINAL_SESSION_STATES = ("exited", "abnormal", "crashed") +def make_aggregate_envelope(aggregate_states, attrs): + # type: (Any, Any) -> Any + return {"attrs": dict(attrs), "aggregates": list(aggregate_states.values())} class SessionFlusher(object): def __init__( self, - flush_func, # type: Any - flush_interval=10, # type: int + capture_func, # type: Callable[[Envelope], None] + session_mode, # type: str + flush_interval=60, # type: int ): # type: (...) -> None - self.flush_func = flush_func + self.capture_func = capture_func + self.session_mode = session_mode self.flush_interval = flush_interval - self.pending = {} # type: Dict[str, Any] + self.pending_sessions = [] # type: List[Any] + self.pending_aggregates = {} # type: Dict[Any, Any] self._thread = None # type: Optional[Thread] self._thread_lock = Lock() + self._aggregate_lock = Lock() self._thread_for_pid = None # type: Optional[int] self._running = True def flush(self): # type: (...) -> None - pending = self.pending - self.pending = {} - self.flush_func(list(pending.values())) + pending_sessions = self.pending_sessions + self.pending_sessions = [] + + with self._aggregate_lock: + pending_aggregates = self.pending_aggregates + self.pending_aggregates = {} + + envelope = Envelope() + for session in pending_sessions: + if len(envelope.items) == MAX_ENVELOPE_ITEMS: + self.capture_func(envelope) + envelope = Envelope() + + envelope.add_session(session) + + for (attrs, states) in pending_aggregates.items(): + if len(envelope.items) == MAX_ENVELOPE_ITEMS: + self.capture_func(envelope) + envelope = Envelope() + + envelope.add_sessions(make_aggregate_envelope(states, attrs)) + + if len(envelope.items) > 0: + self.capture_func(envelope) def _ensure_running(self): # type: (...) -> None @@ -93,7 +113,7 @@ def _thread(): # type: (...) -> None while self._running: time.sleep(self.flush_interval) - if self.pending and self._running: + if self._running: self.flush() thread = Thread(target=_thread) @@ -103,11 +123,45 @@ def _thread(): self._thread_for_pid = os.getpid() return None + def add_aggregate_session( + self, session # type: Session + ): + # type: (...) -> None + # NOTE on `session.did`: + # the protocol can deal with buckets that have a distinct-id, however + # in practice we expect the python SDK to have an extremely high cardinality + # here, effectively making aggregation useless, therefore we do not + # aggregate per-did. + + # For this part we can get away with using the global interpreter lock + with self._aggregate_lock: + attrs = session.get_json_attrs(with_user_info=False) + primary_key = tuple(sorted(attrs.items())) + secondary_key = session.truncated_started # (, session.did) + states = self.pending_aggregates.setdefault(primary_key, {}) + state = states.setdefault(secondary_key, {}) + + if "started" not in state: + state["started"] = format_timestamp(session.truncated_started) + # if session.did is not None: + # state["did"] = session.did + if session.status == "crashed": + state["crashed"] = state.get("crashed", 0) + 1 + elif session.status == "abnormal": + state["abnormal"] = state.get("abnormal", 0) + 1 + elif session.errors > 0: + state["errored"] = state.get("errored", 0) + 1 + else: + state["exited"] = state.get("exited", 0) + 1 + def add_session( self, session # type: Session ): # type: (...) -> None - self.pending[session.sid.hex] = session.to_json() + if self.session_mode == "request": + self.add_aggregate_session(session) + else: + self.pending_sessions.append(session.to_json()) self._ensure_running() def kill(self): @@ -117,136 +171,3 @@ def kill(self): def __del__(self): # type: (...) -> None self.kill() - - -class Session(object): - def __init__( - self, - sid=None, # type: Optional[Union[str, uuid.UUID]] - did=None, # type: Optional[str] - timestamp=None, # type: Optional[datetime] - started=None, # type: Optional[datetime] - duration=None, # type: Optional[float] - status=None, # type: Optional[SessionStatus] - release=None, # type: Optional[str] - environment=None, # type: Optional[str] - user_agent=None, # type: Optional[str] - ip_address=None, # type: Optional[str] - errors=None, # type: Optional[int] - user=None, # type: Optional[Any] - ): - # type: (...) -> None - if sid is None: - sid = uuid.uuid4() - if started is None: - started = datetime.utcnow() - if status is None: - status = "ok" - self.status = status - self.did = None # type: Optional[str] - self.started = started - self.release = None # type: Optional[str] - self.environment = None # type: Optional[str] - self.duration = None # type: Optional[float] - self.user_agent = None # type: Optional[str] - self.ip_address = None # type: Optional[str] - self.errors = 0 - - self.update( - sid=sid, - did=did, - timestamp=timestamp, - duration=duration, - release=release, - environment=environment, - user_agent=user_agent, - ip_address=ip_address, - errors=errors, - user=user, - ) - - def update( - self, - sid=None, # type: Optional[Union[str, uuid.UUID]] - did=None, # type: Optional[str] - timestamp=None, # type: Optional[datetime] - started=None, # type: Optional[datetime] - duration=None, # type: Optional[float] - status=None, # type: Optional[SessionStatus] - release=None, # type: Optional[str] - environment=None, # type: Optional[str] - user_agent=None, # type: Optional[str] - ip_address=None, # type: Optional[str] - errors=None, # type: Optional[int] - user=None, # type: Optional[Any] - ): - # type: (...) -> None - # If a user is supplied we pull some data form it - if user: - if ip_address is None: - ip_address = user.get("ip_address") - if did is None: - did = user.get("id") or user.get("email") or user.get("username") - - if sid is not None: - self.sid = _make_uuid(sid) - if did is not None: - self.did = str(did) - if timestamp is None: - timestamp = datetime.utcnow() - self.timestamp = timestamp - if started is not None: - self.started = started - if duration is not None: - self.duration = duration - if release is not None: - self.release = release - if environment is not None: - self.environment = environment - if ip_address is not None: - self.ip_address = ip_address - if user_agent is not None: - self.user_agent = user_agent - if errors is not None: - self.errors = errors - - if status is not None: - self.status = status - - def close( - self, status=None # type: Optional[SessionStatus] - ): - # type: (...) -> Any - if status is None and self.status == "ok": - status = "exited" - if status is not None: - self.update(status=status) - - def to_json(self): - # type: (...) -> Any - rv = { - "sid": str(self.sid), - "init": True, - "started": format_timestamp(self.started), - "timestamp": format_timestamp(self.timestamp), - "status": self.status, - } # type: Dict[str, Any] - if self.errors: - rv["errors"] = self.errors - if self.did is not None: - rv["did"] = self.did - if self.duration is not None: - rv["duration"] = self.duration - - attrs = {} - if self.release is not None: - attrs["release"] = self.release - if self.environment is not None: - attrs["environment"] = self.environment - if self.ip_address is not None: - attrs["ip_address"] = self.ip_address - if self.user_agent is not None: - attrs["user_agent"] = self.user_agent - if attrs: - rv["attrs"] = attrs - return rv diff --git a/tests/test_envelope.py b/tests/test_envelope.py index 96c33f0c99..e795e9d93c 100644 --- a/tests/test_envelope.py +++ b/tests/test_envelope.py @@ -1,5 +1,5 @@ from sentry_sdk.envelope import Envelope -from sentry_sdk.sessions import Session +from sentry_sdk.session import Session def generate_transaction_item(): diff --git a/tests/test_sessions.py b/tests/test_sessions.py index dfe9ee1dc6..6c84f029dd 100644 --- a/tests/test_sessions.py +++ b/tests/test_sessions.py @@ -1,4 +1,13 @@ +import sentry_sdk + from sentry_sdk import Hub +from sentry_sdk.sessions import auto_session_tracking + + +def sorted_aggregates(item): + aggregates = item["aggregates"] + aggregates.sort(key=lambda item: (item["started"], item.get("did", ""))) + return aggregates def test_basic(sentry_init, capture_envelopes): @@ -24,11 +33,55 @@ def test_basic(sentry_init, capture_envelopes): assert len(sess.items) == 1 sess_event = sess.items[0].payload.json + assert sess_event["attrs"] == { + "release": "fun-release", + "environment": "not-fun-env", + } assert sess_event["did"] == "42" assert sess_event["init"] assert sess_event["status"] == "exited" assert sess_event["errors"] == 1 + + +def test_aggregates(sentry_init, capture_envelopes): + sentry_init( + release="fun-release", + environment="not-fun-env", + _experiments={"auto_session_tracking": True, "session_mode": "request"}, + ) + envelopes = capture_envelopes() + + hub = Hub.current + + with auto_session_tracking(): + with sentry_sdk.push_scope(): + try: + with sentry_sdk.configure_scope() as scope: + scope.set_user({"id": "42"}) + raise Exception("all is wrong") + except Exception: + sentry_sdk.capture_exception() + + with auto_session_tracking(): + pass + + hub.start_session() + hub.end_session() + + sentry_sdk.flush() + + assert len(envelopes) == 2 + assert envelopes[0].get_event() is not None + + sess = envelopes[1] + assert len(sess.items) == 1 + sess_event = sess.items[0].payload.json assert sess_event["attrs"] == { "release": "fun-release", "environment": "not-fun-env", } + + aggregates = sorted_aggregates(sess_event) + assert len(aggregates) == 1 + assert aggregates[0]["exited"] == 2 + assert aggregates[0]["errored"] == 1