fix: multi threaded query execution #1220
Conversation
Walkthrough: The pull request refactors asynchronous query execution across multiple modules to prevent blocking the async runtime. In both the HTTP query handler (`src/handlers/http/query.rs`) and the Flight handler (`src/handlers/airplane.rs`), query execution is offloaded to a blocking task.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant C as Client
    participant H as Handler
    participant BT as Blocking Task
    participant Q as Query Executor
    C->>H: Send query request
    H->>H: Clone stream name
    H->>BT: Spawn blocking task for query execution
    BT->>Q: Execute query
    Q-->>BT: Return query result (Ok/Err)
    BT-->>H: Provide result
    H->>C: Respond with records or error
```
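The flow in the diagram can be sketched with plain standard-library threads. This is an illustrative stand-in for the `tokio::task::spawn_blocking` pattern the PR uses; `execute_query` and `run_query_off_thread` are hypothetical names, not functions from the codebase.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for the PR's query execution step.
fn execute_query(stream_name: &str) -> Result<Vec<String>, String> {
    // CPU-intensive plan execution would happen here.
    Ok(vec![format!("records for {stream_name}")])
}

// Offload the blocking work to a dedicated thread and wait for the
// result, mirroring what tokio::task::spawn_blocking does for the
// async handler without tying up a runtime worker.
fn run_query_off_thread(stream_name: String) -> Result<Vec<String>, String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(execute_query(&stream_name));
    });
    rx.recv().map_err(|e| e.to_string())?
}

fn main() {
    match run_query_off_thread("demo_stream".to_string()) {
        Ok(records) => println!("{} record batch(es)", records.len()),
        Err(e) => eprintln!("query failed: {e}"),
    }
}
```

In the real handler the receiving side is an `.await` on the spawned task rather than a channel `recv`, but the division of labor is the same.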
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/handlers/http/query.rs (1)
`133-139`: **Great implementation of asynchronous query execution using `spawn_blocking`**

The change to use `tokio::task::spawn_blocking` is an excellent improvement, as it offloads potentially CPU-intensive query execution to a dedicated thread pool, preventing blocking of the async runtime. The error handling is also well implemented, with proper matching against the different error scenarios. For even better readability and separation of concerns, consider extracting this pattern into a helper function, since it is used in multiple handlers:
```diff
-let table_name_clone = table_name.clone();
-let (records, fields) =
-    match tokio::task::spawn_blocking(move || query.execute(table_name_clone)).await {
-        Ok(Ok((records, fields))) => (records, fields),
-        Ok(Err(e)) => return Err(QueryError::Execute(e)),
-        Err(e) => return Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
-    };
+let (records, fields) = execute_query_in_blocking_thread(query, table_name.clone()).await?;
```

Then define this helper function elsewhere:

```rust
async fn execute_query_in_blocking_thread(
    query: LogicalQuery,
    stream_name: String,
) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
    match tokio::task::spawn_blocking(move || query.execute(stream_name)).await {
        Ok(Ok(result)) => Ok(result),
        Ok(Err(e)) => Err(QueryError::Execute(e)),
        Err(e) => Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))),
    }
}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- src/handlers/airplane.rs (1 hunks)
- src/handlers/http/query.rs (1 hunks)
- src/parseable/streams.rs (1 hunks)
- src/query/mod.rs (2 hunks)
✅ Files skipped from review due to trivial changes (1)
- src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
🔇 Additional comments (3)
src/handlers/airplane.rs (1)
`219-225`: ❓ Verification inconclusive

**Effective implementation of non-blocking query execution**

Offloading the query execution to a blocking thread pool using `tokio::task::spawn_blocking` is a great architectural improvement. This prevents the query execution from blocking the async Tokio runtime, improving overall system throughput and responsiveness.

Let's verify the error handling improvements by checking whether the different error scenarios are properly handled in both handlers:
🏁 Script executed:
```shell
#!/bin/bash
# Check if both handlers handle errors consistently

# First, check the query handler error handling
echo "Checking HTTP query handler error handling..."
rg -A 5 "tokio::task::spawn_blocking.*query.execute" src/handlers/http/query.rs

# Then, check the airplane handler error handling
echo "Checking airplane handler error handling..."
rg -A 5 "tokio::task::spawn_blocking.*query.execute" src/handlers/airplane.rs

# Compare the error types being returned
echo "Comparing error types..."
rg "return Err\(" src/handlers/http/query.rs | grep -A 2 "tokio::task::spawn_blocking"
rg "return Err\(" src/handlers/airplane.rs | grep -A 2 "tokio::task::spawn_blocking"
```

Length of output: 1264
Non-blocking query execution is effectively implemented, but please double-check error type differences
The offloading of `query.execute` to a blocking thread pool is well done. Our verification shows that the airplane handler wraps errors using `Status::internal`, whereas the HTTP query handler wraps errors in custom `QueryError` variants. This discrepancy appears intentional given the different protocol contexts (gRPC vs. HTTP), but please confirm these differences are by design.
- Location: `src/handlers/airplane.rs` (lines 219-225), error handling via `Status::internal`
- Observation: the HTTP handler (`src/handlers/http/query.rs`) uses `QueryError::Execute` and `QueryError::Anyhow`
- Action: verify that these distinct error types align with the intended behavior for each endpoint
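To make the two wrapping strategies concrete, here is a minimal sketch. `Status` below is a hand-rolled stand-in for tonic's `Status` (the real gRPC type), and the `QueryError` variant names follow the review's description rather than the actual codebase.

```rust
// Stand-in for the HTTP handler's error type (variant names follow the review).
#[derive(Debug)]
enum QueryError {
    Execute(String),
    Anyhow(String),
}

// Hand-rolled stand-in for tonic::Status as used by the gRPC handler.
#[derive(Debug, PartialEq)]
struct Status {
    code: &'static str,
    message: String,
}

impl Status {
    fn internal(message: impl Into<String>) -> Self {
        Status { code: "INTERNAL", message: message.into() }
    }
}

// The same underlying failure maps to a protocol-appropriate wrapper.
fn to_http_error(e: String) -> QueryError {
    QueryError::Execute(e)
}

fn to_grpc_error(e: String) -> Status {
    Status::internal(e)
}

fn main() {
    let cause = "table not found".to_string();
    println!("HTTP: {:?}", to_http_error(cause.clone()));
    println!("gRPC: {:?}", to_grpc_error(cause));
}
```

Keeping the conversion in one named function per protocol makes the intentional divergence easy to audit.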
src/query/mod.rs (2)
`94-95`: ✅ Verification successful

**Optimization: added batch size configuration for improved performance**

Setting a specific batch size (8192) is a good optimization. It controls how many rows are processed in each batch during query execution, allowing for better memory usage and potentially improved performance.

Let's analyze how this batch size value compares to defaults and recommendations:
🌐 Web query:
What is the default batch size in DataFusion and what are the recommended values?
💡 Result:
The default batch size in Apache DataFusion is 8192 rows per batch, as shown in configuration settings and CLI outputs[1][2][3]. This value balances memory efficiency and metadata overhead, particularly for buffer-in-memory operations where small batches would create excessive metadata[2][3].
Recommended Batch Size Values
| Use case | Recommended size | Rationale |
|---|---|---|
| General workloads | 8192 (default) | Balances memory usage and processing efficiency[2][7] |
| Memory-constrained environments | 1024-4096 | Reduces per-batch memory footprint[1][6] |
| Large aggregation/join operations | 16384 | Shows ~10% performance improvement in some queries[7] |
| TopK/sorting optimizations | Match LIMIT clause | Partial sorts can use smaller batches (e.g., 10-100 rows)[5] |

Key considerations when adjusting:
- Larger batches (16K-65K rows) improve throughput for complex queries but increase memory pressure[7][10]
- Smaller batches (1K-4K) reduce memory usage but may impact performance due to increased batch count[5][6]
- Use `SET datafusion.execution.batch_size TO <value>` for session-level adjustments[1]
- Configure via the environment variable `DATAFUSION_EXECUTION_BATCH_SIZE` for CLI persistence[1][3]

The optimal size depends on query patterns and available memory resources, with Cloud Data Fusion deployments typically recommending nodes with ≥8GB RAM[4]. Performance testing with representative workloads is advised when tuning this parameter.
Citations:
- 1: https://datafusion.apache.org/user-guide/cli/usage.html
- 2: https://datafusion.apache.org/user-guide/configs.html
- 3: https://datafusion.apache.org/_sources/user-guide/configs.md.txt
- 4: https://cloud.google.com/data-fusion/docs/concepts/cluster-sizing
- 5: https://xebia.com/blog/optimizing-topk-queries-in-datafusion/
- 6: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html
- 7: Tweak optimal default batch size apache/datafusion#6916
- 8: Improve RepartitionExec for better query performance apache/datafusion#7001
- 9: https://cloud.google.com/data-fusion/docs/how-to/configure-cloud-storage-batch-source
- 10: https://www.influxdata.com/blog/aggregating-millions-groups-fast-apache-arrow-datafusion/
Clarification: Explicit Batch Size Configuration Confirmed
The explicit setting of the batch size to 8192 in `src/query/mod.rs` (lines 94-95) aligns with Apache DataFusion's defaults and recommendations for general workloads. Although this value is the default, specifying it explicitly enhances clarity and eases future tuning if workload requirements change. Approved.
`139-139`: ❓ Verification inconclusive

**Excellent use of multi-threaded tokio runtime for query execution**

Adding the `#[tokio::main(flavor = "multi_thread")]` attribute to the `execute` method enables multi-threaded query execution, allowing DataFusion to parallelize operations across multiple CPU cores. This is crucial for improving query performance, especially for complex queries on large datasets.

Let's check whether the `multi_thread` flavor is appropriate and whether there are any potential limitations:
🌐 Web query:
What are the pros and cons of using #[tokio::main(flavor = "multi_thread")] on a method instead of the main function?
💡 Result:
Tokio's `multi_thread` runtime flavor balances concurrency and resource utilization through work-stealing across a thread pool, but its effectiveness depends on proper usage. Here's a breakdown of its trade-offs when applied at different scopes:

Runtime Configuration Fundamentals
- `#[tokio::main(flavor = "multi_thread")]` initializes a global runtime with worker threads (defaulting to the CPU core count).
- Tasks are `Send` + `'static` by default, enabling thread-safe execution across cores[1][7].
Pros of Multi-Threaded Runtime

Work-Stealing Efficiency
- Distributes tasks dynamically across threads, preventing idle cores and reducing tail latency under load[3][9].
- Example: A web server handles 500 concurrent requests faster than a single-threaded runtime, even if each task sleeps briefly[1].

Blocking Task Isolation
- Use `tokio::task::spawn_blocking` to offload CPU-heavy work to a dedicated thread pool, avoiding starvation of async tasks[2][4].

Resource Scalability
- Adjust worker threads via `worker_threads=N` to balance throughput and memory usage (e.g., restrict to 1 thread for low-resource environments)[3].
Cons of Multi-Threaded Runtime
Send
Bound Complexity
- Futures and data shared across threads must implement
Send
, complicating code with non-Send
types (e.g., raw pointers, thread-local data)[4][7].Deadlock Risks
- Blocking synchronous code (e.g.,
std::thread::sleep
) in async tasks can stall the entire runtime if worker threads are saturated[2].- Mitigation: Use
yield_now()
or offload tospawn_blocking
.Overhead Trade-Offs
- Multithreading adds memory and scheduling overhead. Single-threaded runtimes (
flavor = "current_thread"
) often match performance for light workloads[3][4].
Method vs. Main Function Usage
- Main function: the standard approach. Initializes the runtime once, ensuring all async code uses the same executor[3][7].
- Method-level: not recommended. Tokio prohibits nested runtimes, and method-specific initialization risks:
  - runtime conflicts or unintended thread pools;
  - fragmented resource management (e.g., duplicated thread pools)[2][4].
When to Prefer Single-Threaded
- Low-concurrency services (e.g., CLI tools).
- `!Send` futures or shared references across awaits (simpler with `LocalSet`)[4][7].
- Resource-constrained environments (embedded systems)[3].

In practice, benchmark both runtimes with `worker_threads=1` versus `current_thread` to quantify trade-offs for your workload[3]. Use `multi_thread` as the default for servers and adjust based on profiling.

Citations:
- 1: https://users.rust-lang.org/t/dont-fully-understand-tokio-multithreaded-runtime-benefits/113393
- 2: https://users.rust-lang.org/t/tokio-using-core-threads-for-cpu-heavy-computation/83443
- 3: https://users.rust-lang.org/t/choosing-rt-multi-thread-versus-rt-option-in-tokio/106125
- 4: https://news.ycombinator.com/item?id=37466284
- 5: https://bitbashing.io/async-rust.html
- 6: https://www.howtocodeit.com/articles/master-hexagonal-architecture-rust
- 7: https://swatinem.de/blog/futures-n-tasks/
- 8: https://livebook.manning.com/book/code-like-a-pro-in-rust/chapter-11/v-8/
- 9: https://news.ycombinator.com/item?id=36782884
- 10: https://livebook.manning.com/book/code-like-a-pro-in-rust/chapter-8/
Action Required: Verify Runtime Context for Tokio Attribute Usage
The use of `#[tokio::main(flavor = "multi_thread")]` in this method leverages Tokio's multi-threaded runtime for efficient, parallel query execution, which is beneficial for large-scale, complex queries. However, please verify that this method is not invoked within an already active Tokio runtime. Using the attribute on a non-main function risks nested-runtime conflicts (e.g., creating multiple thread pools or potential deadlocks) if not properly isolated.

- Location: `src/query/mod.rs`, line 139
- Recommendation: ensure that this method is the sole entry point for starting a runtime in its context, or refactor to initialize the runtime in the main function if nested execution might occur.
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/handlers/http/query.rs (2)
`134-134`: **Consider resource limits or timeouts for parallel query execution.**

Although calling `execute_query` asynchronously improves throughput, there is no guardrail on concurrency (such as a task limiter). If many queries arrive simultaneously, the thread pool might become saturated, impacting the system's responsiveness.
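One possible guardrail bounds in-flight queries with a pool of permits. This sketch uses only the standard library for illustration; in the real async handler, `tokio::sync::Semaphore` would more likely play this role.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

// Run `total` simulated queries while allowing at most `max_parallel`
// to execute at once. A bounded channel of unit "permits" acts as a
// simple semaphore.
fn run_queries(total: usize, max_parallel: usize) -> usize {
    let (permit_tx, permit_rx) = sync_channel(max_parallel);
    for _ in 0..max_parallel {
        permit_tx.send(()).unwrap();
    }
    let permit_rx = Arc::new(Mutex::new(permit_rx));
    let completed = Arc::new(Mutex::new(0usize));

    let handles: Vec<_> = (0..total)
        .map(|_| {
            let rx = Arc::clone(&permit_rx);
            let tx = permit_tx.clone();
            let done = Arc::clone(&completed);
            thread::spawn(move || {
                // Block until a permit is free, bounding concurrency.
                rx.lock().unwrap().recv().unwrap();
                // ...query execution would happen here...
                *done.lock().unwrap() += 1;
                tx.send(()).unwrap(); // return the permit
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    let n = *completed.lock().unwrap();
    n
}

fn main() {
    println!("completed {}", run_queries(5, 2));
}
```

A timeout per query (e.g., racing the task against a timer) would complement the limiter by preventing a single slow query from holding a permit indefinitely.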
`153-162`: **Implementation of asynchronous, multi-threaded query execution looks good.**

- The usage of `tokio::task::spawn_blocking` is appropriate for CPU-bound tasks, preventing blocking of the async runtime.
- The error handling cleanly distinguishes between internal query execution errors and the blocking task's errors.
- Consider adding optional logs or telemetry within `spawn_blocking` to track query performance or detect long-running tasks.
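A lightweight way to add that telemetry is to wrap the query closure in a timing helper. This is a sketch: `timed_execute` is a hypothetical helper, and the real code would log via the project's tracing facade rather than `eprintln!`.

```rust
use std::time::Instant;

// Hypothetical helper: wrap a closure to record wall-clock duration,
// so slow queries can be spotted in the logs.
fn timed_execute<T>(label: &str, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let out = f();
    let elapsed = start.elapsed();
    // In the handler this would go through the tracing/log facade.
    eprintln!("query `{label}` finished in {elapsed:?}");
    out
}

fn main() {
    // Inside spawn_blocking the call would look roughly like:
    // timed_execute(&table_name, move || query.execute(table_name_clone))
    let result = timed_execute("demo", || (1..=4).sum::<i32>());
    println!("result = {result}");
}
```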
📒 Files selected for processing (1)
- src/handlers/http/query.rs (3 hunks)
🔇 Additional comments (1)
src/handlers/http/query.rs (1)
`22-22`: No issues noted with the newly added import.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/handlers/http/query.rs (1)
`34-34`: **Importing `std::thread` is acceptable, but consider async best practices.**

It's okay to import `std::thread` for parallel work, though using it in an async context can block the runtime. If you need fully non-blocking behavior, `tokio::task::spawn_blocking` is often recommended.
📒 Files selected for processing (1)
- src/handlers/http/query.rs (4 hunks)
🔇 Additional comments (2)
src/handlers/http/query.rs (2)
`22-22`: **Import looks fine.**

No issues spotted; importing `RecordBatch` here is appropriate for handling query results.
`135-135`: **Validate concurrency approach.**

The invocation of the new `execute_query` function is clear, but ensure that parallel calls to this endpoint won't block the async runtime if all threads are occupied, especially under high load.
Force-pushed 1ac2dc4 to b4c3d88
add multi threading execution to query execute method: allows DataFusion to use multiple threads to perform parallel execution of plans, improving query performance
Force-pushed 0493eb2 to 475d1a7
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/query/mod.rs (1)
`103-103`: **Document this configuration option**

Please add a comment explaining why `binary_as_string = true` is necessary. What specific issue does it solve, and how does it affect query results?
📒 Files selected for processing (1)
- src/query/mod.rs (2 hunks)
🔇 Additional comments (3)
src/query/mod.rs (3)
`94-94`: **Add documentation for this configuration value**

Please add a comment explaining why a batch size of 1000000 was chosen and its impact on query performance. This will help maintainers understand the rationale behind this specific value.
`107-107`: **Document how this optimization affects query execution**

This configuration option seems important for partitioning optimization. Please add a comment explaining its purpose and expected impact on query performance.
`131-131`: **Good implementation of multi-threaded execution**

The `#[tokio::main(flavor = "multi_thread")]` annotation aligns well with the PR objective to improve query performance through multi-threading. It allows DataFusion to utilize multiple threads for parallel execution of query plans. Consider adding a comment explaining how this annotation improves performance and any potential resource considerations (e.g., thread pool size, memory usage).
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/query/mod.rs (1)
`132-132`: **Excellent improvement with multi-threaded execution!**

Adding `#[tokio::main(flavor = "multi_thread")]` enables DataFusion to leverage multiple CPU cores for query execution, which should significantly improve performance for complex queries or large datasets. Consider adding a brief comment explaining the performance benefits this change provides and any considerations for resource usage.
📒 Files selected for processing (2)
- src/cli.rs (1 hunks)
- src/query/mod.rs (2 hunks)

🚧 Files skipped from review as they are similar to previous changes (1)
- src/cli.rs
🔇 Additional comments (3)
src/query/mod.rs (3)
`96-96`: **Consider adding an explanation for the batch size selection.**

The batch size of 1,000,000 is a significant configuration parameter that affects query performance and memory usage. Adding a comment explaining why this specific value was chosen would improve code maintainability.
`104-104`: **Good addition of `binary_as_string` configuration.**

Setting `binary_as_string = true` ensures that binary data is treated as strings, which is appropriate for log data that may contain binary content needing to be displayed as text.
`91-92`: **Great documentation reference!**

Providing a link to the DataFusion documentation helps developers understand the available configuration options and their implications.
Force-pushed f0417ec to 5a5dc0c
add multi threading execution to query execute method
allows datafusion to use multiple threads to perform parallel execution of plans
improves query performance
Summary by CodeRabbit
Refactor
Chores