Skip to content

Commit a67914c

Browse files
jan-auer, mitsuhiko, and antonpirker
authored
feat: Code locations for metrics (getsentry#2526)
DDM wants to show code locations with metrics. Locations are semi-static information: they change infrequently, so they don't need to be reported with every data point. Sentry expects locations to be reported at least once per day. With backdating of metrics, the timestamp used to report the location is the metric bucket's timestamp rounded down to the start of the day (UTC timezone). The metrics aggregator keeps a cache of previously reported locations. When a location is seen for the first time on a day, it is added to a list of pending locations. On the next flush cycle, all pending locations are sent to Sentry in the same envelope as the metric buckets. See: getsentry/relay#2751 Epic: getsentry/sentry#60260 --------- Co-authored-by: Armin Ronacher <[email protected]> Co-authored-by: Anton Pirker <[email protected]>
1 parent 088431e commit a67914c

File tree

5 files changed

+265
-51
lines changed

5 files changed

+265
-51
lines changed

sentry_sdk/_types.py

+1
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,4 @@
117117
FlushedMetricValue = Union[int, float]
118118

119119
BucketKey = Tuple[MetricType, str, MeasurementUnit, MetricTagsInternal]
120+
MetricMetaKey = Tuple[MetricType, str, MeasurementUnit]

sentry_sdk/client.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,15 @@ def _capture_envelope(envelope):
237237
self.session_flusher = SessionFlusher(capture_func=_capture_envelope)
238238

239239
self.metrics_aggregator = None # type: Optional[MetricsAggregator]
240-
if self.options.get("_experiments", {}).get("enable_metrics"):
240+
experiments = self.options.get("_experiments", {})
241+
if experiments.get("enable_metrics"):
241242
from sentry_sdk.metrics import MetricsAggregator
242243

243244
self.metrics_aggregator = MetricsAggregator(
244-
capture_func=_capture_envelope
245+
capture_func=_capture_envelope,
246+
enable_code_locations=bool(
247+
experiments.get("metric_code_locations")
248+
),
245249
)
246250

247251
max_request_body_size = ("always", "never", "small", "medium")

sentry_sdk/consts.py

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
"transport_num_pools": Optional[int],
4848
"enable_metrics": Optional[bool],
4949
"before_emit_metric": Optional[Callable[[str, MetricTags], bool]],
50+
"metric_code_locations": Optional[bool],
5051
},
5152
total=False,
5253
)

sentry_sdk/metrics.py

+113-25
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import io
33
import re
4+
import sys
45
import threading
56
import random
67
import time
@@ -11,8 +12,14 @@
1112
from contextlib import contextmanager
1213

1314
import sentry_sdk
14-
from sentry_sdk._compat import text_type
15-
from sentry_sdk.utils import now, nanosecond_time, to_timestamp
15+
from sentry_sdk._compat import text_type, utc_from_timestamp, iteritems
16+
from sentry_sdk.utils import (
17+
now,
18+
nanosecond_time,
19+
to_timestamp,
20+
serialize_frame,
21+
json_dumps,
22+
)
1623
from sentry_sdk.envelope import Envelope, Item
1724
from sentry_sdk.tracing import (
1825
TRANSACTION_SOURCE_ROUTE,
@@ -24,18 +31,21 @@
2431

2532
if TYPE_CHECKING:
2633
from typing import Any
34+
from typing import Callable
2735
from typing import Dict
36+
from typing import Generator
2837
from typing import Iterable
29-
from typing import Callable
38+
from typing import List
3039
from typing import Optional
31-
from typing import Generator
40+
from typing import Set
3241
from typing import Tuple
3342
from typing import Union
3443

3544
from sentry_sdk._types import BucketKey
3645
from sentry_sdk._types import DurationUnit
3746
from sentry_sdk._types import FlushedMetricValue
3847
from sentry_sdk._types import MeasurementUnit
48+
from sentry_sdk._types import MetricMetaKey
3949
from sentry_sdk._types import MetricTagValue
4050
from sentry_sdk._types import MetricTags
4151
from sentry_sdk._types import MetricTagsInternal
@@ -46,6 +56,7 @@
4656
_thread_local = threading.local()
4757
_sanitize_key = partial(re.compile(r"[^a-zA-Z0-9_/.-]+").sub, "_")
4858
_sanitize_value = partial(re.compile(r"[^\w\d_:/@\.{}\[\]$-]+", re.UNICODE).sub, "_")
59+
_set = set # set is shadowed below
4960

5061
GOOD_TRANSACTION_SOURCES = frozenset(
5162
[
@@ -57,6 +68,18 @@
5768
)
5869

5970

71+
def get_code_location(stacklevel):
72+
# type: (int) -> Optional[Dict[str, Any]]
73+
try:
74+
frm = sys._getframe(stacklevel + 4)
75+
except Exception:
76+
return None
77+
78+
return serialize_frame(
79+
frm, include_local_variables=False, include_source_context=False
80+
)
81+
82+
6083
@contextmanager
6184
def recursion_protection():
6285
# type: () -> Generator[bool, None, None]
@@ -247,7 +270,7 @@ def _encode_metrics(flushable_buckets):
247270
# relay side emission and should not happen commonly.
248271

249272
for timestamp, buckets in flushable_buckets:
250-
for bucket_key, metric in buckets.items():
273+
for bucket_key, metric in iteritems(buckets):
251274
metric_type, metric_name, metric_unit, metric_tags = bucket_key
252275
metric_name = _sanitize_key(metric_name)
253276
_write(metric_name.encode("utf-8"))
@@ -283,6 +306,20 @@ def _encode_metrics(flushable_buckets):
283306
return out.getvalue()
284307

285308

309+
def _encode_locations(timestamp, code_locations):
310+
# type: (int, Iterable[Tuple[MetricMetaKey, Dict[str, Any]]]) -> bytes
311+
mapping = {} # type: Dict[str, List[Any]]
312+
313+
for key, loc in code_locations:
314+
metric_type, name, unit = key
315+
mri = "{}:{}@{}".format(metric_type, _sanitize_key(name), unit)
316+
317+
loc["type"] = "location"
318+
mapping.setdefault(mri, []).append(loc)
319+
320+
return json_dumps({"timestamp": timestamp, "mapping": mapping})
321+
322+
286323
METRIC_TYPES = {
287324
"c": CounterMetric,
288325
"g": GaugeMetric,
@@ -311,9 +348,13 @@ class MetricsAggregator(object):
311348
def __init__(
312349
self,
313350
capture_func, # type: Callable[[Envelope], None]
351+
enable_code_locations=False, # type: bool
314352
):
315353
# type: (...) -> None
316354
self.buckets = {} # type: Dict[int, Any]
355+
self._enable_code_locations = enable_code_locations
356+
self._seen_locations = _set() # type: Set[Tuple[int, MetricMetaKey]]
357+
self._pending_locations = {} # type: Dict[int, List[Tuple[MetricMetaKey, Any]]]
317358
self._buckets_total_weight = 0
318359
self._capture_func = capture_func
319360
self._lock = Lock()
@@ -366,9 +407,7 @@ def _flush_loop(self):
366407

367408
def _flush(self):
368409
# type: (...) -> None
369-
flushable_buckets = self._flushable_buckets()
370-
if flushable_buckets:
371-
self._emit(flushable_buckets)
410+
self._emit(self._flushable_buckets(), self._flushable_locations())
372411

373412
def _flushable_buckets(self):
374413
# type: (...) -> (Iterable[Tuple[int, Dict[BucketKey, Metric]]])
@@ -385,21 +424,28 @@ def _flushable_buckets(self):
385424
self._force_flush = False
386425
else:
387426
flushable_buckets = []
388-
for buckets_timestamp, buckets in self.buckets.items():
427+
for buckets_timestamp, buckets in iteritems(self.buckets):
389428
# If the timestamp of the bucket is newer that the rollup we want to skip it.
390429
if buckets_timestamp <= cutoff:
391430
flushable_buckets.append((buckets_timestamp, buckets))
392431

393432
# We will clear the elements while holding the lock, in order to avoid requesting it downstream again.
394433
for buckets_timestamp, buckets in flushable_buckets:
395-
for _, metric in buckets.items():
434+
for _, metric in iteritems(buckets):
396435
weight_to_remove += metric.weight
397436
del self.buckets[buckets_timestamp]
398437

399438
self._buckets_total_weight -= weight_to_remove
400439

401440
return flushable_buckets
402441

442+
def _flushable_locations(self):
443+
# type: (...) -> Dict[int, List[Tuple[MetricMetaKey, Dict[str, Any]]]]
444+
with self._lock:
445+
locations = self._pending_locations
446+
self._pending_locations = {}
447+
return locations
448+
403449
@metrics_noop
404450
def add(
405451
self,
@@ -409,6 +455,7 @@ def add(
409455
unit, # type: MeasurementUnit
410456
tags, # type: Optional[MetricTags]
411457
timestamp=None, # type: Optional[Union[float, datetime]]
458+
stacklevel=0, # type: int
412459
):
413460
# type: (...) -> None
414461
if not self._ensure_thread() or self._flusher is None:
@@ -441,6 +488,24 @@ def add(
441488

442489
self._buckets_total_weight += metric.weight - previous_weight
443490

491+
# Store code location once per metric and per day (of bucket timestamp)
492+
if self._enable_code_locations:
493+
meta_key = (ty, key, unit)
494+
start_of_day = utc_from_timestamp(timestamp).replace(
495+
hour=0, minute=0, second=0, microsecond=0, tzinfo=None
496+
)
497+
start_of_day = int(to_timestamp(start_of_day))
498+
499+
if (start_of_day, meta_key) not in self._seen_locations:
500+
self._seen_locations.add((start_of_day, meta_key))
501+
loc = get_code_location(stacklevel)
502+
if loc is not None:
503+
# Group metadata by day to make flushing more efficient.
504+
# There needs to be one envelope item per timestamp.
505+
self._pending_locations.setdefault(start_of_day, []).append(
506+
(meta_key, loc)
507+
)
508+
444509
# Given the new weight we consider whether we want to force flush.
445510
self._consider_force_flush()
446511

@@ -471,13 +536,23 @@ def _consider_force_flush(self):
471536
def _emit(
472537
self,
473538
flushable_buckets, # type: (Iterable[Tuple[int, Dict[BucketKey, Metric]]])
539+
code_locations, # type: Dict[int, List[Tuple[MetricMetaKey, Dict[str, Any]]]]
474540
):
475-
# type: (...) -> Envelope
476-
encoded_metrics = _encode_metrics(flushable_buckets)
477-
metric_item = Item(payload=encoded_metrics, type="statsd")
478-
envelope = Envelope(items=[metric_item])
479-
self._capture_func(envelope)
480-
return envelope
541+
# type: (...) -> Optional[Envelope]
542+
envelope = Envelope()
543+
544+
if flushable_buckets:
545+
encoded_metrics = _encode_metrics(flushable_buckets)
546+
envelope.add_item(Item(payload=encoded_metrics, type="statsd"))
547+
548+
for timestamp, locations in iteritems(code_locations):
549+
encoded_locations = _encode_locations(timestamp, locations)
550+
envelope.add_item(Item(payload=encoded_locations, type="metric_meta"))
551+
552+
if envelope.items:
553+
self._capture_func(envelope)
554+
return envelope
555+
return None
481556

482557
def _serialize_tags(
483558
self, tags # type: Optional[MetricTags]
@@ -487,7 +562,7 @@ def _serialize_tags(
487562
return ()
488563

489564
rv = []
490-
for key, value in tags.items():
565+
for key, value in iteritems(tags):
491566
# If the value is a collection, we want to flatten it.
492567
if isinstance(value, (list, tuple)):
493568
for inner_value in value:
@@ -536,12 +611,13 @@ def incr(
536611
unit="none", # type: MeasurementUnit
537612
tags=None, # type: Optional[MetricTags]
538613
timestamp=None, # type: Optional[Union[float, datetime]]
614+
stacklevel=0, # type: int
539615
):
540616
# type: (...) -> None
541617
"""Increments a counter."""
542618
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
543619
if aggregator is not None:
544-
aggregator.add("c", key, value, unit, tags, timestamp)
620+
aggregator.add("c", key, value, unit, tags, timestamp, stacklevel)
545621

546622

547623
class _Timing(object):
@@ -552,6 +628,7 @@ def __init__(
552628
timestamp, # type: Optional[Union[float, datetime]]
553629
value, # type: Optional[float]
554630
unit, # type: DurationUnit
631+
stacklevel, # type: int
555632
):
556633
# type: (...) -> None
557634
self.key = key
@@ -560,6 +637,7 @@ def __init__(
560637
self.value = value
561638
self.unit = unit
562639
self.entered = None # type: Optional[float]
640+
self.stacklevel = stacklevel
563641

564642
def _validate_invocation(self, context):
565643
# type: (str) -> None
@@ -579,7 +657,9 @@ def __exit__(self, exc_type, exc_value, tb):
579657
aggregator, tags = _get_aggregator_and_update_tags(self.key, self.tags)
580658
if aggregator is not None:
581659
elapsed = TIMING_FUNCTIONS[self.unit]() - self.entered # type: ignore
582-
aggregator.add("d", self.key, elapsed, self.unit, tags, self.timestamp)
660+
aggregator.add(
661+
"d", self.key, elapsed, self.unit, tags, self.timestamp, self.stacklevel
662+
)
583663

584664
def __call__(self, f):
585665
# type: (Any) -> Any
@@ -589,7 +669,11 @@ def __call__(self, f):
589669
def timed_func(*args, **kwargs):
590670
# type: (*Any, **Any) -> Any
591671
with timing(
592-
key=self.key, tags=self.tags, timestamp=self.timestamp, unit=self.unit
672+
key=self.key,
673+
tags=self.tags,
674+
timestamp=self.timestamp,
675+
unit=self.unit,
676+
stacklevel=self.stacklevel + 1,
593677
):
594678
return f(*args, **kwargs)
595679

@@ -602,6 +686,7 @@ def timing(
602686
unit="second", # type: DurationUnit
603687
tags=None, # type: Optional[MetricTags]
604688
timestamp=None, # type: Optional[Union[float, datetime]]
689+
stacklevel=0, # type: int
605690
):
606691
# type: (...) -> _Timing
607692
"""Emits a distribution with the time it takes to run the given code block.
@@ -615,8 +700,8 @@ def timing(
615700
if value is not None:
616701
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
617702
if aggregator is not None:
618-
aggregator.add("d", key, value, unit, tags, timestamp)
619-
return _Timing(key, tags, timestamp, value, unit)
703+
aggregator.add("d", key, value, unit, tags, timestamp, stacklevel)
704+
return _Timing(key, tags, timestamp, value, unit, stacklevel)
620705

621706

622707
def distribution(
@@ -625,12 +710,13 @@ def distribution(
625710
unit="none", # type: MeasurementUnit
626711
tags=None, # type: Optional[MetricTags]
627712
timestamp=None, # type: Optional[Union[float, datetime]]
713+
stacklevel=0, # type: int
628714
):
629715
# type: (...) -> None
630716
"""Emits a distribution."""
631717
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
632718
if aggregator is not None:
633-
aggregator.add("d", key, value, unit, tags, timestamp)
719+
aggregator.add("d", key, value, unit, tags, timestamp, stacklevel)
634720

635721

636722
def set(
@@ -639,12 +725,13 @@ def set(
639725
unit="none", # type: MeasurementUnit
640726
tags=None, # type: Optional[MetricTags]
641727
timestamp=None, # type: Optional[Union[float, datetime]]
728+
stacklevel=0, # type: int
642729
):
643730
# type: (...) -> None
644731
"""Emits a set."""
645732
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
646733
if aggregator is not None:
647-
aggregator.add("s", key, value, unit, tags, timestamp)
734+
aggregator.add("s", key, value, unit, tags, timestamp, stacklevel)
648735

649736

650737
def gauge(
@@ -653,9 +740,10 @@ def gauge(
653740
unit="none", # type: MetricValue
654741
tags=None, # type: Optional[MetricTags]
655742
timestamp=None, # type: Optional[Union[float, datetime]]
743+
stacklevel=0, # type: int
656744
):
657745
# type: (...) -> None
658746
"""Emits a gauge."""
659747
aggregator, tags = _get_aggregator_and_update_tags(key, tags)
660748
if aggregator is not None:
661-
aggregator.add("g", key, value, unit, tags, timestamp)
749+
aggregator.add("g", key, value, unit, tags, timestamp, stacklevel)

0 commit comments

Comments (0)