@@ -11,9 +11,11 @@ use std::collections::BTreeSet;
11
11
12
12
use std:: hash:: { Hash , Hasher } ;
13
13
use std:: mem;
14
+ use std:: ops:: Range ;
14
15
15
16
use chrono:: { DateTime , Duration , Utc } ;
16
17
use relay_base_schema:: metrics:: { is_valid_metric_name, DurationUnit , FractionUnit , MetricUnit } ;
18
+ use relay_common:: time:: UnixTimestamp ;
17
19
use relay_event_schema:: processor:: {
18
20
self , MaxChars , ProcessValue , ProcessingAction , ProcessingResult , ProcessingState , Processor ,
19
21
} ;
@@ -31,25 +33,42 @@ use crate::span::tag_extraction::{self, extract_span_tags};
31
33
use crate :: timestamp:: TimestampProcessor ;
32
34
use crate :: utils:: { self , MAX_DURATION_MOBILE_MS } ;
33
35
use crate :: {
34
- breakdowns, schema, span, transactions, trimming, user_agent, BreakdownsConfig ,
35
- ClockDriftProcessor , DynamicMeasurementsConfig , GeoIpLookup , PerformanceScoreConfig ,
36
- RawUserAgentInfo ,
36
+ breakdowns, end_all_spans, normalize_transaction_name, schema, set_default_transaction_source,
37
+ span, trimming, user_agent, validate_transaction, BreakdownsConfig , ClockDriftProcessor ,
38
+ DynamicMeasurementsConfig , GeoIpLookup , PerformanceScoreConfig , RawUserAgentInfo ,
39
+ TransactionNameConfig ,
37
40
} ;
38
41
39
42
use crate :: LightNormalizationConfig ;
40
43
41
44
/// Configuration for [`NormalizeProcessor`].
42
45
#[ derive( Clone , Debug , Default ) ]
43
- pub struct NormalizeProcessorConfig < ' a > {
46
+ pub ( crate ) struct NormalizeProcessorConfig < ' a > {
44
47
/// Light normalization config.
45
48
// XXX(iker): we should split this config appropriately.
46
- light_config : LightNormalizationConfig < ' a > ,
49
+ pub light_config : LightNormalizationConfig < ' a > ,
50
+
51
+ /// Configuration to apply to transaction names, especially around sanitizing.
52
+ pub transaction_name_config : TransactionNameConfig < ' a > ,
53
+
54
+ /// Timestamp range in which a transaction must end.
55
+ ///
56
+ /// Transactions that finish outside of this range are considered invalid.
57
+ /// This check is skipped if no range is provided.
58
+ pub transaction_range : Option < Range < UnixTimestamp > > ,
47
59
}
48
60
49
61
impl < ' a > From < LightNormalizationConfig < ' a > > for NormalizeProcessorConfig < ' a > {
50
- fn from ( config : LightNormalizationConfig < ' a > ) -> Self {
62
+ fn from ( mut config : LightNormalizationConfig < ' a > ) -> Self {
63
+ // HACK(iker): workaround to avoid cloning of config items. We'll get
64
+ // rid of this when we remove light normalization in the next step.
65
+ let transaction_name_config = std:: mem:: take ( & mut config. transaction_name_config ) ;
66
+ let transaction_range = config. transaction_range . take ( ) ;
67
+
51
68
Self {
52
69
light_config : config,
70
+ transaction_name_config,
71
+ transaction_range,
53
72
}
54
73
}
55
74
}
@@ -62,7 +81,7 @@ impl<'a> From<LightNormalizationConfig<'a>> for NormalizeProcessorConfig<'a> {
62
81
/// The returned [`ProcessingResult`] indicates whether the passed event should
63
82
/// be ingested or dropped.
64
83
#[ derive( Debug , Default ) ]
65
- pub struct NormalizeProcessor < ' a > {
84
+ pub ( crate ) struct NormalizeProcessor < ' a > {
66
85
/// Configuration for the normalization steps.
67
86
config : NormalizeProcessorConfig < ' a > ,
68
87
}
@@ -80,23 +99,40 @@ impl<'a> Processor for NormalizeProcessor<'a> {
80
99
meta : & mut Meta ,
81
100
state : & ProcessingState < ' _ > ,
82
101
) -> ProcessingResult {
102
+ if event. ty . value ( ) == Some ( & EventType :: Transaction ) {
103
+ // TODO: Parts of this processor should probably be a filter so we
104
+ // can revert some changes to ProcessingAction)
105
+
106
+ validate_transaction ( event, self . config . transaction_range . as_ref ( ) ) ?;
107
+
108
+ if let Some ( trace_context) = event. context_mut :: < TraceContext > ( ) {
109
+ trace_context. op . get_or_insert_with ( || "default" . to_owned ( ) ) ;
110
+ }
111
+
112
+ // The transaction name is expected to be non-empty by downstream services (e.g. Snuba), but
113
+ // Relay doesn't reject events missing the transaction name. Instead, a default transaction
114
+ // name is given, similar to how Sentry gives an "<unlabeled event>" title to error events.
115
+ // SDKs should avoid sending empty transaction names, setting a more contextual default
116
+ // value when possible.
117
+ if event. transaction . value ( ) . map_or ( true , |s| s. is_empty ( ) ) {
118
+ event
119
+ . transaction
120
+ . set_value ( Some ( "<unlabeled transaction>" . to_owned ( ) ) )
121
+ }
122
+ set_default_transaction_source ( event) ;
123
+ normalize_transaction_name ( event, & self . config . transaction_name_config ) ?;
124
+ end_all_spans ( event) ?;
125
+ }
126
+
127
+ // XXX(iker): processing child values should be the last step. The logic
128
+ // below this call is being moved (WIP) to the processor appropriately.
83
129
event. process_child_values ( self , state) ?;
84
130
85
131
let config = & self . config . light_config ;
86
132
if config. is_renormalize {
87
133
return Ok ( ( ) ) ;
88
134
}
89
135
90
- // Validate and normalize transaction
91
- // (internally noops for non-transaction events).
92
- // TODO: Parts of this processor should probably be a filter so we
93
- // can revert some changes to ProcessingAction)
94
- let mut transactions_processor = transactions:: TransactionsProcessor :: new (
95
- config. transaction_name_config . clone ( ) ,
96
- config. transaction_range . clone ( ) ,
97
- ) ;
98
- transactions_processor. process_event ( event, meta, ProcessingState :: root ( ) ) ?;
99
-
100
136
// Check for required and non-empty values
101
137
schema:: SchemaProcessor . process_event ( event, meta, ProcessingState :: root ( ) ) ?;
102
138
@@ -973,7 +1009,7 @@ mod tests {
973
1009
use insta:: assert_debug_snapshot;
974
1010
use relay_base_schema:: events:: EventType ;
975
1011
use relay_base_schema:: metrics:: { DurationUnit , MetricUnit } ;
976
- use relay_event_schema:: processor:: { process_value, ProcessingAction , ProcessingState } ;
1012
+ use relay_event_schema:: processor:: { self , process_value, ProcessingAction , ProcessingState } ;
977
1013
use relay_event_schema:: protocol:: {
978
1014
Contexts , Csp , DeviceContext , Event , Headers , IpAddr , Measurement , Measurements , Request ,
979
1015
Span , SpanId , Tags , TraceContext , TraceId ,
@@ -1755,7 +1791,7 @@ mod tests {
1755
1791
}
1756
1792
1757
1793
#[ test]
1758
- fn test_renormalize_is_idempotent ( ) {
1794
+ fn test_renormalize_spans_is_idempotent ( ) {
1759
1795
let json = r#"{
1760
1796
"start_timestamp": 1,
1761
1797
"timestamp": 2,
@@ -1782,6 +1818,53 @@ mod tests {
1782
1818
assert_eq ! ( reprocessed, reprocessed2) ;
1783
1819
}
1784
1820
1821
+ #[ test]
1822
+ fn test_renormalize_transactions_is_idempotent ( ) {
1823
+ let json = r#"{
1824
+ "event_id": "52df9022835246eeb317dbd739ccd059",
1825
+ "type": "transaction",
1826
+ "transaction": "test-transaction",
1827
+ "start_timestamp": 1,
1828
+ "timestamp": 2,
1829
+ "contexts": {
1830
+ "trace": {
1831
+ "trace_id": "ff62a8b040f340bda5d830223def1d81",
1832
+ "span_id": "bd429c44b67a3eb4"
1833
+ }
1834
+ }
1835
+ }"# ;
1836
+
1837
+ let mut processed = Annotated :: < Event > :: from_json ( json) . unwrap ( ) ;
1838
+ let processor_config = NormalizeProcessorConfig :: default ( ) ;
1839
+ let mut processor = NormalizeProcessor :: new ( processor_config. clone ( ) ) ;
1840
+ process_value ( & mut processed, & mut processor, ProcessingState :: root ( ) ) . unwrap ( ) ;
1841
+ remove_received_from_event ( & mut processed) ;
1842
+ let trace_context = get_value ! ( processed!) . context :: < TraceContext > ( ) . unwrap ( ) ;
1843
+ assert_eq ! ( trace_context. op. value( ) . unwrap( ) , "default" ) ;
1844
+
1845
+ let mut reprocess_config = processor_config. clone ( ) ;
1846
+ reprocess_config. light_config . is_renormalize = true ;
1847
+ let mut processor = NormalizeProcessor :: new ( processor_config. clone ( ) ) ;
1848
+
1849
+ let mut reprocessed = processed. clone ( ) ;
1850
+ process_value ( & mut reprocessed, & mut processor, ProcessingState :: root ( ) ) . unwrap ( ) ;
1851
+ remove_received_from_event ( & mut reprocessed) ;
1852
+ assert_eq ! ( processed, reprocessed) ;
1853
+
1854
+ let mut reprocessed2 = reprocessed. clone ( ) ;
1855
+ process_value ( & mut reprocessed2, & mut processor, ProcessingState :: root ( ) ) . unwrap ( ) ;
1856
+ remove_received_from_event ( & mut reprocessed2) ;
1857
+ assert_eq ! ( reprocessed, reprocessed2) ;
1858
+ }
1859
+
1860
+ fn remove_received_from_event ( event : & mut Annotated < Event > ) {
1861
+ processor:: apply ( event, |e, _| {
1862
+ e. received = Annotated :: empty ( ) ;
1863
+ Ok ( ( ) )
1864
+ } )
1865
+ . unwrap ( ) ;
1866
+ }
1867
+
1785
1868
#[ test]
1786
1869
fn test_computed_performance_score ( ) {
1787
1870
let json = r#"
0 commit comments