Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outcomes): Emit outcomes as client reports [INGEST-247] #1119

Merged
merged 40 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
57ff26e
wip
jjbayer Nov 5, 2021
4fad457
wip
jjbayer Nov 8, 2021
ed93929
wip
jjbayer Nov 8, 2021
64d6d5f
fix: processing flow
jjbayer Nov 8, 2021
2bdab2c
wip
jjbayer Nov 8, 2021
a519519
feat(outcomes): Aggregate client reports in outcome aggregator
jjbayer Nov 8, 2021
e15501f
test: Test aggregation in integration test
jjbayer Nov 8, 2021
9496481
ref: Cleanup + doc
jjbayer Nov 8, 2021
c2dd6ea
Add statsd metric for outcome aggregator flush time
jjbayer Nov 8, 2021
91a631c
ref: Remove old comment
jjbayer Nov 8, 2021
64749c0
fix: Increase mailbox size
jjbayer Nov 9, 2021
e0214ed
Add changelog
jjbayer Nov 9, 2021
29581bc
ref: Use or_default instead of or_insert_with
jjbayer Nov 9, 2021
d9a2ad9
Merge branch 'feat/aggregate-client-reports' into feat/outcomes-as-cl…
jjbayer Nov 9, 2021
2d73cee
feat: Make aggregation a noop if event_id is set
jjbayer Nov 9, 2021
d5556a9
feat: Send client report envelopes from outcomes
jjbayer Nov 9, 2021
20e01de
wip
jjbayer Nov 9, 2021
f6179bc
feat: Add outcome to client reports
jjbayer Nov 9, 2021
e9741f9
feat: Disable outcome aggregator if not needed
jjbayer Nov 9, 2021
01c034a
feat: Erase event_id and remote_addr for lossy outcome aggregation
jjbayer Nov 9, 2021
73681af
Remove Accepted variant of Outcome enum
jjbayer Nov 9, 2021
e97d4b6
wip
jjbayer Nov 10, 2021
19c9333
test: Use dynamic sampling in integration test
jjbayer Nov 11, 2021
3f9c410
Merge branch 'master' into feat/outcomes-as-client-reports-original
jjbayer Nov 11, 2021
bc2c141
ref(config): tri-state emit outcomes config
jjbayer Nov 12, 2021
85a6587
ref: cleanup
jjbayer Nov 12, 2021
5646935
Add statsd tag name for client reports
jjbayer Nov 12, 2021
c73e9f5
ref: Add new fields to client report (one per outcome type)
jjbayer Nov 12, 2021
0c6c629
ref: Custom serializer for EmitOutcomes
jjbayer Nov 12, 2021
1576c29
feat: Allow dynamic sampling config to propagate to external relays
jjbayer Nov 12, 2021
d534c1c
test: Fix integration test
jjbayer Nov 12, 2021
26767f2
fix: tests and clippy
jjbayer Nov 15, 2021
8bf84d9
ref: Apply review feedback
jjbayer Nov 16, 2021
d602615
feat: Batch client reports before sending as envelopes
jjbayer Nov 16, 2021
a09c68a
feat: Complete Outcome::from_outcome_type
jjbayer Nov 17, 2021
c8e1792
fix: Use same interval for client report batch as for outcome aggregator
jjbayer Nov 17, 2021
57401b4
Merge remote-tracking branch 'origin/master' into feat/outcomes-as-cl…
jjbayer Nov 17, 2021
84be868
Add changelog
jjbayer Nov 17, 2021
d2a9f19
Apply review feedback
jjbayer Nov 17, 2021
b13d136
Merge remote-tracking branch 'origin/master' into feat/outcomes-as-cl…
jjbayer Nov 18, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

**Features**

- External Relays perform dynamic sampling and emit outcomes as client reports. This feature is now enabled *by default*. ([#1119](https://github.com/getsentry/relay/pull/1119))

**Internal**

- Add more statsd metrics for relay metric bucketing. ([#1124](https://github.com/getsentry/relay/pull/1124))
Expand Down
109 changes: 105 additions & 4 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::str::FromStr;
use std::time::Duration;

use failure::{Backtrace, Context, Fail};
use serde::de::{Unexpected, Visitor};
use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize, Serializer};

use relay_auth::{generate_key_pair, generate_relay_id, PublicKey, RelayId, SecretKey};
Expand Down Expand Up @@ -983,13 +984,89 @@ impl Default for OutcomeAggregatorConfig {
}
}

/// Determines how to emit outcomes.
/// For compatibility reasons, this can either be true, false or AsClientReports
#[derive(Copy, Clone, Debug, PartialEq, Eq)]

pub enum EmitOutcomes {
/// Do not emit any outcomes
None,
/// Emit outcomes as client reports
AsClientReports,
/// Emit outcomes as outcomes
AsOutcomes,
}

impl EmitOutcomes {
/// Returns true of outcomes are emitted via http, kafka, or client reports.
pub fn any(&self) -> bool {
!matches!(self, EmitOutcomes::None)
}
}

impl Serialize for EmitOutcomes {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// For compatibility, serialize None and AsOutcomes as booleans.
match self {
Self::None => serializer.serialize_bool(false),
Self::AsClientReports => serializer.serialize_str("as_client_reports"),
Self::AsOutcomes => serializer.serialize_bool(true),
}
}
}

struct EmitOutcomesVisitor;

impl<'de> Visitor<'de> for EmitOutcomesVisitor {
type Value = EmitOutcomes;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("true, false, or 'as_client_reports'")
}

fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(if v {
EmitOutcomes::AsOutcomes
} else {
EmitOutcomes::None
})
}

fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
if v == "as_client_reports" {
Ok(EmitOutcomes::AsClientReports)
} else {
Err(E::invalid_value(Unexpected::Str(v), &"as_client_reports"))
}
}
}

impl<'de> Deserialize<'de> for EmitOutcomes {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_any(EmitOutcomesVisitor)
}
}

/// Outcome generation specific configuration values.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct Outcomes {
/// Controls whether outcomes will be emitted when processing is disabled.
/// Processing relays always emit outcomes (for backwards compatibility).
pub emit_outcomes: bool,
/// Can take the following values: false, "as_client_reports", true
pub emit_outcomes: EmitOutcomes,
/// Controls wheather client reported outcomes should be emitted.
pub emit_client_outcomes: bool,
/// The maximum number of outcomes that are batched before being sent
Expand All @@ -1008,7 +1085,7 @@ pub struct Outcomes {
impl Default for Outcomes {
fn default() -> Self {
Outcomes {
emit_outcomes: false,
emit_outcomes: EmitOutcomes::AsClientReports,
emit_client_outcomes: true,
batch_size: 1000,
batch_interval: 500,
Expand Down Expand Up @@ -1487,8 +1564,11 @@ impl Config {
///
/// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
/// in processing mode.
pub fn emit_outcomes(&self) -> bool {
self.values.outcomes.emit_outcomes || self.values.processing.enabled
pub fn emit_outcomes(&self) -> EmitOutcomes {
if self.processing_enabled() {
return EmitOutcomes::AsOutcomes;
}
self.values.outcomes.emit_outcomes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jjbayer this makes me think, maybe we should make the field in the config Option<EmitOutcomes> so explicitly setting the value to anything takes precedence over this. but not entirely sure. can leave it out for now

}

/// Returns whether this Relay should emit client outcomes
Expand Down Expand Up @@ -1856,4 +1936,25 @@ cache:
assert_eq!(values.cache.envelope_buffer_size, 1_000_000);
assert_eq!(values.cache.envelope_expiry, 1800);
}

#[test]
fn test_emit_outcomes() {
for (serialized, deserialized) in &[
("true", EmitOutcomes::AsOutcomes),
("false", EmitOutcomes::None),
("\"as_client_reports\"", EmitOutcomes::AsClientReports),
] {
let value: EmitOutcomes = serde_json::from_str(serialized).unwrap();
assert_eq!(value, *deserialized);
assert_eq!(serde_json::to_string(&value).unwrap(), *serialized);
}
}

#[test]
fn test_emit_outcomes_invalid() {
assert!(matches!(
serde_json::from_str::<EmitOutcomes>("asdf"),
Err(_)
));
}
}
22 changes: 21 additions & 1 deletion relay-filter/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt;
use std::{convert::TryFrom, fmt};

use globset::GlobBuilder;
use regex::bytes::{Regex, RegexBuilder};
Expand Down Expand Up @@ -162,6 +162,26 @@ impl fmt::Display for FilterStatKey {
}
}

impl<'a> TryFrom<&'a str> for FilterStatKey {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we have a macro for this? maybe just for serde impl

type Error = &'a str;

fn try_from(value: &'a str) -> Result<Self, Self::Error> {
Ok(match value {
"ip-address" => FilterStatKey::IpAddress,
"release-version" => FilterStatKey::ReleaseVersion,
"error-message" => FilterStatKey::ErrorMessage,
"browser-extensions" => FilterStatKey::BrowserExtensions,
"legacy-browsers" => FilterStatKey::LegacyBrowsers,
"localhost" => FilterStatKey::Localhost,
"web-crawlers" => FilterStatKey::WebCrawlers,
"invalid-csp" => FilterStatKey::InvalidCsp,
other => {
return Err(other);
}
})
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
31 changes: 29 additions & 2 deletions relay-general/src/protocol/client_report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,22 @@ pub struct DiscardedEvent {
pub quantity: u32,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ClientReport {
/// The timestamp of when the report was created.
pub timestamp: Option<UnixTimestamp>,
/// Discard reason counters.
/// Counters for events discarded by the client.
pub discarded_events: Vec<DiscardedEvent>,
/// Counters for events rate limited by a relay configured to emit outcomes as client reports
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub rate_limited_events: Vec<DiscardedEvent>,
/// Counters for events filtered by a relay configured to emit outcomes as client reports
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub filtered_events: Vec<DiscardedEvent>,
/// Counters for events filtered by a sampling rule,
/// by a relay configured to emit outcomes as client reports
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub filtered_sampling_events: Vec<DiscardedEvent>,
}

impl ClientReport {
Expand All @@ -39,6 +49,9 @@ mod tests {
"discarded_events": [
{"reason": "foo_reason", "category": "error", "quantity": 42},
{"reason": "foo_reason", "category": "transaction", "quantity": 23}
],
"rate_limited_events" : [
{"reason": "bar_reason", "category": "session", "quantity": 456}
]
}"#;

Expand All @@ -55,6 +68,13 @@ mod tests {
"category": "transaction",
"quantity": 23
}
],
"rate_limited_events": [
{
"reason": "bar_reason",
"category": "session",
"quantity": 456
}
]
}"#;

Expand All @@ -72,6 +92,13 @@ mod tests {
quantity: 23,
},
],
rate_limited_events: vec![DiscardedEvent {
reason: "bar_reason".into(),
category: DataCategory::Session,
quantity: 456,
}],

..Default::default()
};

let parsed = ClientReport::parse(json.as_bytes()).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion relay-general/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub use sentry_release_parser::{validate_environment, validate_release};

pub use self::breadcrumb::Breadcrumb;
pub use self::breakdowns::Breakdowns;
pub use self::client_report::ClientReport;
pub use self::client_report::{ClientReport, DiscardedEvent};
pub use self::clientsdk::{ClientSdkInfo, ClientSdkPackage};
pub use self::constants::VALID_PLATFORMS;
pub use self::contexts::{
Expand Down
1 change: 1 addition & 0 deletions relay-metrics/benches/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
bucket_interval: 1000,
initial_delay: 0,
debounce_delay: 0,
..Default::default()
};

let flush_receiver = TestReceiver.start().recipient();
Expand Down
2 changes: 1 addition & 1 deletion relay-quotas/src/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use relay_common::{ProjectId, ProjectKey};
/// Data scoping information.
///
/// This structure holds information of all scopes required for attributing an item to quotas.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Scoping {
/// The organization id.
pub organization_id: u64,
Expand Down
Loading