Skip to content

Commit 40c57f8

Browse files
committed
apply tom patch
1 parent 118eecd commit 40c57f8

File tree

2 files changed

+55
-6
lines changed

2 files changed

+55
-6
lines changed

datafusion/optimizer/src/extract_equijoin_predicate.rs

+54-6
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,36 @@ impl OptimizerRule for ExtractEquijoinPredicate {
7676
right_schema,
7777
)?;
7878

79+
// If there are no equijoin predicates and no existing on then try a nullable join
80+
if equijoin_predicates.is_empty() && on.is_empty() {
81+
let (equinullable_predicates, non_equinullable_expr) =
82+
split_eq_and_noneq_join_predicate_nullable(
83+
expr,
84+
left_schema,
85+
right_schema,
86+
)?;
87+
if !equinullable_predicates.is_empty() {
88+
let optimized_plan = (!equinullable_predicates.is_empty())
89+
.then(|| {
90+
let mut new_on = on.clone();
91+
new_on.extend(equinullable_predicates);
92+
93+
LogicalPlan::Join(Join {
94+
left: left.clone(),
95+
right: right.clone(),
96+
on: new_on,
97+
filter: non_equinullable_expr,
98+
join_type: *join_type,
99+
join_constraint: *join_constraint,
100+
schema: schema.clone(),
101+
null_equals_null: true,
102+
})
103+
});
104+
105+
return Ok(optimized_plan);
106+
}
107+
}
108+
79109
let optimized_plan = (!equijoin_predicates.is_empty()).then(|| {
80110
let mut new_on = on.clone();
81111
new_on.extend(equijoin_predicates);
@@ -108,22 +138,19 @@ impl OptimizerRule for ExtractEquijoinPredicate {
108138
}
109139
}
110140

111-
fn split_eq_and_noneq_join_predicate(
141+
fn split_equality_join_predicate(
112142
filter: &Expr,
113143
left_schema: &Arc<DFSchema>,
114144
right_schema: &Arc<DFSchema>,
145+
match_bin_op: Operator,
115146
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
116147
let exprs = split_conjunction(filter);
117148

118149
let mut accum_join_keys: Vec<(Expr, Expr)> = vec![];
119150
let mut accum_filters: Vec<Expr> = vec![];
120151
for expr in exprs {
121152
match expr {
122-
Expr::BinaryExpr(BinaryExpr {
123-
left,
124-
op: Operator::Eq,
125-
right,
126-
}) => {
153+
Expr::BinaryExpr(BinaryExpr { left, op, right }) if op == &match_bin_op => {
127154
let left = left.as_ref();
128155
let right = right.as_ref();
129156

@@ -155,6 +182,27 @@ fn split_eq_and_noneq_join_predicate(
155182
Ok((accum_join_keys, result_filter))
156183
}
157184

185+
fn split_eq_and_noneq_join_predicate(
186+
filter: &Expr,
187+
left_schema: &Arc<DFSchema>,
188+
right_schema: &Arc<DFSchema>,
189+
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
190+
split_equality_join_predicate(filter, left_schema, right_schema, Operator::Eq)
191+
}
192+
193+
fn split_eq_and_noneq_join_predicate_nullable(
194+
filter: &Expr,
195+
left_schema: &Arc<DFSchema>,
196+
right_schema: &Arc<DFSchema>,
197+
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
198+
split_equality_join_predicate(
199+
filter,
200+
left_schema,
201+
right_schema,
202+
Operator::IsNotDistinctFrom,
203+
)
204+
}
205+
158206
#[cfg(test)]
159207
mod tests {
160208
use super::*;

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

+1
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,7 @@ async fn test_alias(sql_with_alias: &str, sql_no_alias: &str) -> Result<()> {
934934
async fn roundtrip_with_ctx(sql: &str, ctx: SessionContext) -> Result<()> {
935935
let df = ctx.sql(sql).await?;
936936
let plan = df.into_optimized_plan()?;
937+
println!("{plan:#?}");
937938
let proto = to_substrait_plan(&plan, &ctx)?;
938939
let plan2 = from_substrait_plan(&ctx, &proto).await?;
939940
let plan2 = ctx.state().optimize(&plan2)?;

0 commit comments

Comments
 (0)