Skip to content

Commit

Permalink
[release/candidate/v0.9.2]: Apply open #1274: `execution_set.check_au…
Browse files Browse the repository at this point in the history
…th(...)` on initial subscription
  • Loading branch information
Zeke Foppa committed May 23, 2024
1 parent c86ec40 commit e4f1021
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 105 deletions.
14 changes: 12 additions & 2 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,11 +516,12 @@ impl RelationalDB {
.collect()
}

pub fn create_table_for_test(
pub fn create_table_for_test_with_access(
&self,
name: &str,
schema: &[(&str, AlgebraicType)],
indexes: &[(ColId, &str)],
access: StAccess,
) -> Result<TableId, DBError> {
let indexes = indexes
.iter()
Expand All @@ -531,11 +532,20 @@ impl RelationalDB {
let schema = TableDef::new(name.into(), Self::col_def_for_test(schema))
.with_indexes(indexes)
.with_type(StTableType::User)
.with_access(StAccess::Public);
.with_access(access);

self.with_auto_commit(&ExecutionContext::default(), |tx| self.create_table(tx, schema))
}

pub fn create_table_for_test(
&self,
name: &str,
schema: &[(&str, AlgebraicType)],
indexes: &[(ColId, &str)],
) -> Result<TableId, DBError> {
self.create_table_for_test_with_access(name, schema, indexes, StAccess::Public)
}

pub fn create_table_for_test_multi_column(
&self,
name: &str,
Expand Down
11 changes: 9 additions & 2 deletions crates/core/src/subscription/execution_unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ use crate::json::client_api::TableUpdateJson;
use crate::util::slow::SlowQueryLogger;
use crate::vm::{build_query, TxMode};
use spacetimedb_client_api_messages::client_api::{TableRowOperation, TableUpdate};
use spacetimedb_lib::ProductValue;
use spacetimedb_lib::{Identity, ProductValue};
use spacetimedb_primitives::TableId;
use spacetimedb_sats::db::error::AuthError;
use spacetimedb_sats::relation::DbTable;
use spacetimedb_vm::eval::IterRows;
use spacetimedb_vm::expr::{NoInMemUsed, Query, QueryExpr, SourceExpr, SourceId};
use spacetimedb_vm::expr::{AuthAccess, NoInMemUsed, Query, QueryExpr, SourceExpr, SourceId};
use spacetimedb_vm::rel_ops::RelOps;
use spacetimedb_vm::relation::RelValue;
use std::borrow::Cow;
Expand Down Expand Up @@ -343,3 +344,9 @@ impl ExecutionUnit {
Ok(())
}
}

impl AuthAccess for ExecutionUnit {
fn check_auth(&self, owner: Identity, caller: Identity) -> Result<(), AuthError> {
self.eval_plan.check_auth(owner, caller)
}
}
90 changes: 71 additions & 19 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use crate::worker_metrics::WORKER_METRICS;
use parking_lot::RwLock;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::Identity;
use spacetimedb_vm::errors::ErrorVm;
use spacetimedb_vm::expr::AuthAccess;
use std::{sync::Arc, time::Instant};

type Subscriptions = Arc<RwLock<SubscriptionManager>>;
Expand Down Expand Up @@ -83,19 +85,24 @@ impl ModuleSubscriptions {
queries.push(unit);
} else {
let mut compiled = compile_read_only_query(&self.relational_db, &tx, sql)?;

if compiled.len() > 1 {
return Result::Err(
SubscriptionError::Unsupported(String::from("Multiple statements in subscription query"))
.into(),
);
}

queries.push(Arc::new(ExecutionUnit::new(compiled.remove(0), hash)?));
}
}

drop(guard);

let execution_set: ExecutionSet = queries.into();
execution_set
.check_auth(auth.owner, auth.caller)
.map_err(ErrorVm::Auth)?;
let database_update = execution_set.eval(&ctx, sender.protocol, &self.relational_db, &tx)?;

WORKER_METRICS
Expand Down Expand Up @@ -190,19 +197,37 @@ pub struct WriteSkew;

#[cfg(test)]
mod tests {
use super::ModuleSubscriptions;
use super::{AssertTxFn, ModuleSubscriptions};
use crate::client::{ClientActorId, ClientConnectionSender, Protocol};
use crate::db::relational_db::tests_utils::TestDB;
use crate::db::relational_db::RelationalDB;
use crate::error::DBError;
use crate::execution_context::ExecutionContext;
use spacetimedb_client_api_messages::client_api::Subscribe;
use spacetimedb_lib::{error::ResultTest, AlgebraicType, Identity};
use spacetimedb_sats::db::auth::StAccess;
use spacetimedb_sats::db::error::AuthError;
use spacetimedb_sats::product;
use spacetimedb_vm::errors::ErrorVm;
use std::time::Instant;
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc;

#[test]
fn add_subscriber(db: Arc<RelationalDB>, sql: &str, assert: Option<AssertTxFn>) -> Result<(), DBError> {
let owner = Identity::from_byte_array([1; 32]);
let client = ClientActorId::for_test(Identity::ZERO);
let sender = Arc::new(ClientConnectionSender::dummy(client, Protocol::Binary));
let module_subscriptions = ModuleSubscriptions::new(db.clone(), owner);

let subscribe = Subscribe {
query_strings: [sql.into()].into(),
request_id: 0,
};
module_subscriptions.add_subscriber(sender, subscribe, Instant::now(), assert)
}

/// Asserts that a subscription holds a tx handle for the entire length of its evaluation.
#[test]
fn test_tx_subscription_ordering() -> ResultTest<()> {
let test_db = TestDB::durable()?;

Expand All @@ -215,24 +240,14 @@ mod tests {
db.insert(tx, table_id, product!(1_u8))
})?;

let id = Identity::ZERO;
let client = ClientActorId::for_test(id);
let sender = Arc::new(ClientConnectionSender::dummy(client, Protocol::Binary));
let module_subscriptions = ModuleSubscriptions::new(db.clone(), id);

let (send, mut recv) = mpsc::unbounded_channel();

// Subscribing to T should return a single row
// Subscribing to T should return a single row.
let db2 = db.clone();
let query_handle = runtime.spawn_blocking(move || {
let db = module_subscriptions.relational_db.clone();
let query_strings = vec!["select * from T".into()];
module_subscriptions.add_subscriber(
sender,
Subscribe {
query_strings,
request_id: 0,
},
Instant::now(),
add_subscriber(
db.clone(),
"select * from T",
Some(Arc::new(move |tx: &_| {
// Wake up writer thread after starting the reader tx
let _ = send.send(());
Expand All @@ -250,8 +265,8 @@ mod tests {
// Write a second row to T concurrently with the reader thread
let write_handle = runtime.spawn(async move {
let _ = recv.recv().await;
db.with_auto_commit(&ExecutionContext::default(), |tx| {
db.insert(tx, table_id, product!(2_u8))
db2.with_auto_commit(&ExecutionContext::default(), |tx| {
db2.insert(tx, table_id, product!(2_u8))
})
});

Expand All @@ -262,4 +277,41 @@ mod tests {

Ok(())
}

#[test]
fn subs_cannot_access_private_tables() -> ResultTest<()> {
let test_db = TestDB::durable()?;
let db = Arc::new(test_db.db.clone());

// Create a public table.
let indexes = &[(0.into(), "a")];
let cols = &[("a", AlgebraicType::U8)];
let _ = db.create_table_for_test("public", cols, indexes)?;

// Create a private table.
let _ = db.create_table_for_test_with_access("private", cols, indexes, StAccess::Private)?;

// We can subscribe to a public table.
let subscribe = |sql| add_subscriber(db.clone(), sql, None);
assert!(subscribe("SELECT * FROM public").is_ok());

// We cannot subscribe when a private table is mentioned,
// not even when in a join where the projection doesn't mention the table,
// as the mere fact of joining can leak information from the private table.
for sql in [
"SELECT * FROM private",
// Even if the query will return no rows, we still reject it.
"SELECT * FROM private WHERE 1 = 0",
"SELECT private.* FROM private",
"SELECT public.* FROM public JOIN private ON public.a = private.a WHERE private.a = 1",
"SELECT private.* FROM private JOIN public ON private.a = public.a WHERE public.a = 1",
] {
assert!(matches!(
subscribe(sql).unwrap_err(),
DBError::Vm(ErrorVm::Auth(AuthError::TablePrivate { .. }))
));
}

Ok(())
}
}
11 changes: 9 additions & 2 deletions crates/core/src/subscription/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use spacetimedb_client_api_messages::client_api::TableUpdate;
use spacetimedb_data_structures::map::HashSet;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::ProductValue;
use spacetimedb_lib::{Identity, ProductValue};
use spacetimedb_primitives::TableId;
use spacetimedb_sats::db::auth::{StAccess, StTableType};
use spacetimedb_sats::db::error::AuthError;
use spacetimedb_sats::relation::DbTable;
use spacetimedb_vm::errors::ErrorVm;
use spacetimedb_vm::expr::{self, IndexJoin, Query, QueryExpr, SourceProvider, SourceSet};
use spacetimedb_vm::expr::{self, AuthAccess, IndexJoin, Query, QueryExpr, SourceProvider, SourceSet};
use spacetimedb_vm::rel_ops::RelOps;
use spacetimedb_vm::relation::{MemTable, RelValue};
use std::borrow::Cow;
Expand Down Expand Up @@ -623,6 +624,12 @@ impl From<Vec<SupportedQuery>> for ExecutionSet {
}
}

impl AuthAccess for ExecutionSet {
fn check_auth(&self, owner: Identity, caller: Identity) -> Result<(), AuthError> {
self.exec_units.iter().try_for_each(|eu| eu.check_auth(owner, caller))
}
}

/// Queries all the [`StTableType::User`] tables *right now*
/// and turns them into [`QueryExpr`],
/// the moral equivalent of `SELECT * FROM table`.
Expand Down
Loading

0 comments on commit e4f1021

Please sign in to comment.