Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: multi-threaded query execution #1220

Merged
merged 8 commits into from
Mar 6, 2025
5 changes: 3 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,12 @@ pub struct Options {
help = "Set a fixed memory limit for query in GiB"
)]
pub query_memory_pool_size: Option<usize>,

// Reduced the default max row group size from 1048576.
// Smaller row groups improve query performance for multi-threaded query execution.
#[arg(
long,
env = "P_PARQUET_ROW_GROUP_SIZE",
default_value = "1048576",
default_value = "262144",
help = "Number of rows in a row group"
)]
pub row_group_size: usize,
Expand Down
12 changes: 8 additions & 4 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,14 @@ impl FlightService for AirServiceImpl {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();
let (records, _) = query
.execute(stream_name.clone())
.await
.map_err(|err| Status::internal(err.to_string()))?;

let stream_name_clone = stream_name.clone();
let (records, _) =
match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
Ok(Ok((records, fields))) => (records, fields),
Ok(Err(e)) => return Err(Status::internal(e.to_string())),
Err(err) => return Err(Status::internal(err.to_string())),
};

/*
* INFO: Not returning the schema with the data.
Expand Down
14 changes: 13 additions & 1 deletion src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
use arrow_array::RecordBatch;
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
Expand Down Expand Up @@ -130,7 +131,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons

return Ok(HttpResponse::Ok().json(response));
}
let (records, fields) = query.execute(table_name.clone()).await?;
let (records, fields) = execute_query(query, table_name.clone()).await?;

let response = QueryResponse {
records,
Expand All @@ -149,6 +150,17 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
Ok(response)
}

async fn execute_query(
query: LogicalQuery,
stream_name: String,
) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
Ok(Ok(result)) => Ok(result),
Ok(Err(e)) => Err(QueryError::Execute(e)),
Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
}
}

pub async fn get_counts(
req: HttpRequest,
counts_request: Json<CountsRequest>,
Expand Down
17 changes: 6 additions & 11 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,30 +88,24 @@ impl Query {
let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
let runtime = Arc::new(runtime_config.build().unwrap());

// All the config options are explained here -
// https://datafusion.apache.org/user-guide/configs.html
let mut config = SessionConfig::default()
.with_parquet_pruning(true)
.with_prefer_existing_sort(true)
.with_round_robin_repartition(true);

// For more details refer https://datafusion.apache.org/user-guide/configs.html

// Reduce the number of rows read (if possible)
config.options_mut().execution.parquet.enable_page_index = true;
.with_batch_size(1000000);

// Pushdown filters allows DF to push the filters as far down in the plan as possible
// and thus, reducing the number of rows decoded
config.options_mut().execution.parquet.pushdown_filters = true;

// Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation
config.options_mut().execution.parquet.reorder_filters = true;

// Enable StringViewArray
// https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/
config.options_mut().execution.parquet.binary_as_string = true;
config
.options_mut()
.execution
.parquet
.schema_force_view_types = true;
.use_row_number_estimates_to_optimize_partitioning = true;

let state = SessionStateBuilder::new()
.with_default_features()
Expand All @@ -135,6 +129,7 @@ impl Query {
SessionContext::new_with_state(state)
}

#[tokio::main(flavor = "multi_thread")]
pub async fn execute(
&self,
stream_name: String,
Expand Down
Loading