Skip to content

Commit a86eb68

Browse files
authored
Merge pull request #181 from greatest-ape/work-2023-01-29-b
udp: replace PanicSentinel with loop over JoinHandles
2 parents 1807c4a + 60c5a9c commit a86eb68

File tree

10 files changed

+174
-101
lines changed

10 files changed

+174
-101
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
* Speed up parsing and serialization of requests and responses by using
3636
[zerocopy](https://crates.io/crates/zerocopy)
3737

38+
#### Fixed
39+
40+
* Quit whole application if any worker thread quits
41+
3842
### aquatic_http
3943

4044
#### Added

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

TODO.md

+4-10
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
## High priority
44

5-
* general
5+
* http and ws
66
* add task to generate prometheus exports on regular interval to clean up
77
data. this is important if peer_clients is activated
88

@@ -12,7 +12,9 @@
1212
In that case, a shorter duration (e.g., 30 seconds) would be a good idea.
1313

1414
* general
15-
* panic sentinel not working? at least seemingly not in http?
15+
* Replace panic sentinel with checking threads like in udp implementation.
16+
It seems to be broken
17+
1618

1719
## Medium priority
1820

@@ -25,14 +27,6 @@
2527
* toml v0.7
2628
* syn v2.0
2729

28-
* quit whole program if any thread panics
29-
* But it would be nice not to panic in workers, but to return errors instead.
30-
Once JoinHandle::is_finished is available in stable Rust (#90470), an
31-
option would be to
32-
* Save JoinHandles
33-
* When preparing to quit because of PanicSentinel sending SIGTERM, loop
34-
through them, extract error and log it
35-
3630
* Run cargo-deny in CI
3731

3832
* aquatic_ws

crates/udp/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ name = "aquatic_udp"
2020
[features]
2121
default = ["prometheus"]
2222
# Export prometheus metrics
23-
prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus"]
23+
prometheus = ["metrics", "metrics-util", "metrics-exporter-prometheus", "tokio"]
2424
# Experimental io_uring support (Linux 6.0 or later required)
2525
io-uring = ["dep:io-uring"]
2626
# Experimental CPU pinning support
@@ -59,6 +59,7 @@ tinytemplate = "1"
5959
metrics = { version = "0.22", optional = true }
6060
metrics-util = { version = "0.16", optional = true }
6161
metrics-exporter-prometheus = { version = "0.13", optional = true, default-features = false, features = ["http-listener"] }
62+
tokio = { version = "1", optional = true, features = ["rt", "net", "time"] }
6263

6364
# io-uring feature
6465
io-uring = { version = "0.6", optional = true }

crates/udp/src/lib.rs

+139-52
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,20 @@ pub mod config;
33
pub mod workers;
44

55
use std::collections::BTreeMap;
6-
use std::thread::Builder;
6+
use std::fmt::Display;
7+
use std::thread::{sleep, Builder, JoinHandle};
78
use std::time::Duration;
89

910
use anyhow::Context;
1011
use crossbeam_channel::{bounded, unbounded};
11-
use signal_hook::consts::{SIGTERM, SIGUSR1};
12+
use signal_hook::consts::SIGUSR1;
1213
use signal_hook::iterator::Signals;
1314

1415
use aquatic_common::access_list::update_access_list;
1516
#[cfg(feature = "cpu-pinning")]
1617
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
1718
use aquatic_common::privileges::PrivilegeDropper;
18-
use aquatic_common::{PanicSentinelWatcher, ServerStartInstant};
19+
use aquatic_common::ServerStartInstant;
1920

2021
use common::{
2122
ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex,
@@ -28,12 +29,12 @@ pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
2829
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
2930

3031
pub fn run(config: Config) -> ::anyhow::Result<()> {
31-
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
32+
let mut signals = Signals::new([SIGUSR1])?;
3233

3334
let state = State::new(config.swarm_workers);
3435
let connection_validator = ConnectionValidator::new(&config)?;
35-
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
3636
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
37+
let mut join_handles = Vec::new();
3738

3839
update_access_list(&config.access_list, &state.access_list)?;
3940

@@ -62,14 +63,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
6263
}
6364

6465
for i in 0..config.swarm_workers {
65-
let sentinel = sentinel.clone();
6666
let config = config.clone();
6767
let state = state.clone();
6868
let request_receiver = request_receivers.remove(&i).unwrap().clone();
6969
let response_sender = ConnectedResponseSender::new(response_senders.clone());
7070
let statistics_sender = statistics_sender.clone();
7171

72-
Builder::new()
72+
let handle = Builder::new()
7373
.name(format!("swarm-{:02}", i + 1))
7474
.spawn(move || {
7575
#[cfg(feature = "cpu-pinning")]
@@ -81,7 +81,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
8181
);
8282

8383
let mut worker = SwarmWorker {
84-
_sentinel: sentinel,
8584
config,
8685
state,
8786
server_start_instant,
@@ -91,13 +90,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
9190
worker_index: SwarmWorkerIndex(i),
9291
};
9392

94-
worker.run();
93+
worker.run()
9594
})
9695
.with_context(|| "spawn swarm worker")?;
96+
97+
join_handles.push((WorkerType::Swarm(i), handle));
9798
}
9899

99100
for i in 0..config.socket_workers {
100-
let sentinel = sentinel.clone();
101101
let state = state.clone();
102102
let config = config.clone();
103103
let connection_validator = connection_validator.clone();
@@ -106,7 +106,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
106106
let response_receiver = response_receivers.remove(&i).unwrap();
107107
let priv_dropper = priv_dropper.clone();
108108

109-
Builder::new()
109+
let handle = Builder::new()
110110
.name(format!("socket-{:02}", i + 1))
111111
.spawn(move || {
112112
#[cfg(feature = "cpu-pinning")]
@@ -118,46 +118,48 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
118118
);
119119

120120
workers::socket::run_socket_worker(
121-
sentinel,
122121
state,
123122
config,
124123
connection_validator,
125124
server_start_instant,
126125
request_sender,
127126
response_receiver,
128127
priv_dropper,
129-
);
128+
)
130129
})
131130
.with_context(|| "spawn socket worker")?;
131+
132+
join_handles.push((WorkerType::Socket(i), handle));
132133
}
133134

134135
if config.statistics.active() {
135-
let sentinel = sentinel.clone();
136136
let state = state.clone();
137137
let config = config.clone();
138138

139-
#[cfg(feature = "prometheus")]
140-
if config.statistics.run_prometheus_endpoint {
141-
use metrics_exporter_prometheus::PrometheusBuilder;
142-
use metrics_util::MetricKindMask;
139+
let handle = Builder::new()
140+
.name("statistics".into())
141+
.spawn(move || {
142+
#[cfg(feature = "cpu-pinning")]
143+
pin_current_if_configured_to(
144+
&config.cpu_pinning,
145+
config.socket_workers,
146+
config.swarm_workers,
147+
WorkerIndex::Util,
148+
);
143149

144-
PrometheusBuilder::new()
145-
.idle_timeout(
146-
MetricKindMask::ALL,
147-
Some(Duration::from_secs(config.statistics.interval * 2)),
148-
)
149-
.with_http_listener(config.statistics.prometheus_endpoint_address)
150-
.install()
151-
.with_context(|| {
152-
format!(
153-
"Install prometheus endpoint on {}",
154-
config.statistics.prometheus_endpoint_address
155-
)
156-
})?;
157-
}
150+
workers::statistics::run_statistics_worker(config, state, statistics_receiver)
151+
})
152+
.with_context(|| "spawn statistics worker")?;
158153

159-
Builder::new()
160-
.name("statistics".into())
154+
join_handles.push((WorkerType::Statistics, handle));
155+
}
156+
157+
#[cfg(feature = "prometheus")]
158+
if config.statistics.active() && config.statistics.run_prometheus_endpoint {
159+
let config = config.clone();
160+
161+
let handle = Builder::new()
162+
.name("prometheus".into())
161163
.spawn(move || {
162164
#[cfg(feature = "cpu-pinning")]
163165
pin_current_if_configured_to(
@@ -167,14 +169,73 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
167169
WorkerIndex::Util,
168170
);
169171

170-
workers::statistics::run_statistics_worker(
171-
sentinel,
172-
config,
173-
state,
174-
statistics_receiver,
172+
use metrics_exporter_prometheus::PrometheusBuilder;
173+
use metrics_util::MetricKindMask;
174+
175+
let rt = ::tokio::runtime::Builder::new_current_thread()
176+
.enable_all()
177+
.build()
178+
.context("build prometheus tokio runtime")?;
179+
180+
rt.block_on(async {
181+
let (recorder, exporter) = PrometheusBuilder::new()
182+
.idle_timeout(
183+
MetricKindMask::ALL,
184+
Some(Duration::from_secs(config.statistics.interval * 2)),
185+
)
186+
.with_http_listener(config.statistics.prometheus_endpoint_address)
187+
.build()
188+
.context("build prometheus recorder and exporter")?;
189+
190+
::tokio::spawn(async move {
191+
let mut interval = ::tokio::time::interval(Duration::from_secs(5));
192+
193+
loop {
194+
interval.tick().await;
195+
196+
// Periodically render metrics to make sure
197+
// idles are cleaned up
198+
recorder.handle().render();
199+
}
200+
});
201+
202+
exporter.await.context("run prometheus exporter")
203+
})
204+
})
205+
.with_context(|| "spawn prometheus exporter worker")?;
206+
207+
join_handles.push((WorkerType::Prometheus, handle));
208+
}
209+
210+
// Spawn signal handler thread
211+
{
212+
let config = config.clone();
213+
214+
let handle: JoinHandle<anyhow::Result<()>> = Builder::new()
215+
.name("signals".into())
216+
.spawn(move || {
217+
#[cfg(feature = "cpu-pinning")]
218+
pin_current_if_configured_to(
219+
&config.cpu_pinning,
220+
config.socket_workers,
221+
config.swarm_workers,
222+
WorkerIndex::Util,
175223
);
224+
225+
for signal in &mut signals {
226+
match signal {
227+
SIGUSR1 => {
228+
let _ = update_access_list(&config.access_list, &state.access_list);
229+
}
230+
_ => unreachable!(),
231+
}
232+
}
233+
234+
Ok(())
176235
})
177-
.with_context(|| "spawn statistics worker")?;
236+
.context("spawn signal worker")?;
237+
238+
join_handles.push((WorkerType::Signals, handle));
178239
}
179240

180241
#[cfg(feature = "cpu-pinning")]
@@ -185,21 +246,47 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
185246
WorkerIndex::Util,
186247
);
187248

188-
for signal in &mut signals {
189-
match signal {
190-
SIGUSR1 => {
191-
let _ = update_access_list(&config.access_list, &state.access_list);
192-
}
193-
SIGTERM => {
194-
if sentinel_watcher.panic_was_triggered() {
195-
return Err(anyhow::anyhow!("worker thread panicked"));
249+
loop {
250+
for (i, (_, handle)) in join_handles.iter().enumerate() {
251+
if handle.is_finished() {
252+
let (worker_type, handle) = join_handles.remove(i);
253+
254+
match handle.join() {
255+
Ok(Ok(())) => {
256+
return Err(anyhow::anyhow!("{} stopped", worker_type));
257+
}
258+
Ok(Err(err)) => {
259+
return Err(err.context(format!("{} stopped", worker_type)));
260+
}
261+
Err(_) => {
262+
return Err(anyhow::anyhow!("{} panicked", worker_type));
263+
}
196264
}
197-
198-
break;
199265
}
200-
_ => unreachable!(),
201266
}
267+
268+
sleep(Duration::from_secs(5));
202269
}
270+
}
271+
272+
enum WorkerType {
273+
Swarm(usize),
274+
Socket(usize),
275+
Statistics,
276+
Signals,
277+
#[cfg(feature = "prometheus")]
278+
Prometheus,
279+
}
203280

204-
Ok(())
281+
impl Display for WorkerType {
282+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283+
match self {
284+
Self::Swarm(index) => f.write_fmt(format_args!("Swarm worker {}", index + 1)),
285+
Self::Socket(index) => f.write_fmt(format_args!("Socket worker {}", index + 1)),
286+
Self::Statistics => f.write_str("Statistics worker"),
287+
Self::Signals => f.write_str("Signals worker"),
288+
#[cfg(feature = "prometheus")]
289+
Self::Prometheus => f.write_str("Prometheus worker"),
290+
}
291+
}
205292
}

0 commit comments

Comments
 (0)