Skip to content

Commit 7596825

Browse files
fix: multi-threaded query execution (#1220)
Adds multi-threaded execution to the query `execute` method, allowing DataFusion to use multiple threads for parallel execution of query plans and improving query performance. Also moves the query-execution code into a separate Tokio blocking task.
1 parent 4af9a5e commit 7596825

File tree

4 files changed

+30
-18
lines changed

4 files changed

+30
-18
lines changed

src/cli.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,12 @@ pub struct Options {
260260
help = "Set a fixed memory limit for query in GiB"
261261
)]
262262
pub query_memory_pool_size: Option<usize>,
263-
263+
// reduced the max row group size from 1048576
264+
// smaller row groups help in faster query performance in multi threaded query
264265
#[arg(
265266
long,
266267
env = "P_PARQUET_ROW_GROUP_SIZE",
267-
default_value = "1048576",
268+
default_value = "262144",
268269
help = "Number of rows in a row group"
269270
)]
270271
pub row_group_size: usize,

src/handlers/airplane.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,14 @@ impl FlightService for AirServiceImpl {
215215
Status::permission_denied("User Does not have permission to access this")
216216
})?;
217217
let time = Instant::now();
218-
let (records, _) = query
219-
.execute(stream_name.clone())
220-
.await
221-
.map_err(|err| Status::internal(err.to_string()))?;
218+
219+
let stream_name_clone = stream_name.clone();
220+
let (records, _) =
221+
match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
222+
Ok(Ok((records, fields))) => (records, fields),
223+
Ok(Err(e)) => return Err(Status::internal(e.to_string())),
224+
Err(err) => return Err(Status::internal(err.to_string())),
225+
};
222226

223227
/*
224228
* INFO: No returning the schema with the data.

src/handlers/http/query.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
22+
use arrow_array::RecordBatch;
2223
use chrono::{DateTime, Utc};
2324
use datafusion::common::tree_node::TreeNode;
2425
use datafusion::error::DataFusionError;
@@ -130,7 +131,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
130131

131132
return Ok(HttpResponse::Ok().json(response));
132133
}
133-
let (records, fields) = query.execute(table_name.clone()).await?;
134+
let (records, fields) = execute_query(query, table_name.clone()).await?;
134135

135136
let response = QueryResponse {
136137
records,
@@ -149,6 +150,17 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
149150
Ok(response)
150151
}
151152

153+
async fn execute_query(
154+
query: LogicalQuery,
155+
stream_name: String,
156+
) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
157+
match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
158+
Ok(Ok(result)) => Ok(result),
159+
Ok(Err(e)) => Err(QueryError::Execute(e)),
160+
Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
161+
}
162+
}
163+
152164
pub async fn get_counts(
153165
req: HttpRequest,
154166
counts_request: Json<CountsRequest>,

src/query/mod.rs

+6-11
Original file line numberDiff line numberDiff line change
@@ -88,30 +88,24 @@ impl Query {
8888
let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
8989
let runtime = Arc::new(runtime_config.build().unwrap());
9090

91+
// All the config options are explained here -
92+
// https://datafusion.apache.org/user-guide/configs.html
9193
let mut config = SessionConfig::default()
9294
.with_parquet_pruning(true)
9395
.with_prefer_existing_sort(true)
94-
.with_round_robin_repartition(true);
95-
96-
// For more details refer https://datafusion.apache.org/user-guide/configs.html
97-
98-
// Reduce the number of rows read (if possible)
99-
config.options_mut().execution.parquet.enable_page_index = true;
96+
.with_batch_size(1000000);
10097

10198
// Pushdown filters allows DF to push the filters as far down in the plan as possible
10299
// and thus, reducing the number of rows decoded
103100
config.options_mut().execution.parquet.pushdown_filters = true;
104101

105102
// Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation
106103
config.options_mut().execution.parquet.reorder_filters = true;
107-
108-
// Enable StringViewArray
109-
// https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/
104+
config.options_mut().execution.parquet.binary_as_string = true;
110105
config
111106
.options_mut()
112107
.execution
113-
.parquet
114-
.schema_force_view_types = true;
108+
.use_row_number_estimates_to_optimize_partitioning = true;
115109

116110
let state = SessionStateBuilder::new()
117111
.with_default_features()
@@ -135,6 +129,7 @@ impl Query {
135129
SessionContext::new_with_state(state)
136130
}
137131

132+
#[tokio::main(flavor = "multi_thread")]
138133
pub async fn execute(
139134
&self,
140135
stream_name: String,

0 commit comments

Comments
 (0)