Skip to content

Commit 18d71d6

Browse files
committed
Implemented callstack capture task
Currently the task in htt2p.rs only reports that capturing the callstack is not supported on any operating system. But now with the asynchronously executed capture task it will be possible to actually implement the capturing.
1 parent 77759c8 commit 18d71d6

File tree

11 files changed

+274
-63
lines changed

11 files changed

+274
-63
lines changed

src/connection.rs

+68
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
use http::{HeaderMap, HeaderValue};
12
use snafu::{ResultExt, Snafu};
3+
use std::convert::TryFrom;
24
use std::net::SocketAddr;
35
use std::sync::mpsc::Sender;
46
use std::sync::Arc;
@@ -124,6 +126,58 @@ impl<TClient, TServer> Streams<TClient, TServer>
124126
}
125127
}
126128

129+
/// When available, identifies the thread in the calling or client process.
130+
/// The client should reports its process id with the proxide-client-process-id" header and
131+
/// the thread id with the "proxide-client-thread-id" header.
132+
/// This enables the proxide proxy to capture client's callstack when it is making the call if the proxide
133+
/// and the client are running on the same host.
134+
pub struct ClientThreadId
135+
{
136+
process_id: u32,
137+
thread_id: i64,
138+
}
139+
140+
impl ClientThreadId
141+
{
142+
pub fn process_id(&self) -> u32
143+
{
144+
self.process_id
145+
}
146+
147+
pub fn thread_id(&self) -> i64
148+
{
149+
self.thread_id
150+
}
151+
}
152+
153+
impl TryFrom<&MessageData> for ClientThreadId
154+
{
155+
type Error = ();
156+
157+
fn try_from(value: &MessageData) -> std::result::Result<Self, Self::Error>
158+
{
159+
ClientThreadId::try_from(&value.headers)
160+
}
161+
}
162+
163+
impl TryFrom<&HeaderMap> for ClientThreadId
164+
{
165+
type Error = ();
166+
167+
fn try_from(value: &HeaderMap) -> std::result::Result<Self, Self::Error>
168+
{
169+
let process_id: Option<u32> = number_or_none(&value.get("proxide-client-process-id"));
170+
let thread_id: Option<i64> = number_or_none(&value.get("proxide-client-thread-id"));
171+
match (process_id, thread_id) {
172+
(Some(process_id), Some(thread_id)) => Ok(ClientThreadId {
173+
process_id,
174+
thread_id,
175+
}),
176+
_ => Err(()),
177+
}
178+
}
179+
}
180+
127181
/// Handles a single client connection.
128182
///
129183
/// The connection handling is split into multiple functions, but the functions are chained in a
@@ -311,3 +365,17 @@ where
311365
log::info!("Exit");
312366
});
313367
}
368+
369+
fn number_or_none<N>(header: &Option<&HeaderValue>) -> Option<N>
370+
where
371+
N: std::str::FromStr,
372+
{
373+
if let Some(value) = header {
374+
value
375+
.to_str()
376+
.map(|s| N::from_str(s).map(|n| Some(n)).unwrap_or(None))
377+
.unwrap_or(None)
378+
} else {
379+
None
380+
}
381+
}

src/connection/http2.rs

+82
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ use h2::{
88
use http::{HeaderMap, Request, Response};
99
use log::error;
1010
use snafu::ResultExt;
11+
use std::convert::TryFrom;
1112
use std::net::SocketAddr;
13+
use std::pin::Pin;
1214
use std::sync::mpsc::Sender;
15+
use std::task::{Context, Poll};
1316
use std::time::SystemTime;
1417
use tokio::io::{AsyncRead, AsyncWrite};
18+
use tokio::task::{JoinHandle, JoinSet};
1519
use uuid::Uuid;
1620

1721
use super::*;
@@ -141,6 +145,12 @@ pub struct ProxyRequest
141145
client_response: SendResponse<Bytes>,
142146
server_request: SendStream<Bytes>,
143147
server_response: ResponseFuture,
148+
request_processor: ProcessingFuture,
149+
}
150+
151+
struct ProcessingFuture
152+
{
153+
inner: JoinHandle<()>,
144154
}
145155

146156
impl ProxyRequest
@@ -191,6 +201,10 @@ impl ProxyRequest
191201
}))
192202
.unwrap();
193203

204+
// Request processor supports asynchronous message processing while the proxide is busy proxying data between
205+
// the client and the server.
206+
let request_processor = ProcessingFuture::spawn(uuid, &client_head, ui);
207+
194208
let server_request = Request::from_parts(client_head, ());
195209

196210
// Set up a server request.
@@ -208,6 +222,7 @@ impl ProxyRequest
208222
client_response,
209223
server_request,
210224
server_response,
225+
request_processor,
211226
})
212227
}
213228

@@ -265,6 +280,7 @@ impl ProxyRequest
265280
let mut client_response = self.client_response;
266281
let server_response = self.server_response;
267282
let connection_uuid = self.connection_uuid;
283+
let request_processor = self.request_processor;
268284
let ui_temp = ui.clone();
269285
let response_future = async move {
270286
let ui = ui_temp;
@@ -293,6 +309,11 @@ impl ProxyRequest
293309
scenario: "sending response",
294310
})?;
295311

312+
// Ensure the request processor has finished before we send the response to the client.
313+
// Callstack capturing process inside the request processor may capture incorrect data if
314+
// the client is given the final answer from the server as it no longer has to wait for the response.
315+
request_processor.await;
316+
296317
// The server might have sent all the details in the headers, at which point there is
297318
// no body present. Check for this scenario here.
298319
if response_body.is_end_stream() {
@@ -440,3 +461,64 @@ fn is_fatal_error<S>(r: &Result<S, Error>) -> bool
440461
},
441462
}
442463
}
464+
465+
impl ProcessingFuture
466+
{
467+
fn spawn(uuid: Uuid, client_head: &http::request::Parts, ui: &Sender<SessionEvent>) -> Self
468+
{
469+
let mut tasks: JoinSet<std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>> =
470+
JoinSet::new();
471+
472+
// Task which attempts to capture client's callstack.
473+
if let Ok(thread_id) = crate::connection::ClientThreadId::try_from(&client_head.headers) {
474+
let ui_clone = ui.clone();
475+
tasks.spawn(ProcessingFuture::capture_client_callstack(
476+
uuid, thread_id, ui_clone,
477+
));
478+
}
479+
480+
Self {
481+
inner: tokio::spawn(async move {
482+
while let Some(result) = tasks.join_next().await {
483+
match result {
484+
Ok(_) => {}
485+
Err(e) => {
486+
// TODO: Send the error to UI.
487+
eprintln!("{}", e);
488+
error!("{}", e);
489+
}
490+
}
491+
}
492+
}),
493+
}
494+
}
495+
496+
async fn capture_client_callstack(
497+
uuid: Uuid,
498+
_client_thread_id: ClientThreadId,
499+
ui: Sender<SessionEvent>,
500+
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>
501+
{
502+
// TODO: Try to capture the callstack
503+
ui.send(SessionEvent::ClientCallstackProcessed(
504+
ClientCallstackProcessedEvent {
505+
uuid,
506+
callstack: ClientCallstack::Unsupported,
507+
},
508+
))?;
509+
Ok(())
510+
}
511+
}
512+
513+
impl Future for ProcessingFuture
514+
{
515+
type Output = ();
516+
517+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
518+
{
519+
match Pin::new(&mut self.inner).poll(cx) {
520+
Poll::Ready(_) => Poll::Ready(()),
521+
Poll::Pending => Poll::Pending,
522+
}
523+
}
524+
}

src/main.rs

+59
Original file line numberDiff line numberDiff line change
@@ -419,13 +419,15 @@ mod test
419419
use log::SetLoggerError;
420420
use serial_test::serial;
421421
use std::io::{ErrorKind, Write};
422+
use std::ops::Add;
422423
use std::str::FromStr;
423424
use std::sync::{Arc, Mutex};
424425
use std::time::Duration;
425426
use tokio::sync::broadcast::error::TryRecvError;
426427
use tokio::sync::broadcast::Receiver;
427428
use tokio::sync::mpsc::UnboundedReceiver;
428429
use tokio::sync::oneshot;
430+
use tokio::time::Instant;
429431

430432
use crate::session::events::SessionEvent;
431433
use crate::ConnectionOptions;
@@ -533,6 +535,63 @@ mod test
533535
.expect("Waiting for proxide to stop failed.");
534536
}
535537

538+
#[tokio::test]
539+
#[serial]
540+
async fn proxide_receives_client_callstack_ui_message()
541+
{
542+
// Logging must be enabled to detect errors inside proxide.
543+
// Failure to monitor logs may cause the test to hang as errors that stop processing get silently ignored.
544+
let mut error_monitor = get_error_monitor().expect("Acquiring error monitor failed.");
545+
546+
// Server
547+
let server = GrpcServer::start()
548+
.await
549+
.expect("Starting test server failed.");
550+
551+
// Proxide
552+
let options = get_proxide_options(&server);
553+
let (abort_tx, abort_rx) = tokio::sync::oneshot::channel::<()>();
554+
let (ui_tx, ui_rx_std) = std::sync::mpsc::channel();
555+
let proxide_port = u16::from_str(&options.listen_port.to_string()).unwrap();
556+
let proxide = tokio::spawn(crate::launch_proxide(options, abort_rx, ui_tx));
557+
558+
// Message generator and tester.
559+
let tester = grpc_tester::GrpcTester::with_proxide(
560+
server,
561+
proxide_port,
562+
grpc_tester::Args {
563+
period: std::time::Duration::from_secs(0),
564+
tasks: 1,
565+
},
566+
)
567+
.await
568+
.expect("Starting tester failed.");
569+
let mut message_rx = async_from_sync(ui_rx_std);
570+
571+
// UI channel should be constantly receiving client callstack events.
572+
// The generator includes the process id and the thread id in the messages it sends.
573+
let timeout_at = Instant::now().add(Duration::from_secs(30));
574+
while let Some(message) = tokio::select! {
575+
result = message_rx.recv() => result,
576+
_t = tokio::time::sleep( Duration::from_secs( 30 ) ) => panic!( "Timeout" ),
577+
error = error_monitor.recv() => panic!( "{:?}", error ),
578+
} {
579+
if let SessionEvent::ClientCallstackProcessed(..) = message {
580+
break;
581+
} else if Instant::now() > timeout_at {
582+
panic!( "Timeout" )
583+
}
584+
}
585+
586+
let mut server = tester.stop_generator().expect("Stopping generator failed.");
587+
abort_tx.send(()).expect("Stopping proxide failed.");
588+
proxide
589+
.await
590+
.expect("Waiting for proxide to stop failed.")
591+
.expect("Waiting for proxide to stop failed.");
592+
server.stop().expect("Stopping server failed");
593+
}
594+
536595
/// Gets options for launching proxide.
537596
fn get_proxide_options(server: &GrpcServer) -> Arc<ConnectionOptions>
538597
{

src/session.rs

+8
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ pub struct RequestData
5656

5757
pub start_timestamp: DateTime<Local>,
5858
pub end_timestamp: Option<DateTime<Local>>,
59+
pub client_callstack: Option<ClientCallstack>,
5960
pub status: Status,
6061
}
6162

@@ -126,6 +127,13 @@ impl MessageData
126127
}
127128
}
128129

130+
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
131+
pub enum ClientCallstack
132+
{
133+
/// Proxide does not support callstack capture on the current platform/operating system.
134+
Unsupported,
135+
}
136+
129137
impl<T> IndexedVec<T>
130138
{
131139
pub fn push(&mut self, uuid: Uuid, item: T)

src/session/events.rs

+28
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub enum SessionEvent
1414
MessageDone(MessageDoneEvent),
1515
RequestDone(RequestDoneEvent),
1616
ConnectionDone(ConnectionDoneEvent),
17+
ClientCallstackProcessed(ClientCallstackProcessedEvent),
1718
}
1819

1920
#[derive(Serialize, Deserialize, Debug)]
@@ -84,6 +85,13 @@ pub struct ConnectionDoneEvent
8485
pub timestamp: SystemTime,
8586
}
8687

88+
#[derive(Serialize, Deserialize, Debug)]
89+
pub struct ClientCallstackProcessedEvent
90+
{
91+
pub uuid: Uuid,
92+
pub callstack: ClientCallstack,
93+
}
94+
8795
pub enum SessionChange
8896
{
8997
NewConnection
@@ -110,6 +118,10 @@ pub enum SessionChange
110118
{
111119
connection: Uuid
112120
},
121+
Callstack
122+
{
123+
request: Uuid
124+
},
113125
}
114126

115127
impl Session
@@ -124,6 +136,7 @@ impl Session
124136
SessionEvent::MessageDone(e) => self.on_message_done(e),
125137
SessionEvent::RequestDone(e) => self.on_request_done(e),
126138
SessionEvent::ConnectionDone(e) => self.on_connection_done(e),
139+
SessionEvent::ClientCallstackProcessed(e) => self.on_client_callstack_processed(e),
127140
}
128141
}
129142

@@ -154,6 +167,7 @@ impl Session
154167
status: Status::InProgress,
155168
start_timestamp: e.timestamp.into(),
156169
end_timestamp: None,
170+
client_callstack: None,
157171
},
158172
request_msg: MessageData::new(RequestPart::Request)
159173
.with_headers(e.headers)
@@ -247,4 +261,18 @@ impl Session
247261
vec![]
248262
}
249263
}
264+
265+
fn on_client_callstack_processed(
266+
&mut self,
267+
e: ClientCallstackProcessedEvent,
268+
) -> Vec<SessionChange>
269+
{
270+
let request = self.requests.get_mut_by_uuid(e.uuid);
271+
if let Some(request) = request {
272+
request.request_data.client_callstack = Some(e.callstack);
273+
vec![SessionChange::Callstack { request: e.uuid }]
274+
} else {
275+
vec![]
276+
}
277+
}
250278
}

src/ui.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub fn main(
9292
state.draw(&mut terminal).context(IoError {})?;
9393
let mut redraw_pending = false;
9494
loop {
95-
let e = ui_rx.recv().expect( "Receiving UI events failed.");
95+
let e = ui_rx.recv().expect("Receiving UI events failed.");
9696
if let UiEvent::Redraw = e {
9797
redraw_pending = false;
9898
state.draw(&mut terminal).context(IoError {})?;

0 commit comments

Comments
 (0)