Skip to content

Commit e0cc8c8

Browse files
alambmustafasrepoDandandanDaniël Heresr4ntix
authored
Vectorized hash grouping (#6904)
* Vectorized hash grouping * Prepare for merge to main * Improve comments and update size calculations * Implement test for accumulate_boolean refactor * Use resize instead of resize_with * fix avg size calculation * Simplify sum accumulator * Add comments explaining i64 as counts * Clarify `aggreate_arguments` * Apply suggestions from code review Co-authored-by: Mustafa Akur <[email protected]> * Clarify rationale for ScratchSpace being a field * use slice syntax * Update datafusion/physical-expr/src/aggregate/average.rs Co-authored-by: Mustafa Akur <[email protected]> * Update datafusion/physical-expr/src/aggregate/count.rs Co-authored-by: Mustafa Akur <[email protected]> * Update datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs Co-authored-by: Mustafa Akur <[email protected]> * fix diagram * Update datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs Co-authored-by: Mustafa Akur <[email protected]> * simplify the supported logic * Add a log message when using slow adapter * fmt * Revert "chore(deps): update bigdecimal requirement from 0.3.0 to 0.4.0 (#6848)" (#6896) This reverts commit d0def42. * Make FileScanConfig::project pub (#6931) Co-authored-by: Daniël Heres <[email protected]> * feat: add round trip test of physical plan in tpch unit tests (#6918) * Use thiserror to implement the From trait for DFSqlLogicTestError (#6924) * parallel csv scan (#6801) * parallel csv scan * add max line length * Update according to review comments * Update Configuration doc --------- Co-authored-by: Andrew Lamb <[email protected]> * Add additional test coverage for aggregaes using dates/times/timestamps/decimals (#6939) * Add additional test coverage for aggregaes using dates/times/timestamps/decimals * Add coverage for date32/date64 * Support timestamp types for min/max * Fix aggregate nullability calculation --------- Co-authored-by: Mustafa Akur <[email protected]> Co-authored-by: Daniël Heres <[email protected]> Co-authored-by: Daniël Heres <[email protected]> Co-authored-by: r.4ntix <[email protected]> Co-authored-by: Jonah Gao <[email protected]> Co-authored-by: Yongting You <[email protected]>
1 parent 46182c8 commit e0cc8c8

File tree

21 files changed

+3056
-575
lines changed

21 files changed

+3056
-575
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,4 @@ lto = false
5656
opt-level = 3
5757
overflow-checks = false
5858
panic = 'unwind'
59-
rpath = false
59+
rpath = false

datafusion-cli/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/src/physical_plan/aggregates/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -998,7 +998,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
998998
Arc::new(Schema::new(group_fields))
999999
}
10001000

1001-
/// returns physical expressions to evaluate against a batch
1001+
/// returns physical expressions for arguments to evaluate against a batch
10021002
/// The expressions are different depending on `mode`:
10031003
/// * Partial: AggregateExpr::expressions
10041004
/// * Final: columns of `AggregateExpr::state_fields()`

datafusion/core/src/physical_plan/aggregates/row_hash.rs

+356-545
Large diffs are not rendered by default.

datafusion/core/tests/user_defined/user_defined_window_functions.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ impl PartitionEvaluator for OddCounter {
515515
println!("evaluate, values: {values:#?}, range: {range:?}");
516516

517517
self.test_state.inc_evaluate_called();
518-
let values: &Int64Array = values.get(0).unwrap().as_primitive();
518+
let values: &Int64Array = values[0].as_primitive();
519519
let values = values.slice(range.start, range.len());
520520
let scalar = ScalarValue::Int64(
521521
match (odd_count(&values), self.test_state.null_for_zero) {
@@ -534,10 +534,7 @@ impl PartitionEvaluator for OddCounter {
534534
println!("evaluate_all, values: {values:#?}, num_rows: {num_rows}");
535535

536536
self.test_state.inc_evaluate_all_called();
537-
Ok(odd_count_arr(
538-
values.get(0).unwrap().as_primitive(),
539-
num_rows,
540-
))
537+
Ok(odd_count_arr(values[0].as_primitive(), num_rows))
541538
}
542539

543540
fn evaluate_all_with_rank(

datafusion/execution/src/memory_pool/proxy.rs

+8
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ pub trait VecAllocExt {
2626

2727
/// [Push](Vec::push) new element to vector and store additional allocated bytes in `accounting` (additive).
2828
fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
29+
30+
/// Return the amount of memory allocated by this Vec (not
31+
/// recursively counting any heap allocations contained within the
32+
/// structure). Does not include the size of `self`
33+
fn allocated_size(&self) -> usize;
2934
}
3035

3136
impl<T> VecAllocExt for Vec<T> {
@@ -44,6 +49,9 @@ impl<T> VecAllocExt for Vec<T> {
4449

4550
self.push(x);
4651
}
52+
fn allocated_size(&self) -> usize {
53+
std::mem::size_of::<T>() * self.capacity()
54+
}
4755
}
4856

4957
/// Extension trait for [`RawTable`] to account for allocations.

datafusion/physical-expr/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ indexmap = "2.0.0"
6262
itertools = { version = "0.11", features = ["use_std"] }
6363
lazy_static = { version = "^1.4.0" }
6464
libc = "0.2.140"
65+
log = "^0.4"
6566
md-5 = { version = "^0.10.0", optional = true }
6667
paste = "^1.0"
6768
petgraph = "0.6.2"

datafusion/physical-expr/src/aggregate/average.rs

+239-3
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717

1818
//! Defines physical expressions that can evaluated at runtime during query execution
1919
20+
use arrow::array::{AsArray, PrimitiveBuilder};
21+
use log::debug;
22+
2023
use std::any::Any;
2124
use std::convert::TryFrom;
2225
use std::sync::Arc;
2326

27+
use crate::aggregate::groups_accumulator::accumulate::NullState;
2428
use crate::aggregate::row_accumulator::{
2529
is_row_accumulator_support_dtype, RowAccumulator,
2630
};
@@ -29,19 +33,23 @@ use crate::aggregate::sum::sum_batch;
2933
use crate::aggregate::utils::calculate_result_decimal_for_avg;
3034
use crate::aggregate::utils::down_cast_any_ref;
3135
use crate::expressions::format_state_name;
32-
use crate::{AggregateExpr, PhysicalExpr};
36+
use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr};
3337
use arrow::compute;
34-
use arrow::datatypes::DataType;
38+
use arrow::datatypes::{DataType, Decimal128Type, Float64Type, UInt64Type};
3539
use arrow::{
3640
array::{ArrayRef, UInt64Array},
3741
datatypes::Field,
3842
};
39-
use arrow_array::Array;
43+
use arrow_array::{
44+
Array, ArrowNativeTypeOp, ArrowNumericType, ArrowPrimitiveType, PrimitiveArray,
45+
};
4046
use datafusion_common::{downcast_value, ScalarValue};
4147
use datafusion_common::{DataFusionError, Result};
4248
use datafusion_expr::Accumulator;
4349
use datafusion_row::accessor::RowAccessor;
4450

51+
use super::utils::{adjust_output_array, Decimal128Averager};
52+
4553
/// AVG aggregate expression
4654
#[derive(Debug, Clone)]
4755
pub struct Avg {
@@ -155,6 +163,50 @@ impl AggregateExpr for Avg {
155163
&self.rt_data_type,
156164
)?))
157165
}
166+
167+
fn groups_accumulator_supported(&self) -> bool {
168+
use DataType::*;
169+
170+
matches!(&self.rt_data_type, Float64 | Decimal128(_, _))
171+
}
172+
173+
fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
174+
use DataType::*;
175+
// instantiate specialized accumulator based for the type
176+
match (&self.sum_data_type, &self.rt_data_type) {
177+
(Float64, Float64) => {
178+
Ok(Box::new(AvgGroupsAccumulator::<Float64Type, _>::new(
179+
&self.sum_data_type,
180+
&self.rt_data_type,
181+
|sum: f64, count: u64| Ok(sum / count as f64),
182+
)))
183+
}
184+
(
185+
Decimal128(_sum_precision, sum_scale),
186+
Decimal128(target_precision, target_scale),
187+
) => {
188+
let decimal_averager = Decimal128Averager::try_new(
189+
*sum_scale,
190+
*target_precision,
191+
*target_scale,
192+
)?;
193+
194+
let avg_fn =
195+
move |sum: i128, count: u64| decimal_averager.avg(sum, count as i128);
196+
197+
Ok(Box::new(AvgGroupsAccumulator::<Decimal128Type, _>::new(
198+
&self.sum_data_type,
199+
&self.rt_data_type,
200+
avg_fn,
201+
)))
202+
}
203+
204+
_ => Err(DataFusionError::NotImplemented(format!(
205+
"AvgGroupsAccumulator for ({} --> {})",
206+
self.sum_data_type, self.rt_data_type,
207+
))),
208+
}
209+
}
158210
}
159211

160212
impl PartialEq<dyn Any> for Avg {
@@ -383,6 +435,190 @@ impl RowAccumulator for AvgRowAccumulator {
383435
}
384436
}
385437

438+
/// An accumulator to compute the average of `[PrimitiveArray<T>]`.
439+
/// Stores values as native types, and does overflow checking
440+
///
441+
/// F: Function that calculates the average value from a sum of
442+
/// T::Native and a total count
443+
#[derive(Debug)]
444+
struct AvgGroupsAccumulator<T, F>
445+
where
446+
T: ArrowNumericType + Send,
447+
F: Fn(T::Native, u64) -> Result<T::Native> + Send,
448+
{
449+
/// The type of the internal sum
450+
sum_data_type: DataType,
451+
452+
/// The type of the returned sum
453+
return_data_type: DataType,
454+
455+
/// Count per group (use u64 to make UInt64Array)
456+
counts: Vec<u64>,
457+
458+
/// Sums per group, stored as the native type
459+
sums: Vec<T::Native>,
460+
461+
/// Track nulls in the input / filters
462+
null_state: NullState,
463+
464+
/// Function that computes the final average (value / count)
465+
avg_fn: F,
466+
}
467+
468+
impl<T, F> AvgGroupsAccumulator<T, F>
469+
where
470+
T: ArrowNumericType + Send,
471+
F: Fn(T::Native, u64) -> Result<T::Native> + Send,
472+
{
473+
pub fn new(sum_data_type: &DataType, return_data_type: &DataType, avg_fn: F) -> Self {
474+
debug!(
475+
"AvgGroupsAccumulator ({}, sum type: {sum_data_type:?}) --> {return_data_type:?}",
476+
std::any::type_name::<T>()
477+
);
478+
479+
Self {
480+
return_data_type: return_data_type.clone(),
481+
sum_data_type: sum_data_type.clone(),
482+
counts: vec![],
483+
sums: vec![],
484+
null_state: NullState::new(),
485+
avg_fn,
486+
}
487+
}
488+
}
489+
490+
impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>
491+
where
492+
T: ArrowNumericType + Send,
493+
F: Fn(T::Native, u64) -> Result<T::Native> + Send,
494+
{
495+
fn update_batch(
496+
&mut self,
497+
values: &[ArrayRef],
498+
group_indices: &[usize],
499+
opt_filter: Option<&arrow_array::BooleanArray>,
500+
total_num_groups: usize,
501+
) -> Result<()> {
502+
assert_eq!(values.len(), 1, "single argument to update_batch");
503+
let values = values[0].as_primitive::<T>();
504+
505+
// increment counts, update sums
506+
self.counts.resize(total_num_groups, 0);
507+
self.sums.resize(total_num_groups, T::default_value());
508+
self.null_state.accumulate(
509+
group_indices,
510+
values,
511+
opt_filter,
512+
total_num_groups,
513+
|group_index, new_value| {
514+
let sum = &mut self.sums[group_index];
515+
*sum = sum.add_wrapping(new_value);
516+
517+
self.counts[group_index] += 1;
518+
},
519+
);
520+
521+
Ok(())
522+
}
523+
524+
fn merge_batch(
525+
&mut self,
526+
values: &[ArrayRef],
527+
group_indices: &[usize],
528+
opt_filter: Option<&arrow_array::BooleanArray>,
529+
total_num_groups: usize,
530+
) -> Result<()> {
531+
assert_eq!(values.len(), 2, "two arguments to merge_batch");
532+
// first batch is counts, second is partial sums
533+
let partial_counts = values[0].as_primitive::<UInt64Type>();
534+
let partial_sums = values[1].as_primitive::<T>();
535+
// update counts with partial counts
536+
self.counts.resize(total_num_groups, 0);
537+
self.null_state.accumulate(
538+
group_indices,
539+
partial_counts,
540+
opt_filter,
541+
total_num_groups,
542+
|group_index, partial_count| {
543+
self.counts[group_index] += partial_count;
544+
},
545+
);
546+
547+
// update sums
548+
self.sums.resize(total_num_groups, T::default_value());
549+
self.null_state.accumulate(
550+
group_indices,
551+
partial_sums,
552+
opt_filter,
553+
total_num_groups,
554+
|group_index, new_value: <T as ArrowPrimitiveType>::Native| {
555+
let sum = &mut self.sums[group_index];
556+
*sum = sum.add_wrapping(new_value);
557+
},
558+
);
559+
560+
Ok(())
561+
}
562+
563+
fn evaluate(&mut self) -> Result<ArrayRef> {
564+
let counts = std::mem::take(&mut self.counts);
565+
let sums = std::mem::take(&mut self.sums);
566+
let nulls = self.null_state.build();
567+
568+
assert_eq!(nulls.len(), sums.len());
569+
assert_eq!(counts.len(), sums.len());
570+
571+
// don't evaluate averages with null inputs to avoid errors on null values
572+
573+
let array: PrimitiveArray<T> = if nulls.null_count() > 0 {
574+
let mut builder = PrimitiveBuilder::<T>::with_capacity(nulls.len());
575+
let iter = sums.into_iter().zip(counts.into_iter()).zip(nulls.iter());
576+
577+
for ((sum, count), is_valid) in iter {
578+
if is_valid {
579+
builder.append_value((self.avg_fn)(sum, count)?)
580+
} else {
581+
builder.append_null();
582+
}
583+
}
584+
builder.finish()
585+
} else {
586+
let averages: Vec<T::Native> = sums
587+
.into_iter()
588+
.zip(counts.into_iter())
589+
.map(|(sum, count)| (self.avg_fn)(sum, count))
590+
.collect::<Result<Vec<_>>>()?;
591+
PrimitiveArray::new(averages.into(), Some(nulls)) // no copy
592+
};
593+
594+
// fix up decimal precision and scale for decimals
595+
let array = adjust_output_array(&self.return_data_type, Arc::new(array))?;
596+
597+
Ok(array)
598+
}
599+
600+
// return arrays for sums and counts
601+
fn state(&mut self) -> Result<Vec<ArrayRef>> {
602+
let nulls = Some(self.null_state.build());
603+
let counts = std::mem::take(&mut self.counts);
604+
let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy
605+
606+
let sums = std::mem::take(&mut self.sums);
607+
let sums = PrimitiveArray::<T>::new(sums.into(), nulls); // zero copy
608+
let sums = adjust_output_array(&self.sum_data_type, Arc::new(sums))?;
609+
610+
Ok(vec![
611+
Arc::new(counts) as ArrayRef,
612+
Arc::new(sums) as ArrayRef,
613+
])
614+
}
615+
616+
fn size(&self) -> usize {
617+
self.counts.capacity() * std::mem::size_of::<u64>()
618+
+ self.sums.capacity() * std::mem::size_of::<T>()
619+
}
620+
}
621+
386622
#[cfg(test)]
387623
mod tests {
388624
use super::*;

0 commit comments

Comments
 (0)