Skip to content

Commit

Permalink
Implement UnderlyingBytes for ByteSlice so it can be borrowed as unde…
Browse files Browse the repository at this point in the history
…rlying for TinyBytes::Bytes (#764)

Update data-pipeline's send function to accept a tinybytes::Bytes for the payload type. 

Also update the data-pipeline-ffi ddog_trace_exporter_send function to transmute the incoming ByteSlice to static. The ByteSlice is expected to live for the length of the function call, but it is technically possible to borrow the underlying data for longer, leading to stale references. This will be addressed as part of APMSP-1621.
  • Loading branch information
ekump authored Dec 3, 2024
1 parent 1776e5d commit a3c1bd4
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 24 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions data-pipeline-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,4 @@ build_common = { path = "../build-common" }
[dependencies]
data-pipeline = { path = "../data-pipeline" }
ddcommon-ffi = { path = "../ddcommon-ffi", default-features = false }
bytes = "1.4"
libc = "0.2.153"
tinybytes = { path = "../tinybytes" }
16 changes: 13 additions & 3 deletions data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,27 @@ pub unsafe extern "C" fn ddog_trace_exporter_free(handle: Box<TraceExporter>) {
///
/// * `handle` - The handle to the TraceExporter instance.
/// * `trace` - The traces to send to the Datadog Agent in the input format used to create the
/// TraceExporter.
/// TraceExporter. The memory for the trace must be valid for the life of the call to this
/// function.
/// * `trace_count` - The number of traces to send to the Datadog Agent.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_send(
handle: &TraceExporter,
trace: ByteSlice,
trace_count: usize,
) -> MaybeError {
// TODO - handle errors - https://datadoghq.atlassian.net/browse/APMSP-1095
// necessary that the trace be static for the life of the FFI function call as the caller
// currently owns the memory.
//APMSP-1621 - Properly fix this sharp-edge by allocating memory on the Rust side
let static_trace: ByteSlice<'static> = std::mem::transmute(trace);

// TODO: APMSP-1095 - properly handle errors from the send call
handle
.send(trace.as_bytes(), trace_count)
.send(
tinybytes::Bytes::from_static(static_trace.as_slice()),
trace_count,
)
.unwrap_or(String::from(""));

MaybeError::None
}
4 changes: 3 additions & 1 deletion data-pipeline/examples/send-traces-with-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ fn main() {
traces.push(trace);
}
let data = rmp_serde::to_vec_named(&traces).unwrap();
exporter.send(&data, 100).unwrap();
let data_as_bytes = tinybytes::Bytes::from(data);

exporter.send(data_as_bytes, 100).unwrap();
exporter.shutdown(None).unwrap();
}
31 changes: 15 additions & 16 deletions data-pipeline/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,11 @@ impl TraceExporter {

/// Send msgpack serialized traces to the agent
#[allow(missing_docs)]
pub fn send(&self, data: &[u8], trace_count: usize) -> Result<String, String> {
pub fn send(&self, data: tinybytes::Bytes, trace_count: usize) -> Result<String, String> {
self.check_agent_info();
match self.input_format {
TraceExporterInputFormat::Proxy => self.send_proxy(data, trace_count),
TraceExporterInputFormat::V04 => {
self.send_deser_ser(tinybytes::Bytes::copy_from_slice(data))
// TODO: APMSP-1582 - Refactor data-pipeline-ffi so we can leverage a type that
// implements tinybytes::UnderlyingBytes trait to avoid copying
}
TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count),
TraceExporterInputFormat::V04 => self.send_deser_ser(data),
}
}

Expand Down Expand Up @@ -1187,7 +1183,7 @@ mod tests {
..Default::default()
}];

let data = rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap();
let data = tinybytes::Bytes::from(rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap());

// Wait for the info fetcher to get the config
while mock_info.hits() == 0 {
Expand All @@ -1196,7 +1192,7 @@ mod tests {
})
}

exporter.send(data.as_slice(), 1).unwrap();
exporter.send(data, 1).unwrap();
exporter.shutdown(None).unwrap();

mock_traces.assert();
Expand Down Expand Up @@ -1314,8 +1310,10 @@ mod tests {
..Default::default()
}],
];
let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace");
let _result = exporter.send(&bytes, 1).expect("failed to send trace");
let bytes = tinybytes::Bytes::from(
rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"),
);
let _result = exporter.send(bytes, 1).expect("failed to send trace");

assert_eq!(
&format!(
Expand Down Expand Up @@ -1346,9 +1344,8 @@ mod tests {
stats_socket.local_addr().unwrap().to_string(),
);

let _result = exporter
.send(b"some_bad_payload", 1)
.expect("failed to send trace");
let bad_payload = tinybytes::Bytes::copy_from_slice(b"some_bad_payload".as_ref());
let _result = exporter.send(bad_payload, 1).expect("failed to send trace");

assert_eq!(
&format!(
Expand Down Expand Up @@ -1381,8 +1378,10 @@ mod tests {
name: BytesString::from_slice(b"test").unwrap(),
..Default::default()
}]];
let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace");
let _result = exporter.send(&bytes, 1).expect("failed to send trace");
let bytes = tinybytes::Bytes::from(
rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"),
);
let _result = exporter.send(bytes, 1).expect("failed to send trace");

assert_eq!(
&format!(
Expand Down

0 comments on commit a3c1bd4

Please sign in to comment.