Skip to content

fix(runtime): Fix runtime timeouts #5236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions runtime/test/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,14 @@ async fn test_valid_module_and_store_with_timeout(
};

let module = WasmInstance::from_valid_module_with_ctx(
Arc::new(ValidModule::new(&logger, data_source.mapping.runtime.as_ref()).unwrap()),
Arc::new(ValidModule::new(&logger, data_source.mapping.runtime.as_ref(), timeout).unwrap()),
mock_context(
deployment.clone(),
data_source,
store.subgraph_store(),
api_version,
),
host_metrics,
timeout,
experimental_features,
)
.unwrap();
Expand Down
1 change: 0 additions & 1 deletion runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,6 @@ impl HostExports {
valid_module.clone(),
ctx.derive_with_empty_block_state(),
host_metrics.clone(),
wasm_ctx.timeout,
wasm_ctx.experimental_features,
)?;
let result = module.handle_json_callback(&callback, &sv.value, &user_data)?;
Expand Down
45 changes: 40 additions & 5 deletions runtime/wasm/src/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn spawn_module<C: Blockchain>(
where
<C as Blockchain>::MappingTrigger: ToAscPtr,
{
let valid_module = Arc::new(ValidModule::new(&logger, raw_module)?);
let valid_module = Arc::new(ValidModule::new(&logger, raw_module, timeout)?);

// Create channel for event handling requests
let (mapping_request_sender, mapping_request_receiver) = mpsc::channel(100);
Expand Down Expand Up @@ -60,7 +60,6 @@ where
valid_module.cheap_clone(),
ctx,
host_metrics.cheap_clone(),
timeout,
experimental_features,
)
.map_err(Into::into)
Expand Down Expand Up @@ -112,7 +111,6 @@ fn instantiate_module<C: Blockchain>(
valid_module: Arc<ValidModule>,
ctx: MappingContext,
host_metrics: Arc<HostMetrics>,
timeout: Option<Duration>,
experimental_features: ExperimentalFeatures,
) -> Result<WasmInstance, anyhow::Error>
where
Expand All @@ -124,7 +122,6 @@ where
valid_module,
ctx,
host_metrics.cheap_clone(),
timeout,
experimental_features,
)
.context("module instantiation failed")
Expand Down Expand Up @@ -248,11 +245,21 @@ pub struct ValidModule {
// AS now has an `@external("module", "name")` decorator which would make things cleaner, but
// the ship has sailed.
pub import_name_to_modules: BTreeMap<String, Vec<String>>,

// The timeout for the module.
pub timeout: Option<Duration>,

// Used as a guard to terminate this task dependency.
epoch_counter_abort_handle: Option<tokio::task::AbortHandle>,
}

impl ValidModule {
/// Pre-process and validate the module.
pub fn new(logger: &Logger, raw_module: &[u8]) -> Result<Self, anyhow::Error> {
pub fn new(
logger: &Logger,
raw_module: &[u8],
timeout: Option<Duration>,
) -> Result<Self, anyhow::Error> {
// Add the gas calls here. Module name "gas" must match. See also
// e3f03e62-40e4-4f8c-b4a1-d0375cca0b76. We do this by round-tripping the module through
// parity - injecting gas then serializing again.
Expand Down Expand Up @@ -318,10 +325,38 @@ impl ValidModule {
.push(module.to_string());
}

let mut epoch_counter_abort_handle = None;
if let Some(timeout) = timeout {
let timeout = timeout.clone();
let engine = engine.clone();

// The epoch counter task will perpetually increment the epoch every `timeout` seconds.
// Timeouts on instantiated modules will trigger on epoch deltas.
// Note: The epoch is a u64, so in practice it will never overflow.
// See also: runtime-timeouts
let epoch_counter = async move {
loop {
tokio::time::sleep(timeout).await;
engine.increment_epoch();
}
};
epoch_counter_abort_handle = Some(graph::spawn(epoch_counter).abort_handle());
}

Ok(ValidModule {
module,
import_name_to_modules,
start_function,
timeout,
epoch_counter_abort_handle,
})
}
}

impl Drop for ValidModule {
    /// Stops the background epoch-counter task, if one was spawned for this module.
    fn drop(&mut self) {
        // The handle is only populated when a timeout was configured; without it
        // there is no task to cancel.
        let Some(epoch_task) = self.epoch_counter_abort_handle.take() else {
            return;
        };
        epoch_task.abort();
    }
}
27 changes: 13 additions & 14 deletions runtime/wasm/src/module/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use graph::runtime::{asc_new, gas::GasCounter, DeterministicHostError, HostExpor

use super::asc_get;
use super::AscHeapCtx;
use super::TimeoutStopwatch;

pub(crate) struct WasmInstanceContext<'a> {
inner: StoreContextMut<'a, WasmInstanceData>,
Expand Down Expand Up @@ -56,6 +55,16 @@ impl WasmInstanceContext<'_> {
pub fn asc_heap_mut(&mut self) -> &mut AscHeapCtx {
self.as_mut().asc_heap_mut()
}

/// Suspends the runtime timeout for this instance by setting the store's epoch
/// deadline to `u64::MAX`. Call `start_timeout` afterwards to re-arm it.
///
/// NOTE(review): confirm that wasmtime's `set_epoch_deadline` cannot overflow when
/// `u64::MAX` is combined with a non-zero current epoch.
pub fn suspend_timeout(&mut self) {
// See also: runtime-timeouts
self.inner.set_epoch_deadline(u64::MAX);
}

/// (Re)arms the runtime timeout by setting the store's epoch deadline to 2, the
/// same value applied when the instance's store is created.
///
/// With the engine's epoch advancing once per `timeout`, a deadline of 2 is expected
/// to interrupt execution somewhere between `timeout` and `timeout * 2` after this
/// call — presumably measured from the current epoch; confirm against wasmtime docs.
pub fn start_timeout(&mut self) {
// See also: runtime-timeouts
self.inner.set_epoch_deadline(2);
}
}

impl AsContext for WasmInstanceContext<'_> {
Expand All @@ -76,10 +85,6 @@ pub struct WasmInstanceData {
pub ctx: MappingContext,
pub valid_module: Arc<ValidModule>,
pub host_metrics: Arc<HostMetrics>,
pub(crate) timeout: Option<Duration>,

// Used by ipfs.map.
pub(crate) timeout_stopwatch: Arc<std::sync::Mutex<TimeoutStopwatch>>,

// A trap occurred due to a possible reorg detection.
pub possible_reorg: bool,
Expand All @@ -99,17 +104,13 @@ impl WasmInstanceData {
ctx: MappingContext,
valid_module: Arc<ValidModule>,
host_metrics: Arc<HostMetrics>,
timeout: Option<Duration>,
timeout_stopwatch: Arc<std::sync::Mutex<TimeoutStopwatch>>,
experimental_features: ExperimentalFeatures,
) -> Self {
WasmInstanceData {
asc_heap: None,
ctx,
valid_module,
host_metrics,
timeout,
timeout_stopwatch,
possible_reorg: false,
deterministic_host_trap: false,
experimental_features,
Expand Down Expand Up @@ -583,11 +584,8 @@ impl WasmInstanceContext<'_> {

let flags = asc_get(self, flags, gas)?;

// Pause the timeout while running ipfs_map, ensure it will be restarted by using a guard.
self.as_ref().timeout_stopwatch.lock().unwrap().stop();
let defer_stopwatch = self.as_ref().timeout_stopwatch.clone();
let _stopwatch_guard = defer::defer(|| defer_stopwatch.lock().unwrap().start());

// Pause the timeout while running ipfs_map, and resume it when done.
self.suspend_timeout();
let start_time = Instant::now();
let output_states = HostExports::ipfs_map(
&self.as_ref().ctx.host_exports.link_resolver.cheap_clone(),
Expand All @@ -597,6 +595,7 @@ impl WasmInstanceContext<'_> {
user_data,
flags,
)?;
self.start_timeout();

debug!(
&self.as_ref().ctx.logger,
Expand Down
32 changes: 12 additions & 20 deletions runtime/wasm/src/module/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::{IntoTrap, WasmInstanceContext};
use crate::error::DeterminismLevel;
use crate::mapping::MappingContext;
use crate::mapping::ValidModule;
use crate::module::{TimeoutStopwatch, WasmInstanceData};
use crate::module::WasmInstanceData;
use crate::ExperimentalFeatures;

use super::{is_trap_deterministic, AscHeapCtx, ToAscPtr};
Expand Down Expand Up @@ -191,6 +191,7 @@ impl WasmInstance {

// Treat timeouts anywhere in the error chain as a special case to have a better error
// message. Any `TrapCode::Interrupt` is assumed to be a timeout.
// See also: runtime-timeouts
Err(trap)
if trap
.chain()
Expand All @@ -200,7 +201,7 @@ impl WasmInstance {
return Err(MappingError::Unknown(Error::from(trap).context(format!(
"Handler '{}' hit the timeout of '{}' seconds",
handler,
self.instance_ctx().as_ref().timeout.unwrap().as_secs()
self.instance_ctx().as_ref().valid_module.timeout.unwrap().as_secs()
))));
}
Err(trap) => {
Expand Down Expand Up @@ -263,39 +264,30 @@ impl WasmInstance {
valid_module: Arc<ValidModule>,
ctx: MappingContext,
host_metrics: Arc<HostMetrics>,
timeout: Option<Duration>,
experimental_features: ExperimentalFeatures,
) -> Result<WasmInstance, anyhow::Error> {
let engine = valid_module.module.engine();
let mut linker: Linker<WasmInstanceData> = wasmtime::Linker::new(engine);
let host_fns = ctx.host_fns.cheap_clone();
let api_version = ctx.host_exports.data_source.api_version.clone();

// // Start the timeout watchdog task.
let timeout_stopwatch = Arc::new(std::sync::Mutex::new(TimeoutStopwatch::start_new()));

let wasm_ctx = WasmInstanceData::from_instance(
ctx,
valid_module.cheap_clone(),
host_metrics.cheap_clone(),
timeout,
timeout_stopwatch.clone(),
experimental_features,
);
let mut store = Store::new(engine, wasm_ctx);

// The epoch on the engine will only ever be incremented if increment_epoch() is explicitly
// called, we only do so if a timeout has been set, otherwise 1 means it will run forever.
// If a timeout is provided then epoch 1 should happen roughly once the timeout duration
// has elapsed.
store.set_epoch_deadline(1);
if let Some(timeout) = timeout {
let timeout = timeout.clone();
let engine = engine.clone();
graph::spawn(async move {
tokio::time::sleep(timeout).await;
engine.increment_epoch();
});
}
// called, and we only do so if a timeout has been set; otherwise it will run forever. When a
// timeout is set, the timeout duration is used as the duration of one epoch.
//
// Therefore, the setting of 2 here means that if a `timeout` is provided, then this
// interrupt will be triggered between a duration of `timeout` and `timeout * 2`.
//
// See also: runtime-timeouts
store.set_epoch_deadline(2);

// Because `gas` and `deterministic_host_trap` need to be accessed from the gas
// host fn, they need to be separate from the rest of the context.
Expand Down
2 changes: 0 additions & 2 deletions runtime/wasm/src/module/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use graph::runtime::{
IndexForAscTypeId,
};
pub use into_wasm_ret::IntoWasmRet;
pub use stopwatch::TimeoutStopwatch;

use crate::error::DeterminismLevel;
use crate::gas_rules::{GAS_COST_LOAD, GAS_COST_STORE};
Expand All @@ -31,7 +30,6 @@ pub use instance::*;
mod context;
mod instance;
mod into_wasm_ret;
pub mod stopwatch;

// Convenience for a 'top-level' asc_get, with depth 0.
fn asc_get<T, C: AscType, H: AscHeap + ?Sized>(
Expand Down
58 changes: 0 additions & 58 deletions runtime/wasm/src/module/stopwatch.rs

This file was deleted.