diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs index c77a1475ed227..de4e4dc84bb7e 100644 --- a/datafusion/common/src/join_type.rs +++ b/datafusion/common/src/join_type.rs @@ -61,9 +61,10 @@ pub enum JoinType { /// [1]. This join type is used to decorrelate EXISTS subqueries used inside disjunctive /// predicates. /// - /// Note: This we currently do not implement the full null semantics for the mark join described - /// in [1] which will be needed if we and ANY subqueries. In our version the mark column will - /// only be true for had a match and false when no match was found, never null. + /// For scalar `NOT IN`, DataFusion can plan a null-aware hash mark join where the + /// mark column is nullable: TRUE for a match, NULL for SQL UNKNOWN, and FALSE + /// otherwise. Row-valued multi-column `NOT IN` and non-hash residual predicate + /// null-aware mark semantics are not implemented. /// /// [1]: http://btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf LeftMark, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 190a08da12222..c654c9bda6779 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1626,6 +1626,12 @@ impl DefaultPhysicalPlanner { let prefer_hash_join = session_state.config_options().optimizer.prefer_hash_join; + // Null-aware joins are pinned to CollectLeft hash joins (see + // `HashJoinExec::null_aware`): never repartition them, and + // never route them to the sort-merge path below. + let can_repartition_join = session_state.config().target_partitions() > 1 + && session_state.config().repartition_joins() + && !*null_aware; // TODO: Allow PWMJ to deal with residual equijoin conditions let join: Arc = if join_on.is_empty() { @@ -1754,10 +1760,7 @@ impl DefaultPhysicalPlanner { None, )?) } - } else if session_state.config().target_partitions() > 1 - && session_state.config().repartition_joins() - && !prefer_hash_join - { + } else if can_repartition_join && !prefer_hash_join { // Use SortMergeJoin if hash join is not preferred let join_on_len = join_on.len(); Arc::new(SortMergeJoinExec::try_new( @@ -1769,24 +1772,15 @@ impl DefaultPhysicalPlanner { vec![SortOptions::default(); join_on_len], *null_equality, )?) - } else if session_state.config().target_partitions() > 1 - && session_state.config().repartition_joins() - && prefer_hash_join - && !*null_aware - // Null-aware joins must use CollectLeft - { - Arc::new(HashJoinExec::try_new( - physical_left, - physical_right, - join_on, - join_filter, - join_type, - None, - PartitionMode::Auto, - *null_equality, - *null_aware, - )?) } else { + // Null-aware joins need global probe-side state, so keep + // them in CollectLeft mode. + let partition_mode = if can_repartition_join { + PartitionMode::Auto + } else { + PartitionMode::CollectLeft + }; + Arc::new(HashJoinExec::try_new( physical_left, physical_right, @@ -1794,7 +1788,7 @@ impl DefaultPhysicalPlanner { join_filter, join_type, None, - PartitionMode::CollectLeft, + partition_mode, *null_equality, *null_aware, )?) @@ -3605,6 +3599,63 @@ mod tests { Ok(()) } + #[tokio::test] + async fn correlated_not_in_is_null_uses_null_aware_hash_mark_join() -> Result<()> { + let query = " + SELECT value + FROM ( + VALUES + (1, 1, 'a'), + (3, 1, 'b'), + (1, 2, 'c'), + (NULL, 1, 'd'), + (5, 3, 'e'), + (2, 1, 'f'), + (NULL, 2, 'g') + ) AS outer_corr_table(id, grp, value) + WHERE (id NOT IN ( + SELECT id + FROM ( + VALUES + (2, 1), + (NULL, 1), + (1, 2) + ) AS inner_corr_table(id, grp) + WHERE inner_corr_table.grp = outer_corr_table.grp + )) IS NULL + ORDER BY value"; + + let config = SessionConfig::new() + .with_target_partitions(4) + .set_bool("datafusion.optimizer.prefer_hash_join", false); + let ctx = SessionContext::new_with_config(config); + + let plan = ctx.sql(query).await?.create_physical_plan().await?; + let formatted = displayable(plan.as_ref()).indent(true).to_string(); + assert_contains!( + &formatted, + "HashJoinExec: mode=CollectLeft, join_type=LeftMark" + ); + assert!(!formatted.contains("SortMergeJoinExec"), "{formatted}"); + + let batches = ctx.sql(query).await?.collect().await?; + assert_batches_eq!( + &[ + "+-------+", + "| value |", + "+-------+", + "| a |", + "| b |", + "| d |", + "| g |", + "+-------+", + ], + &batches + ); + + Ok(()) + } + #[tokio::test] async fn scalar_subquery_in_projection_and_filter_plans() -> Result<()> { let plan = plan_sql( diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2ecb12c30afad..d937e8827a825 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1154,8 +1154,12 @@ impl LogicalPlanBuilder { .zip(right_keys) .map(|(l, r)| (Expr::Column(l), Expr::Column(r))) .collect(); - let join_schema = - build_join_schema(self.plan.schema(), right.schema(), &join_type)?; + let join_schema = build_join_schema( + self.plan.schema(), + right.schema(), + &join_type, + null_aware, + )?; // Inner type without join condition is cross join if join_type != JoinType::Inner && on.is_empty() && filter.is_none() { @@ -1652,7 +1656,7 @@ pub fn unique_field_aliases(fields: &Fields) -> Vec> { .collect() } -fn mark_field(schema: &DFSchema) -> (Option, Arc) { +fn mark_field(schema: &DFSchema, nullable: bool) -> (Option, Arc) { let mut table_references = schema .iter() .filter_map(|(qualifier, _)| qualifier) @@ -1666,16 +1670,21 @@ fn mark_field(schema: &DFSchema) -> (Option, Arc) { ( table_reference, - Arc::new(Field::new("mark", DataType::Boolean, false)), + Arc::new(Field::new("mark", DataType::Boolean, nullable)), ) } /// Creates a schema for a join operation. -/// The fields from the left side are first +/// The fields from the left side are first. +/// +/// When `null_aware` is set, the `LeftMark`/`RightMark` `mark` column is made +/// nullable so it can represent SQL UNKNOWN for null-aware `NOT IN` semantics. +/// `null_aware` has no effect on non-mark join types. pub fn build_join_schema( left: &DFSchema, right: &DFSchema, join_type: &JoinType, + null_aware: bool, ) -> Result { fn nullify_fields<'a>( fields: impl Iterator, &'a Arc)>, @@ -1738,7 +1747,7 @@ pub fn build_join_schema( } JoinType::LeftMark => left_fields .map(|(q, f)| (q.cloned(), Arc::clone(f))) - .chain(once(mark_field(right))) + .chain(once(mark_field(right, null_aware))) .collect(), JoinType::RightSemi | JoinType::RightAnti => { // Only use the right side for the schema @@ -1748,7 +1757,7 @@ pub fn build_join_schema( } JoinType::RightMark => right_fields .map(|(q, f)| (q.cloned(), Arc::clone(f))) - .chain(once(mark_field(left))) + .chain(once(mark_field(left, null_aware))) .collect(), }; let func_dependencies = left.functional_dependencies().join( @@ -2912,13 +2921,13 @@ mod tests { )?; let join_schema = - build_join_schema(&left_schema, &right_schema, &JoinType::Left)?; + build_join_schema(&left_schema, &right_schema, &JoinType::Left, false)?; assert_eq!( join_schema.metadata(), &HashMap::from([("key".to_string(), "left".to_string())]) ); let join_schema = - build_join_schema(&left_schema, &right_schema, &JoinType::Right)?; + build_join_schema(&left_schema, &right_schema, &JoinType::Right, false)?; assert_eq!( join_schema.metadata(), &HashMap::from([("key".to_string(), "right".to_string())]) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 9ca6941a61ce6..4aad4ce96b24d 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -667,8 +667,12 @@ impl LogicalPlan { null_equality, null_aware, }) => { - let schema = - build_join_schema(left.schema(), right.schema(), &join_type)?; + let schema = build_join_schema( + left.schema(), + right.schema(), + &join_type, + null_aware, + )?; let new_on: Vec<_> = on .into_iter() @@ -944,7 +948,12 @@ impl LogicalPlan { .. }) => { let (left, right) = self.only_two_inputs(inputs)?; - let schema = build_join_schema(left.schema(), right.schema(), join_type)?; + let schema = build_join_schema( + left.schema(), + right.schema(), + join_type, + *null_aware, + )?; let equi_expr_count = on.len() * 2; assert!(expr.len() >= equi_expr_count); @@ -4228,13 +4237,13 @@ pub struct Join { pub schema: DFSchemaRef, /// Defines the null equality for the join. pub null_equality: NullEquality, - /// Whether this is a null-aware anti join (for NOT IN semantics). + /// Whether this join needs null-aware NOT IN semantics. /// - /// Only applies to LeftAnti joins. When true, implements SQL NOT IN semantics where: - /// - If the right side (subquery) contains any NULL in join keys, no rows are output - /// - Left side rows with NULL in join keys are not output + /// For `LeftAnti`, if the right side contains any NULL in join keys, no rows are output and + /// left rows with NULL join keys are also excluded. /// - /// This is required for correct NOT IN subquery behavior with three-valued logic. + /// For `LeftMark`, the generated `mark` column becomes nullable so unmatched rows can produce + /// `NULL` rather than `false` when SQL three-valued logic requires it. pub null_aware: bool, } @@ -4253,7 +4262,7 @@ impl Join { /// * `join_type` - Type of join (Inner, Left, Right, etc.) /// * `join_constraint` - Join constraint (On, Using) /// * `null_equality` - How to handle nulls in join comparisons - /// * `null_aware` - Whether this is a null-aware anti join (for NOT IN semantics) + /// * `null_aware` - Whether this join needs null-aware NOT IN semantics /// /// # Returns /// @@ -4269,7 +4278,8 @@ impl Join { null_equality: NullEquality, null_aware: bool, ) -> Result { - let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?; + let join_schema = + build_join_schema(left.schema(), right.schema(), &join_type, null_aware)?; Ok(Join { left, @@ -4324,6 +4334,7 @@ impl Join { left_sch.schema(), right_sch.schema(), &original_join.join_type, + original_join.null_aware, )?; Ok(( diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 0609109ec6e58..e954598c80f06 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -21,6 +21,7 @@ use std::ops::Deref; use std::sync::Arc; use crate::decorrelate::PullUpCorrelatedExpr; +use crate::extract_equijoin_predicate::split_eq_and_noneq_join_predicate; use crate::optimizer::ApplyOrder; use crate::utils::replace_qualified_name; use crate::{OptimizerConfig, OptimizerRule}; @@ -36,8 +37,8 @@ use datafusion_expr::expr_rewriter::create_col_from_scalar_expr; use datafusion_expr::logical_plan::{JoinType, Subquery}; use datafusion_expr::utils::{conjunction, expr_to_columns, split_conjunction_owned}; use datafusion_expr::{ - BinaryExpr, Expr, Filter, LogicalPlan, LogicalPlanBuilder, Operator, exists, - in_subquery, lit, not, not_exists, not_in_subquery, + BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, Operator, + exists, in_subquery, lit, not, not_exists, not_in_subquery, }; use log::debug; @@ -354,6 +355,31 @@ fn join_keys_may_be_null( Ok(false) } +/// Check whether the value keys of a scalar `IN`/`NOT IN` predicate may +/// produce NULLs. +/// +/// Unlike [`join_keys_may_be_null`], this only examines the in-predicate's two +/// sides: SQL UNKNOWN can only arise from a NULL in the compared values. A +/// NULL in a correlation key merely makes the correlated subquery empty for +/// that outer row, which `IN`/`NOT IN` resolve to FALSE/TRUE, never UNKNOWN. +fn in_predicate_values_may_be_null( + in_predicate: &Expr, + left_schema: &DFSchemaRef, + right_schema: &DFSchemaRef, +) -> Result { + let Expr::BinaryExpr(BinaryExpr { + left, + op: Operator::Eq, + right, + }) = in_predicate + else { + // Unexpected shape: conservatively assume the values may be NULL. + return Ok(true); + }; + + Ok(left.nullable(left_schema.as_ref())? || right.nullable(right_schema.as_ref())?) +} + fn build_join( left: &LogicalPlan, subquery: &LogicalPlan, @@ -385,34 +411,32 @@ fn build_join( replace_qualified_name(filter, &all_correlated_cols, &alias).map(Some) })?; - let join_filter = match (join_filter_opt, in_predicate_opt.cloned()) { - ( - Some(join_filter), - Some(Expr::BinaryExpr(BinaryExpr { - left, - op: Operator::Eq, - right, - })), - ) => { + // Rewrite the scalar in-predicate's right side to reference the aliased + // subquery output (e.g. `outer.id = __correlated_sq_1.id`). `None` when + // there is no in-predicate or it is not a plain equality. + let aliased_in_predicate = match in_predicate_opt { + Some(Expr::BinaryExpr(BinaryExpr { + left, + op: Operator::Eq, + right, + })) => { let right_col = create_col_from_scalar_expr(right.deref(), alias)?; - let in_predicate = Expr::eq(left.deref().clone(), Expr::Column(right_col)); - in_predicate.and(join_filter) + Some(Expr::eq(left.deref().clone(), Expr::Column(right_col))) } - (Some(join_filter), _) => join_filter, - ( - _, - Some(Expr::BinaryExpr(BinaryExpr { - left, - op: Operator::Eq, - right, - })), - ) => { - let right_col = create_col_from_scalar_expr(right.deref(), alias)?; + _ => None, + }; - Expr::eq(left.deref().clone(), Expr::Column(right_col)) - } + let join_filter = match (join_filter_opt, &aliased_in_predicate) { + // Keep the in-predicate as the first conjunct: equijoin extraction + // converts conjuncts into join keys in order, and null-aware mark + // execution requires `on[0]` to be the scalar NOT IN value key (see + // `HashJoinExec::null_aware`). + (Some(join_filter), Some(in_predicate)) => in_predicate.clone().and(join_filter), + (Some(join_filter), None) => join_filter, + (None, Some(in_predicate)) => in_predicate.clone(), + // A non-equality in-predicate cannot become a join filter on its own. + (None, None) if in_predicate_opt.is_some() => return Ok(None), (None, None) => lit(true), - _ => return Ok(None), }; if matches!(join_type, JoinType::LeftMark | JoinType::RightMark) { @@ -448,9 +472,41 @@ fn build_join( sub_query_alias.clone() }; - // Mark joins don't use null-aware semantics (they use three-valued logic with mark column) + // For scalar NOT IN mark joins, propagate null-aware semantics into the + // nullable mark column when the predicate can be implemented by hash keys. + // Non-equality correlated filters stay on the legacy path because hash join + // execution cannot mark UNKNOWN candidates for residual predicates. + // Only the in-predicate's value keys decide nullability: NULLs in + // correlation keys can never make `IN`/`NOT IN` evaluate to UNKNOWN. + let null_aware = if join_type == JoinType::LeftMark && in_predicate_opt.is_some() + { + let (_, residual_filter) = split_eq_and_noneq_join_predicate( + join_filter.clone(), + left.schema(), + right_projected.schema(), + )?; + residual_filter.is_none() + && match &aliased_in_predicate { + Some(in_predicate) => in_predicate_values_may_be_null( + in_predicate, + left.schema(), + right_projected.schema(), + )?, + None => false, + } + } else { + false + }; + let new_plan = LogicalPlanBuilder::from(left.clone()) - .join_on(right_projected, join_type, Some(join_filter))? + .join_detailed_with_options( + right_projected, + join_type, + (Vec::::new(), Vec::::new()), + Some(join_filter), + NullEquality::NullEqualsNothing, + null_aware, + )? .build()?; debug!( @@ -571,6 +627,28 @@ mod tests { )) } + fn has_left_mark_join_with_null_aware( + plan: &LogicalPlan, + expected_null_aware: bool, + ) -> bool { + if let LogicalPlan::Join(join) = plan + && join.join_type == JoinType::LeftMark + { + return join.null_aware == expected_null_aware; + } + + plan.inputs() + .into_iter() + .any(|input| has_left_mark_join_with_null_aware(input, expected_null_aware)) + } + + fn optimize_with_decorrelate(plan: LogicalPlan) -> Result { + let optimizer = crate::Optimizer::with_rules(vec![Arc::new( + DecorrelatePredicateSubquery::new(), + )]); + optimizer.optimize(plan, &crate::OptimizerContext::new(), |_, _| {}) + } + /// Test for several IN subquery expressions #[test] fn in_subquery_multiple() -> Result<()> { @@ -1201,6 +1279,66 @@ mod tests { ) } + /// Builds `... WHERE (id NOT IN (SELECT id FROM inner WHERE )) + /// IS NULL` over scans whose value key `id` has the given nullability (the + /// correlation key `grp` is always nullable), optimizes it, and asserts the + /// resulting `LeftMark` join's `null_aware` flag. + fn assert_correlated_not_in_mark_join_null_aware( + value_key_nullable: bool, + hashable_filter: bool, + expected_null_aware: bool, + ) -> Result<()> { + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, value_key_nullable), + Field::new("grp", DataType::Int32, true), + ]); + let outer_scan = table_scan(Some("outer_t"), &schema, None)?.build()?; + let inner_scan = table_scan(Some("inner_t"), &schema, None)?.build()?; + + let outer_grp = out_ref_col(DataType::Int32, "outer_t.grp"); + let inner_grp = col("inner_t.grp"); + let grp_filter = if hashable_filter { + outer_grp.eq(inner_grp) + } else { + outer_grp.lt(inner_grp) + }; + + let subquery = Arc::new( + LogicalPlanBuilder::from(inner_scan) + .filter(grp_filter)? + .project(vec![col("inner_t.id")])? + .build()?, + ); + + let plan = LogicalPlanBuilder::from(outer_scan) + .filter(not_in_subquery(col("outer_t.id"), subquery).is_null())? + .build()?; + + let optimized = optimize_with_decorrelate(plan)?; + assert!( + has_left_mark_join_with_null_aware(&optimized, expected_null_aware), + "{}", + optimized.display_indent_schema() + ); + + Ok(()) + } + + #[test] + fn correlated_not_in_mark_join_null_awareness() -> Result<()> { + // Hashable (`=`) correlation filter on a nullable value key: NOT IN can + // be UNKNOWN, so the mark join is null-aware. + assert_correlated_not_in_mark_join_null_aware(true, true, true)?; + // Residual (`<`) correlation filter: the equijoin predicate can't be + // extracted, so the mark join is not null-aware. + assert_correlated_not_in_mark_join_null_aware(true, false, false)?; + // Non-nullable value key `id`: a NULL correlation key just makes the + // subquery empty, so NOT IN can never be UNKNOWN and the join is not + // null-aware. + assert_correlated_not_in_mark_join_null_aware(false, true, false)?; + Ok(()) + } + #[test] fn in_subquery_both_side_expr() -> Result<()> { let table_scan = test_table_scan()?; diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 95b70da443d88..6bd6356db4609 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -374,6 +374,7 @@ fn find_inner_join( left_input.schema(), right_input.schema(), &JoinType::Inner, + false, )?); return Ok(LogicalPlan::Join(Join { @@ -397,6 +398,7 @@ fn find_inner_join( left_input.schema(), right.schema(), &JoinType::Inner, + false, )?); Ok(LogicalPlan::Join(Join { @@ -1402,6 +1404,7 @@ mod tests { t1.schema(), t2.schema(), &JoinType::Inner, + false, )?); let inner_join = LogicalPlan::Join(Join { diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index 0a50761e8a9f7..03a5aaa20402c 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -174,7 +174,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { /// /// According to the above rule, `expr1` is the equijoin predicate, while `expr2` and `expr3` are not. /// The function returns Ok(\[expr1\], Some(expr2 AND expr3)) -fn split_eq_and_noneq_join_predicate( +pub(crate) fn split_eq_and_noneq_join_predicate( filter: Expr, left_schema: &DFSchema, right_schema: &DFSchema, diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index acdbf71d05d5c..51ccf65f5fbc8 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -1024,7 +1024,8 @@ mod tests { let left_schema = left_child.schema(); let right_schema = right_child.schema(); let schema = Arc::new( - build_join_schema(left_schema, right_schema, &JoinType::Inner).unwrap(), + build_join_schema(left_schema, right_schema, &JoinType::Inner, false) + .unwrap(), ); Self { exprs: vec![], diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b1d387ea74557..2ff16ab92b65c 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -65,7 +65,7 @@ use crate::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; -use arrow::array::{ArrayRef, BooleanBufferBuilder}; +use arrow::array::{ArrayRef, BooleanBufferBuilder, UInt64Array}; use arrow::compute::concat_batches; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -88,7 +88,7 @@ use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, l use datafusion_physical_expr::projection::{ProjectionRef, combine_projections}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; -use datafusion_common::hash_utils::RandomState; +use datafusion_common::hash_utils::{RandomState, create_hashes}; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::TryStreamExt; @@ -185,17 +185,57 @@ fn try_create_array_map( Ok(Some((array_map, batch, left_values))) } +/// Correlation-scope hash map over only the build rows whose scalar `NOT IN` +/// value key is NULL, used by correlated null-aware `LeftMark` joins. +/// +/// Such rows produce a NULL (UNKNOWN) mark whenever *any* probe row shares +/// their correlation scope, so every probe row must be tested against them. +/// Restricting this map to the NULL-valued build rows keeps that lookup +/// proportional to the number of NULLs instead of enumerating every scope +/// match of every probe row. +pub(super) struct NullValueScopeMap { + /// Hash table keyed by the correlation scope values of the NULL-valued + /// build rows. Stored positions index into `scope_values`/`build_indices`, + /// not the full build batch. + pub(super) map: Box, + /// Correlation scope key values of the NULL-valued build rows. + pub(super) scope_values: Vec, + /// Maps positions in `map`/`scope_values` back to row indices in the full + /// build batch. + pub(super) build_indices: UInt64Array, +} + +/// Extra build-side state for correlated null-aware `LeftMark` joins. +/// +/// Key layout: `on[0]` is the scalar `NOT IN` value key, `on[1..]` the +/// correlation scope keys; see the `null_aware` field of `HashJoinExec`. +pub(super) struct NullAwareMarkState { + /// Hash table keyed only by the correlation scope keys, covering all build + /// rows. Probed only with NULL-valued probe rows; the complementary + /// direction uses `null_value_scope_map`. + pub(super) scope_map: Box, + /// Scope map restricted to the build rows whose value key is NULL (see + /// [`NullValueScopeMap`]). `None` when the build side has no NULL value + /// keys. + pub(super) null_value_scope_map: Option, +} + /// HashTable and input data for the left (build side) of a join pub(super) struct JoinLeftData { /// The hash table with indices into `batch` /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown pub(super) map: Arc, + /// Extra build-side state for correlated null-aware `LeftMark` joins (see + /// [`NullAwareMarkState`]); `None` for all other joins. + null_aware_state: Option, /// The input rows for the build side batch: RecordBatch, /// The build side on expressions values values: Vec, /// Shared bitmap builder for visited left indices visited_indices_bitmap: SharedBitmapBuilder, + /// Shared bitmap builder for null marks + null_indices_bitmap: SharedBitmapBuilder, /// Counter of running probe-threads, potentially /// able to update `visited_indices_bitmap` probe_threads_counter: AtomicUsize, @@ -211,7 +251,7 @@ pub(super) struct JoinLeftData { /// Membership testing strategy for filter pushdown /// Contains either InList values for small build sides or hash table reference for large build sides pub(super) membership: PushdownStrategy, - /// Shared atomic flag indicating if any probe partition saw data (for null-aware anti joins) + /// Shared atomic flag indicating if any probe partition saw data (for null-aware anti/mark joins) /// This is shared across all probe partitions to provide global knowledge pub(super) probe_side_non_empty: AtomicBool, /// Shared atomic flag indicating if any probe partition saw NULL in join keys (for null-aware anti joins) @@ -224,6 +264,10 @@ impl JoinLeftData { &self.map } + pub(super) fn null_aware_state(&self) -> Option<&NullAwareMarkState> { + self.null_aware_state.as_ref() + } + /// returns a reference to the build side batch pub(super) fn batch(&self) -> &RecordBatch { &self.batch @@ -256,6 +300,10 @@ impl JoinLeftData { &self.visited_indices_bitmap } + pub(super) fn null_indices_bitmap(&self) -> &SharedBitmapBuilder { + &self.null_indices_bitmap + } + /// returns a reference to the InList values for filter pushdown pub(super) fn membership(&self) -> &PushdownStrategy { &self.membership @@ -421,18 +469,26 @@ impl HashJoinExecBuilder { // Validate null_aware flag if exec.null_aware { let join_type = exec.join_type(); - if !matches!(join_type, JoinType::LeftAnti) { + if !matches!(join_type, JoinType::LeftAnti | JoinType::LeftMark) { return plan_err!( - "null_aware can only be true for LeftAnti joins, got {join_type}" + "null_aware can only be true for LeftAnti or LeftMark joins, got {join_type}" ); } let on = exec.on(); - if on.len() != 1 { + if *join_type == JoinType::LeftAnti && on.len() != 1 { return plan_err!( - "null_aware anti join only supports single column join key, got {} columns", + "null_aware LeftAnti joins only support single column join key, got {} columns", on.len() ); } + // Null-aware joins need global probe-side state (and a single build-side scope + // map for correlated LeftMark), so Partitioned would miss cross-partition rows + // and is forbidden. + if matches!(exec.partition_mode(), PartitionMode::Partitioned) { + return plan_err!( + "null_aware joins require PartitionMode::CollectLeft, got PartitionMode::Partitioned" + ); + } } if preserve_properties { @@ -468,7 +524,7 @@ impl HashJoinExecBuilder { check_join_is_valid(&left_schema, &right_schema, &on)?; let (join_schema, column_indices) = - build_join_schema(&left_schema, &right_schema, &join_type); + build_join_schema(&left_schema, &right_schema, &join_type, null_aware); let join_schema = Arc::new(join_schema); @@ -764,7 +820,19 @@ pub struct HashJoinExec { column_indices: Vec, /// The equality null-handling behavior of the join algorithm. pub null_equality: NullEquality, - /// Flag to indicate if this is a null-aware anti join + /// Flag to indicate if this join uses null-aware equality semantics. + /// + /// Set for the physical lowering of scalar `NOT IN` subqueries (producing + /// `JoinType::LeftAnti` when uncorrelated or `JoinType::LeftMark` when + /// correlated). When `true`, NULLs in the join keys follow SQL `NOT IN` + /// three-valued logic rather than ordinary equi-join semantics. + /// + /// Key-ordering convention (relied on positionally, not enforced): for a + /// null-aware `LeftMark` join with more than one key, `on[0]` is the scalar + /// `NOT IN` value key and `on[1..N]` are the correlated equality scope keys. + /// Reordering these keys would silently produce wrong results, which is why + /// such joins are pinned to `PartitionMode::CollectLeft` (the only key + /// reorderer acts solely on `PartitionMode::Partitioned`). pub null_aware: bool, /// Cache holding plan properties like equivalences, output partitioning etc. cache: Arc, @@ -1370,6 +1438,11 @@ impl ExecutionPlan for HashJoinExec { .flatten() .flatten(); + // The extra scope maps + null bitmap are only built for correlated + // null-aware LeftMark joins (`on[1..]` are correlation scope keys). + let with_null_aware_mark_state = + self.null_aware && self.join_type == JoinType::LeftMark && on_left.len() > 1; + let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.try_once(|| { let left_stream = self.left.execute(0, Arc::clone(&context))?; @@ -1389,6 +1462,7 @@ impl ExecutionPlan for HashJoinExec { Arc::clone(context.session_config().options()), self.null_equality, array_map_created_count, + with_null_aware_mark_state, )) })?, PartitionMode::Partitioned => { @@ -1409,6 +1483,9 @@ impl ExecutionPlan for HashJoinExec { Arc::clone(context.session_config().options()), self.null_equality, array_map_created_count, + // Partitioned mode is rejected for null-aware joins (see + // `try_new`), so the extra null-aware state is never built. + false, )) } PartitionMode::Auto => { @@ -1911,6 +1988,29 @@ fn should_collect_min_max_for_perfect_hash( Ok(ArrayMap::is_supported_type(&data_type)) } +fn new_join_hashmap( + num_rows: usize, + reservation: &mut MemoryReservation, + metrics: &BuildProbeJoinMetrics, +) -> Result> { + let fixed_size_u32 = size_of::(); + let fixed_size_u64 = size_of::(); + + if num_rows > u32::MAX as usize { + let estimated_hashtable_size = + estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Ok(Box::new(JoinHashMapU64::with_capacity(num_rows))) + } else { + let estimated_hashtable_size = + estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Ok(Box::new(JoinHashMapU32::with_capacity(num_rows))) + } +} + /// Collects all batches from the left (build) side stream and creates a hash map for joining. /// /// This function is responsible for: @@ -1928,6 +2028,8 @@ fn should_collect_min_max_for_perfect_hash( /// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins) /// * `probe_threads_count` - Number of threads that will probe this hash table /// * `should_compute_dynamic_filters` - Whether to compute min/max bounds for dynamic filtering +/// * `with_null_aware_mark_state` - Whether to build the per-build-row null-indices bitmap +/// and correlation-scope maps used by correlated null-aware `LeftMark` joins /// /// # Dynamic Filter Coordination /// When `should_compute_dynamic_filters` is true, this function computes the min/max bounds @@ -1952,6 +2054,7 @@ async fn collect_left_input( config: Arc, null_equality: NullEquality, array_map_created_count: Count, + with_null_aware_mark_state: bool, ) -> Result { let schema = left_stream.schema(); @@ -2031,25 +2134,10 @@ async fn collect_left_input( } else { // Estimation of memory size, required for hashtable, prior to allocation. // Final result can be verified using `RawTable.allocation_info()` - let fixed_size_u32 = size_of::(); - let fixed_size_u64 = size_of::(); - // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the // `u64` indice variant // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown - let mut hashmap: Box = if num_rows > u32::MAX as usize { - let estimated_hashtable_size = - estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU64::with_capacity(num_rows)) - } else { - let estimated_hashtable_size = - estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU32::with_capacity(num_rows)) - }; + let mut hashmap = new_join_hashmap(num_rows, &mut reservation, &metrics)?; let mut hashes_buffer = Vec::new(); let mut offset = 0; @@ -2082,19 +2170,90 @@ async fn collect_left_input( (Map::HashMap(hashmap), batch, left_values) }; - // Reserve additional memory for visited indices bitmap and create shared builder - let visited_indices_bitmap = if with_visited_indices_bitmap { + let allocate_bitmap = || -> Result { let bitmap_size = bit_util::ceil(batch.num_rows(), 8); reservation.try_grow(bitmap_size)?; metrics.build_mem_used.add(bitmap_size); - let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows()); - bitmap_buffer.append_n(num_rows, false); - bitmap_buffer + let mut bitmap = BooleanBufferBuilder::new(batch.num_rows()); + bitmap.append_n(num_rows, false); + Ok(bitmap) + }; + + // Reserve additional memory for visited indices bitmap and create shared builder + let visited_indices_bitmap = if with_visited_indices_bitmap { + allocate_bitmap()? } else { BooleanBufferBuilder::new(0) }; + let null_indices_bitmap = if with_null_aware_mark_state { + allocate_bitmap()? + } else { + BooleanBufferBuilder::new(0) + }; + + let null_aware_state = if with_null_aware_mark_state { + // Per the key layout on `HashJoinExec::null_aware`, the scope map + // needs more than one key. + debug_assert!( + on_left.len() > 1, + "null-aware LeftMark needs on_left[0]=value, on_left[1..]=scope, got {} key(s)", + on_left.len() + ); + // Scope-only NULL marking uses a HashMap (the primary join map may use + // ArrayMap for full-key matches, but scope keys have arbitrary shape). + let mut scope_map = new_join_hashmap(num_rows, &mut reservation, &metrics)?; + + let mut hashes_buffer = vec![0; batch.num_rows()]; + update_hash( + &on_left[1..], + &batch, + &mut *scope_map, + 0, + &random_state, + &mut hashes_buffer, + 0, + true, + NullEquality::NullEqualsNothing, + )?; + + // Build the dedicated scope map over the NULL-valued build rows (see + // `NullValueScopeMap`). + let value_key = &left_values[0]; + let null_value_scope_map = if value_key.null_count() > 0 { + let null_mask = arrow::compute::is_null(value_key.as_ref())?; + let build_indices = UInt64Array::from_iter_values( + null_mask.values().set_indices().map(|i| i as u64), + ); + let scope_values = left_values[1..] + .iter() + .map(|values| Ok(arrow::compute::filter(values.as_ref(), &null_mask)?)) + .collect::>>()?; + + let null_rows = build_indices.len(); + let mut map = new_join_hashmap(null_rows, &mut reservation, &metrics)?; + let mut hashes_buffer = vec![0; null_rows]; + create_hashes(&scope_values, &random_state, &mut hashes_buffer)?; + map.update_from_iter(Box::new(hashes_buffer.iter().enumerate().rev()), 0); + + Some(NullValueScopeMap { + map, + scope_values, + build_indices, + }) + } else { + None + }; + + Some(NullAwareMarkState { + scope_map, + null_value_scope_map, + }) + } else { + None + }; + let map = Arc::new(join_hash_map); let membership = if num_rows == 0 { @@ -2129,9 +2288,11 @@ async fn collect_left_input( let data = JoinLeftData { map, + null_aware_state, batch, values: left_values, visited_indices_bitmap: Mutex::new(visited_indices_bitmap), + null_indices_bitmap: Mutex::new(null_indices_bitmap), probe_threads_counter: AtomicUsize::new(probe_threads_count), _reservation: reservation, bounds, @@ -6500,7 +6661,7 @@ mod tests { Ok(()) } - /// Test that null_aware validation rejects non-LeftAnti join types + /// Test that null_aware validation rejects unsupported join types #[tokio::test] async fn test_null_aware_validation_wrong_join_type() { let left = @@ -6531,7 +6692,7 @@ mod tests { result .unwrap_err() .to_string() - .contains("null_aware can only be true for LeftAnti joins") + .contains("null_aware can only be true for LeftAnti or LeftMark joins") ); } @@ -6566,15 +6727,351 @@ mod tests { true, // null_aware = true (invalid for multi-column) ); + assert!(result.is_err()); + assert!( + result.unwrap_err().to_string().contains( + "null_aware LeftAnti joins only support single column join key" + ) + ); + } + + /// Test that null_aware validation rejects PartitionMode::Partitioned (must be CollectLeft). + #[tokio::test] + async fn test_null_aware_validation_partitioned() { + let left = build_table(("a", &vec![1]), ("b", &vec![2]), ("c", &vec![3])); + let right = build_table(("x", &vec![1]), ("y", &vec![2]), ("z", &vec![3])); + + let on = vec![ + ( + Arc::new(Column::new_with_schema("a", &left.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("x", &right.schema()).unwrap()) as _, + ), + ( + Arc::new(Column::new_with_schema("b", &left.schema()).unwrap()) as _, + Arc::new(Column::new_with_schema("y", &right.schema()).unwrap()) as _, + ), + ]; + + // LeftMark (not LeftAnti) so the single-key check is skipped and the partition-mode check is reached. + let result = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::LeftMark, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNothing, + true, // null_aware = true (invalid with Partitioned) + ); + assert!(result.is_err()); assert!( result .unwrap_err() .to_string() - .contains("null_aware anti join only supports single column join key") + .contains("null_aware joins require PartitionMode::CollectLeft") ); } + /// Builds a null-aware `LeftMark` `HashJoinExec` on `on_names` key pairs + /// (`on_names[0]` is the `NOT IN` value key, the rest are correlation + /// scope keys), executes partition 0, and collects the result batches. + async fn null_aware_mark_join_collect( + batch_size: usize, + left: Arc, + right: Arc, + on_names: &[(&str, &str)], + ) -> Result> { + let task_ctx = prepare_task_ctx(batch_size, false); + + let mut on: JoinOn = vec![]; + for &(left_name, right_name) in on_names { + on.push(( + Arc::new(Column::new_with_schema(left_name, &left.schema())?) as _, + Arc::new(Column::new_with_schema(right_name, &right.schema())?) as _, + )); + } + + let join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::LeftMark, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + true, // null_aware + )?; + + let stream = join.execute(0, task_ctx)?; + common::collect(stream).await + } + + /// Test null-aware left mark join when probe side contains NULL. + /// Expected: + /// - matched rows => true + /// - unmatched non-NULL rows => NULL + /// - NULL build keys with non-empty probe side => NULL + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_left_mark_probe_null(batch_size: usize) -> Result<()> { + let left = build_table_two_cols( + ("c1", &vec![Some(1), Some(4), None]), + ("dummy", &vec![Some(10), Some(40), Some(0)]), + ); + + let right = build_table_two_cols( + ("c2", &vec![Some(1), Some(2), None]), + ("dummy", &vec![Some(100), Some(200), Some(300)]), + ); + + let batches = + null_aware_mark_join_collect(batch_size, left, right, &[("c1", "c2")]) + .await?; + + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+-------+------+ + | c1 | dummy | mark | + +----+-------+------+ + | | 0 | | + | 1 | 10 | true | + | 4 | 40 | | + +----+-------+------+ + "); + } + + Ok(()) + } + + /// Test null-aware left mark join when probe side is empty. + /// Expected: all rows are marked false, including NULL build keys. + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_left_mark_empty_probe(batch_size: usize) -> Result<()> { + let left = build_table_two_cols( + ("c1", &vec![Some(1), None]), + ("dummy", &vec![Some(10), Some(0)]), + ); + + let right = build_table_two_cols( + ("c2", &Vec::>::new()), + ("dummy", &Vec::>::new()), + ); + + let batches = + null_aware_mark_join_collect(batch_size, left, right, &[("c1", "c2")]) + .await?; + + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+-------+-------+ + | c1 | dummy | mark | + +----+-------+-------+ + | | 0 | false | + | 1 | 10 | false | + +----+-------+-------+ + "); + } + + Ok(()) + } + + /// Test scalar correlated null-aware left mark join. + /// + /// The first key is the scalar NOT IN value key. The second key is the + /// correlated scope key, so the NULL on the probe side only affects group 1. + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_left_mark_correlated_scope(batch_size: usize) -> Result<()> { + let left = build_table_two_cols( + ("id", &vec![Some(1), Some(2), Some(3), None, None, Some(5)]), + ( + "grp", + &vec![Some(1), Some(1), Some(1), Some(1), Some(2), Some(3)], + ), + ); + + let right = build_table_two_cols( + ("id", &vec![Some(2), None, Some(1)]), + ("grp", &vec![Some(1), Some(1), Some(2)]), + ); + + let batches = null_aware_mark_join_collect( + batch_size, + left, + right, + &[("id", "id"), ("grp", "grp")], + ) + .await?; + + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+-----+-------+ + | id | grp | mark | + +----+-----+-------+ + | | 1 | | + | | 2 | | + | 1 | 1 | | + | 2 | 1 | true | + | 3 | 1 | | + | 5 | 3 | false | + +----+-----+-------+ + "); + } + + Ok(()) + } + + /// Scalar correlated null-aware left mark join where neither side's value + /// key (`id`) contains NULL. + /// + /// This exercises the fast path in `mark_null_candidates_for_probe_batch` + /// that skips the correlation-scope NULL lookup when no value key is NULL. + /// With no NULL value keys, SQL `NOT IN` can never be UNKNOWN, so the mark + /// must be `true`/`false` only and never NULL, regardless of correlation + /// scope or empty subqueries (`grp = 3` has no matching probe rows). + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_left_mark_correlated_no_value_nulls( + batch_size: usize, + ) -> Result<()> { + let left = build_table_two_cols( + ("id", &vec![Some(1), Some(2), Some(3), Some(4)]), + ("grp", &vec![Some(1), Some(1), Some(2), Some(3)]), + ); + + let right = build_table_two_cols( + ("id", &vec![Some(2), Some(5)]), + ("grp", &vec![Some(1), Some(2)]), + ); + + let batches = null_aware_mark_join_collect( + batch_size, + left, + right, + &[("id", "id"), ("grp", "grp")], + ) + .await?; + + // Every mark is true/false; no UNKNOWN is produced when value keys are + // non-null. `id=2` matches within `grp=1` (true); `id=4`'s `grp=3` has + // an empty correlated subquery (false); the rest find no equal value. + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+-----+-------+ + | id | grp | mark | + +----+-----+-------+ + | 1 | 1 | false | + | 2 | 1 | true | + | 3 | 2 | false | + | 4 | 3 | false | + +----+-----+-------+ + "); + } + + Ok(()) + } + + /// Scalar correlated null-aware left mark join where only the build side's + /// value key contains NULL. + /// + /// This isolates the NULL-value scope map direction of + /// `mark_null_candidates_for_probe_batch`: NULL-valued build rows become + /// UNKNOWN as soon as any probe row shares their correlation scope, while + /// non-null build rows stay `true`/`false`. + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_left_mark_correlated_build_null_only( + batch_size: usize, + ) -> Result<()> { + let left = build_table_two_cols( + ("id", &vec![Some(1), None, None, Some(3)]), + ("grp", &vec![Some(1), Some(1), Some(2), Some(3)]), + ); + + let right = build_table_two_cols( + ("id", &vec![Some(2), Some(1)]), + ("grp", &vec![Some(1), Some(2)]), + ); + + let batches = null_aware_mark_join_collect( + batch_size, + left, + right, + &[("id", "id"), ("grp", "grp")], + ) + .await?; + + // `(NULL, 1)` and `(NULL, 2)` have non-empty correlated scopes, so + // their marks are UNKNOWN. `(1, 1)` finds no equal value and no NULL in + // its scope (false); `(3, 3)` has an empty correlated scope (false). + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+-----+-------+ + | id | grp | mark | + +----+-----+-------+ + | | 1 | | + | | 2 | | + | 1 | 1 | false | + | 3 | 3 | false | + +----+-----+-------+ + "); + } + + Ok(()) + } + + /// Scalar correlated null-aware left mark join where only the probe side's + /// value key contains NULL. + /// + /// This isolates the NULL-valued-probe-row direction of + /// `mark_null_candidates_for_probe_batch`: every unmatched build row whose + /// correlation scope contains a NULL-valued probe row becomes UNKNOWN. + #[apply(hash_join_exec_configs)] + #[tokio::test] + async fn test_null_aware_left_mark_correlated_probe_null_only( + batch_size: usize, + ) -> Result<()> { + let left = build_table_two_cols( + ("id", &vec![Some(1), Some(2), Some(5), Some(7)]), + ("grp", &vec![Some(1), Some(1), Some(2), Some(3)]), + ); + + let right = build_table_two_cols( + ("id", &vec![Some(2), None, Some(4)]), + ("grp", &vec![Some(1), Some(1), Some(2)]), + ); + + let batches = null_aware_mark_join_collect( + batch_size, + left, + right, + &[("id", "id"), ("grp", "grp")], + ) + .await?; + + // `(1, 1)` is unmatched and its scope contains a NULL value (UNKNOWN); + // `(2, 1)` matches (true); `(5, 2)`'s scope holds only `4` (false); + // `(7, 3)` has an empty correlated scope (false). + allow_duplicates! { + assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+-----+-------+ + | id | grp | mark | + +----+-----+-------+ + | 1 | 1 | | + | 2 | 1 | true | + | 5 | 2 | false | + | 7 | 3 | false | + +----+-----+-------+ + "); + } + + Ok(()) + } + #[test] fn test_lr_is_preserved() { assert_eq!(lr_is_preserved(JoinType::Inner), (true, true)); diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 2aa6e69dff807..c71f915da3f14 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -43,11 +43,11 @@ use crate::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_empty_build_side, build_batch_from_indices, - need_produce_result_in_final, + build_null_aware_left_mark_column, need_produce_result_in_final, }, }; -use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array}; +use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, UInt32Array, UInt64Array}; use arrow::buffer::NullBuffer; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; @@ -325,6 +325,16 @@ pub(super) struct HashJoinStream { probe_indices_buffer: Vec, /// Scratch space for build indices during hash lookup build_indices_buffer: Vec, + + /// Scratch space for scope-key hashes in the null-aware mark pass, reused + /// across probe batches. Separate from `hashes_buffer`, which still holds + /// the probe batch's full-key hashes during the chunked lookup loop. + null_mark_hashes_buffer: Vec, + /// Scratch space for probe indices during null-aware scope lookups + null_mark_probe_indices_buffer: Vec, + /// Scratch space for build indices during null-aware scope lookups + null_mark_build_indices_buffer: Vec, + /// Specifies whether the right side has an ordering to potentially preserve right_side_ordered: bool, /// Owns this partition's build-data report lifecycle. @@ -334,7 +344,7 @@ pub(super) struct HashJoinStream { /// Output buffer for coalescing small batches into larger ones with optional fetch limit. /// Uses `LimitedBatchCoalescer` to efficiently combine batches and absorb limit with 'fetch' output_buffer: LimitedBatchCoalescer, - /// Whether this is a null-aware anti join + /// Whether this is a null-aware anti or mark join null_aware: bool, } @@ -509,6 +519,9 @@ impl HashJoinStream { hashes_buffer, probe_indices_buffer: Vec::with_capacity(batch_size), build_indices_buffer: Vec::with_capacity(batch_size), + null_mark_hashes_buffer: Vec::with_capacity(batch_size), + null_mark_probe_indices_buffer: Vec::with_capacity(batch_size), + null_mark_build_indices_buffer: Vec::with_capacity(batch_size), right_side_ordered, build_report: BuildReportHandle::new(partition, mode, build_accumulator), mode, @@ -741,10 +754,11 @@ impl HashJoinStream { let timer = self.join_metrics.join_time.timer(); - // Null-aware anti join semantics: - // For LeftAnti: output LEFT (build) rows where LEFT.key NOT IN RIGHT.key - // 1. If RIGHT (probe) contains NULL in any batch, no LEFT rows should be output - // 2. LEFT rows with NULL keys should not be output (handled in final stage) + // Null-aware join bookkeeping: + // - LeftAnti needs global knowledge of probe-side NULLs/non-emptiness to implement NOT IN. + // - Uncorrelated LeftMark uses the same global probe-side state. + // - Correlated LeftMark records per-build-row NULL candidates below using + // a separate correlation-scope lookup. if self.null_aware { // Mark that we've seen a probe batch with actual rows (probe side is non-empty) // Only set this if batch has rows - empty batches don't count @@ -756,8 +770,7 @@ impl HashJoinStream { .store(true, Ordering::Relaxed); } - // Check if probe side (RIGHT) contains NULL - // Since null_aware validation ensures single column join, we only check the first column + // Check if the scalar NOT IN value key from the probe side contains NULL. let probe_key_column = &state.values[0]; if probe_key_column.null_count() > 0 { // Found NULL in probe side - set shared flag to prevent any output @@ -767,11 +780,12 @@ impl HashJoinStream { .store(true, Ordering::Relaxed); } - // If probe side has NULL (detected in this or any other partition), return empty result - if build_side - .left_data - .probe_side_has_null - .load(Ordering::Relaxed) + // LeftAnti can short-circuit once the probe side contains NULL. + if self.join_type == JoinType::LeftAnti + && build_side + .left_data + .probe_side_has_null + .load(Ordering::Relaxed) { timer.done(); self.state = HashJoinStreamState::FetchProbeBatch; @@ -796,6 +810,25 @@ impl HashJoinStream { return Ok(StatefulStreamResult::Continue); } + // For correlated null-aware LeftMark (`on[1..]` scope keys, hence + // values len > 1), record this batch's UNKNOWN candidates once, before + // the first chunked lookup (offset == (0, None)). + if self.null_aware + && self.join_type == JoinType::LeftMark + && state.values.len() > 1 + && state.offset == (0, None) + { + mark_null_candidates_for_probe_batch( + build_side, + state, + &self.random_state, + self.batch_size, + &mut self.null_mark_hashes_buffer, + &mut self.null_mark_probe_indices_buffer, + &mut self.null_mark_build_indices_buffer, + )?; + } + // get the matched by join keys indices let (left_indices, right_indices, next_offset) = match build_side.left_data.map() { @@ -919,6 +952,7 @@ impl HashJoinStream { &self.column_indices, join_side, self.join_type, + None, )?; let push_status = self.output_buffer.push_batch(batch)?; @@ -954,6 +988,7 @@ impl HashJoinStream { let timer = self.join_metrics.join_time.timer(); if !need_produce_result_in_final(self.join_type) { + timer.done(); self.state = HashJoinStreamState::Completed; return Ok(StatefulStreamResult::Continue); } @@ -963,6 +998,7 @@ impl HashJoinStream { // For null-aware anti join, if probe side had NULL, no rows should be output // Check shared atomic state to get global knowledge across all partitions if self.null_aware + && self.join_type == JoinType::LeftAnti && build_side .left_data .probe_side_has_null @@ -972,7 +1008,9 @@ impl HashJoinStream { self.state = HashJoinStreamState::Completed; return Ok(StatefulStreamResult::Continue); } + if !build_side.left_data.report_probe_completed() { + timer.done(); self.state = HashJoinStreamState::Completed; return Ok(StatefulStreamResult::Continue); } @@ -1002,32 +1040,56 @@ impl HashJoinStream { let filtered_indices: Vec = left_side .iter() .filter_map(|idx| { - let idx_usize = idx.unwrap() as usize; + let idx = idx.expect( + "LeftAnti final indices should always contain build-side rows", + ); + let idx_usize = idx as usize; if build_key_column.is_null(idx_usize) { None // Skip rows with NULL keys } else { - Some(idx.unwrap()) + Some(idx) } }) .collect(); left_side = UInt64Array::from(filtered_indices); - - // Update right_side to match the new length - let mut builder = arrow::array::UInt32Builder::with_capacity(left_side.len()); - builder.append_nulls(left_side.len()); - right_side = builder.finish(); + right_side = UInt32Array::new_null(left_side.len()); } self.join_metrics.input_batches.add(1); self.join_metrics.input_rows.add(left_side.len()); - timer.done(); - - self.state = HashJoinStreamState::Completed; - // Push final unmatched indices to output buffer if !left_side.is_empty() { + let mark_column = if self.null_aware && self.join_type == JoinType::LeftMark { + let probe_side_has_null = build_side + .left_data + .probe_side_has_null + .load(Ordering::Relaxed); + let probe_side_non_empty = build_side + .left_data + .probe_side_non_empty + .load(Ordering::Relaxed); + let build_key_column = &build_side.left_data.values()[0]; + let null_indices_bitmap = + if build_side.left_data.null_aware_state().is_some() { + Some(build_side.left_data.null_indices_bitmap().lock()) + } else { + None + }; + + Some(build_null_aware_left_mark_column( + &left_side, + &right_side, + build_key_column.as_ref(), + null_indices_bitmap.as_deref(), + probe_side_has_null, + probe_side_non_empty, + )) + } else { + None + }; + let empty_right_batch = RecordBatch::new_empty(self.right.schema()); let batch = build_batch_from_indices( &self.schema, @@ -1038,6 +1100,7 @@ impl HashJoinStream { &self.column_indices, JoinSide::Left, self.join_type, + mark_column.as_ref(), )?; let push_status = self.output_buffer.push_batch(batch)?; @@ -1047,10 +1110,154 @@ impl HashJoinStream { } } + timer.done(); + self.state = HashJoinStreamState::Completed; + Ok(StatefulStreamResult::Continue) } } +/// Records which build rows of a correlated null-aware `LeftMark` join are +/// UNKNOWN candidates for this probe batch. +/// +/// Key layout: `on[0]` is the `NOT IN` value key, `on[1..]` the correlation +/// scope keys (see `HashJoinExec::null_aware`). A build row's mark must be +/// NULL (SQL UNKNOWN) instead of FALSE when it is unmatched and either: +/// 1. its value key is NULL and any probe row shares its correlation scope, or +/// 2. some probe row in its correlation scope has a NULL value key. +/// +/// Case 1 probes the build-side NULL-value scope map with all probe rows; +/// case 2 probes the full scope map with only the NULL-valued probe rows. +fn mark_null_candidates_for_probe_batch( + build_side: &BuildSideReadyState, + state: &ProcessProbeBatchState, + random_state: &RandomState, + batch_size: usize, + hashes_buffer: &mut Vec, + probe_indices_buffer: &mut Vec, + build_indices_buffer: &mut Vec, +) -> Result<()> { + let Some(null_aware_state) = build_side.left_data.null_aware_state() else { + return Ok(()); + }; + + debug_assert_eq!( + build_side.left_data.values().len(), + state.values.len(), + "build/probe key counts must match" + ); + debug_assert!(state.values.len() > 1, "keys must be [value, scope..]"); + + let probe_value_key = &state.values[0]; + let build_scope_values = &build_side.left_data.values()[1..]; + let probe_scope_values = &state.values[1..]; + + let null_value_scope_map = null_aware_state.null_value_scope_map.as_ref(); + let probe_has_null_values = probe_value_key.null_count() > 0; + if null_value_scope_map.is_none() && !probe_has_null_values { + return Ok(()); + } + + let mut null_bitmap = build_side.left_data.null_indices_bitmap().lock(); + + // Case 1: build rows with a NULL value key are UNKNOWN as soon as any + // probe row shares their correlation scope. + if let Some(null_value_scope_map) = null_value_scope_map { + hashes_buffer.clear(); + hashes_buffer.resize(state.batch.num_rows(), 0); + create_hashes(probe_scope_values, random_state, hashes_buffer)?; + + let build_indices = &null_value_scope_map.build_indices; + scan_scope_matches_into_bitmap( + null_value_scope_map.map.as_ref(), + &null_value_scope_map.scope_values, + probe_scope_values, + hashes_buffer, + batch_size, + probe_indices_buffer, + build_indices_buffer, + // The map indexes only the NULL-valued build rows; translate its + // positions back to row indices in the full build batch. + |position| build_indices.value(position as usize), + &mut null_bitmap, + )?; + } + + // Case 2: NULL-valued probe rows make every build row in their correlation + // scope an UNKNOWN candidate. + if probe_has_null_values { + let null_mask = arrow::compute::is_null(probe_value_key.as_ref())?; + let probe_null_scope_values = probe_scope_values + .iter() + .map(|values| Ok(arrow::compute::filter(values.as_ref(), &null_mask)?)) + .collect::>>()?; + + hashes_buffer.clear(); + hashes_buffer.resize(null_mask.true_count(), 0); + create_hashes(&probe_null_scope_values, random_state, hashes_buffer)?; + + scan_scope_matches_into_bitmap( + null_aware_state.scope_map.as_ref(), + build_scope_values, + &probe_null_scope_values, + hashes_buffer, + batch_size, + probe_indices_buffer, + build_indices_buffer, + |position| position, + &mut null_bitmap, + )?; + } + + Ok(()) +} + +/// Scans all correlation-scope matches between `build_scope_values` and +/// `probe_scope_values` and sets the bit of every matched build row in +/// `null_bitmap`, translating matched map positions through +/// `map_position_to_build_row`. +#[expect(clippy::too_many_arguments)] +fn scan_scope_matches_into_bitmap( + scope_map: &dyn JoinHashMapType, + build_scope_values: &[ArrayRef], + probe_scope_values: &[ArrayRef], + hashes_buffer: &[u64], + batch_size: usize, + probe_indices_buffer: &mut Vec, + build_indices_buffer: &mut Vec, + map_position_to_build_row: impl Fn(u64) -> u64, + null_bitmap: &mut BooleanBufferBuilder, +) -> Result<()> { + let mut offset = (0, None); + loop { + let (build_indices, _probe_indices, next_offset) = lookup_join_hashmap( + scope_map, + build_scope_values, + probe_scope_values, + NullEquality::NullEqualsNothing, + hashes_buffer, + None, + batch_size, + offset, + probe_indices_buffer, + build_indices_buffer, + )?; + + for build_idx in build_indices.iter() { + let build_idx = + build_idx.expect("scope lookup should produce non-null build indices"); + null_bitmap.set_bit(map_position_to_build_row(build_idx) as usize, true); + } + + let Some(next_offset) = next_offset else { + break; + }; + offset = next_offset; + } + + Ok(()) +} + impl Stream for HashJoinStream { type Item = Result; diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index db552fed96724..1d2937ccc60ab 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -284,7 +284,7 @@ impl NestedLoopJoinExecBuilder { let right_schema = right.schema(); check_join_is_valid(&left_schema, &right_schema, &[])?; let (join_schema, column_indices) = - build_join_schema(&left_schema, &right_schema, &join_type); + build_join_schema(&left_schema, &right_schema, &join_type, false); let join_schema = Arc::new(join_schema); let cache = NestedLoopJoinExec::compute_properties( &left, diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs index 50e9252a21131..faabe6a7b2ba2 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs @@ -352,8 +352,9 @@ impl PiecewiseMergeJoinExec { let streamed_schema = streamed.schema(); // Create output schema for the join - let schema = - Arc::new(build_join_schema(&buffered_schema, &streamed_schema, &join_type).0); + let schema = Arc::new( + build_join_schema(&buffered_schema, &streamed_schema, &join_type, false).0, + ); let cache = Self::compute_properties( &buffered, &streamed, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index a8d25fd002b76..3be6a3b22cade 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -185,7 +185,7 @@ impl SortMergeJoinExec { }; let schema = - Arc::new(build_join_schema(&left_schema, &right_schema, &join_type).0); + Arc::new(build_join_schema(&left_schema, &right_schema, &join_type, false).0); let cache = Self::compute_properties(&left, &right, Arc::clone(&schema), join_type, &on)?; Ok(Self { diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index a56ad1712aa8e..3ebe0d7e49b6a 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -234,7 +234,7 @@ impl SymmetricHashJoinExec { // Build the join schema from the left and right schemas: let (schema, column_indices) = - build_join_schema(&left_schema, &right_schema, join_type); + build_join_schema(&left_schema, &right_schema, join_type, false); // Initialize the random state for the join operation: let random_state = RandomState::with_seed(0); @@ -937,6 +937,7 @@ pub(crate) fn build_side_determined_results( column_indices, build_hash_joiner.build_side, join_type, + None, ) .map(|batch| (batch.num_rows() > 0).then_some(batch)) } else { @@ -1040,6 +1041,7 @@ pub(crate) fn join_with_probe_batch( column_indices, build_hash_joiner.build_side, join_type, + None, ) .map(|batch| (batch.num_rows() > 0).then_some(batch)) } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 7ecace6b0e530..776bd8ed2e80b 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -257,11 +257,16 @@ fn output_join_field(old_field: &Field, join_type: &JoinType, is_left: bool) -> } /// Creates a schema for a join operation. -/// The fields from the left side are first +/// The fields from the left side are first. +/// +/// When `null_aware` is set, the `LeftMark`/`RightMark` `mark` column is made +/// nullable so it can represent SQL UNKNOWN for null-aware `NOT IN` semantics. +/// `null_aware` has no effect on non-mark join types. pub fn build_join_schema( left: &Schema, right: &Schema, join_type: &JoinType, + null_aware: bool, ) -> (Schema, Vec) { let left_fields = || { left.fields() @@ -304,7 +309,7 @@ pub fn build_join_schema( JoinType::LeftSemi | JoinType::LeftAnti => left_fields().unzip(), JoinType::LeftMark => { let right_field = once(( - Field::new("mark", DataType::Boolean, false), + Field::new("mark", DataType::Boolean, null_aware), ColumnIndex { index: 0, side: JoinSide::None, @@ -315,7 +320,7 @@ pub fn build_join_schema( JoinType::RightSemi | JoinType::RightAnti => right_fields().unzip(), JoinType::RightMark => { let left_field = once(( - Field::new("mark", DataType::Boolean, false), + Field::new("mark", DataType::Boolean, null_aware), ColumnIndex { index: 0, side: JoinSide::None, @@ -1263,6 +1268,7 @@ pub(crate) fn apply_join_filter_to_indices( filter.column_indices(), build_side, join_type, + None, )?; let filter_result = filter .expression() @@ -1285,6 +1291,7 @@ pub(crate) fn apply_join_filter_to_indices( filter.column_indices(), build_side, join_type, + None, )?; filter @@ -1326,6 +1333,7 @@ pub(crate) fn build_batch_from_indices( column_indices: &[ColumnIndex], build_side: JoinSide, join_type: JoinType, + mark_column: Option<&ArrayRef>, ) -> Result { if schema.fields().is_empty() { // For RightAnti and RightSemi joins, after `adjust_indices_by_join_type` @@ -1345,8 +1353,12 @@ pub(crate) fn build_batch_from_indices( for column_index in column_indices { let array = if column_index.side == JoinSide::None { - // For mark joins, the mark column is a true if the indices is not null, otherwise it will be false - Arc::new(compute::is_not_null(probe_indices)?) + // For mark joins, callers can provide a custom mark column. Otherwise, + // matched rows are `true` and unmatched rows are `false`. + match mark_column { + Some(mark_col) => Arc::clone(mark_col), + None => Arc::new(compute::is_not_null(probe_indices)?), + } } else if column_index.side == build_side { let array = build_input_buffer.column(column_index.index); if array.is_empty() || build_indices.null_count() == build_indices.len() { @@ -1373,6 +1385,66 @@ pub(crate) fn build_batch_from_indices( Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) } +/// Builds the nullable mark column for a null-aware `LeftMark` join. +/// +/// This follows the left mark hash join described in Neumann, Leis, and Kemper, +/// "The Complete Story of Joins (in HyPer)", Section 5.6: +/// +/// +/// `build_indices` and `probe_indices` are the final aligned indices derived from the +/// visited bitmap. At this point: +/// - valid `probe_indices` mean the build row matched at least one probe row, so the mark is `TRUE` +/// - null `probe_indices` mean the build row was unmatched, so the result depends on SQL +/// three-valued logic +/// +/// For the uncorrelated single-key implementation, unmatched rows are classified as follows: +/// 1. if the build key is `NULL` and the probe side is non-empty, the mark is `NULL` +/// 2. if the build key is `NULL` and the probe side is empty, the mark is `FALSE` +/// 3. if the build key is non-null and the probe side contained a `NULL`, the mark is `NULL` +/// 4. otherwise, the mark is `FALSE` +/// +/// For correlated scalar `NOT IN`, `null_indices_bitmap` carries the same UNKNOWN +/// decision per build row, scoped by the correlated equality keys. +/// +/// This is the helper equivalent of the paper's "null bucket" and `hadNull` handling. +/// It is intentionally scoped to scalar null-aware mark joins. +pub(crate) fn build_null_aware_left_mark_column( + build_indices: &UInt64Array, + probe_indices: &UInt32Array, + build_key_column: &dyn Array, + null_indices_bitmap: Option<&BooleanBufferBuilder>, + probe_side_has_null: bool, + probe_side_non_empty: bool, +) -> ArrayRef { + // Whether an unmatched build row's mark is NULL (UNKNOWN) instead of FALSE: + // correlated joins precomputed this per row in `null_indices_bitmap`; the + // uncorrelated rules are cases 1-4 in the doc above. + let unmatched_mark_is_null = |build_idx: usize| match null_indices_bitmap { + Some(bitmap) => bitmap.get_bit(build_idx), + None if build_key_column.is_null(build_idx) => probe_side_non_empty, + None => probe_side_has_null, + }; + + let marks: BooleanArray = build_indices + .iter() + .zip(probe_indices.iter()) + .map(|(build_idx, probe_idx)| { + if probe_idx.is_some() { + return Some(true); + } + let build_idx = build_idx + .expect("LeftMark final indices should always contain build-side rows") + as usize; + if unmatched_mark_is_null(build_idx) { + None + } else { + Some(false) + } + }) + .collect(); + Arc::new(marks) +} + /// Returns a new [RecordBatch] for a probe batch when no probe row can find a /// match: the build-side map is empty, either because the build side has no /// rows or because none of its rows has a matchable (non-NULL) join key. @@ -2770,7 +2842,7 @@ mod tests { ]; for (left_in, right_in, join_type, left_out, right_out) in cases { - let (schema, _) = build_join_schema(left_in, right_in, &join_type); + let (schema, _) = build_join_schema(left_in, right_in, &join_type, false); let expected_fields = left_out .fields() @@ -4274,13 +4346,13 @@ mod tests { .with_metadata(HashMap::from([("key".to_string(), "right".to_string())])); let (join_schema, _) = - build_join_schema(&left_schema, &right_schema, &JoinType::Left); + build_join_schema(&left_schema, &right_schema, &JoinType::Left, false); assert_eq!( join_schema.metadata(), &HashMap::from([("key".to_string(), "left".to_string())]) ); let (join_schema, _) = - build_join_schema(&left_schema, &right_schema, &JoinType::Right); + build_join_schema(&left_schema, &right_schema, &JoinType::Right, false); assert_eq!( join_schema.metadata(), &HashMap::from([("key".to_string(), "right".to_string())]) diff --git a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt index b18f3b3ae7a99..e73293f544375 100644 --- a/datafusion/sqllogictest/test_files/null_aware_anti_join.slt +++ b/datafusion/sqllogictest/test_files/null_aware_anti_join.slt @@ -147,16 +147,6 @@ WHERE id NOT IN (SELECT id FROM inner_table_no_null) ## Test 9: Multiple NOT IN conditions (OR) ############# -# KNOWN LIMITATION: Mark joins used for OR conditions don't support null-aware semantics. -# The NULL row is incorrectly returned here. According to SQL semantics: -# - NULL NOT IN (2, 4) = UNKNOWN -# - NULL NOT IN (1, 3) = UNKNOWN -# - UNKNOWN OR UNKNOWN = UNKNOWN (should be filtered out) -# But mark joins treat NULL keys as non-matching (FALSE), so: -# - NULL mark column = FALSE -# - NOT FALSE OR NOT FALSE = TRUE OR TRUE = TRUE (incorrectly included) -# TODO: Implement null-aware support for mark joins to fix this - query IT rowsort SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM inner_table_no_null) @@ -166,7 +156,6 @@ WHERE id NOT IN (SELECT id FROM inner_table_no_null) 2 b 3 c 4 d -NULL e ############# ## Test 10: NOT IN with WHERE clause in subquery diff --git a/datafusion/sqllogictest/test_files/null_aware_mark_join.slt b/datafusion/sqllogictest/test_files/null_aware_mark_join.slt new file mode 100644 index 0000000000000..d0e24eed87e7b --- /dev/null +++ b/datafusion/sqllogictest/test_files/null_aware_mark_join.slt @@ -0,0 +1,493 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +statement ok +CREATE TABLE outer_table(id INT, value TEXT) AS VALUES +(1, 'a'), +(2, 'b'), +(3, 'c'), +(4, 'd'), +(NULL, 'e'); + +statement ok +CREATE TABLE inner_table_no_null(id INT) AS VALUES +(2), +(4); + +statement ok +CREATE TABLE inner_table_with_null(id INT) AS VALUES +(2), +(NULL); + +statement ok +CREATE TABLE outer_corr_table(id INT, grp INT, value TEXT) AS VALUES +(1, 1, 'a'), +(3, 1, 'b'), +(1, 2, 'c'), +(NULL, 1, 'd'), +(5, 3, 'e'), +(2, 1, 'f'), +(NULL, 2, 'g'); + +statement ok +CREATE TABLE inner_corr_table(id INT, grp INT) AS VALUES +(2, 1), +(NULL, 1), +(1, 2); + +statement ok +CREATE TABLE empty_table(id INT) AS +SELECT * +FROM (VALUES (1)) AS seed(id) +WHERE id < 0; + +############################# +## Hash join null-aware mark +############################# + +statement ok +set datafusion.execution.target_partitions = 2; + +statement ok +set datafusion.optimizer.repartition_joins = true; + +statement ok +set datafusion.optimizer.prefer_hash_join = true; + +query TT +EXPLAIN SELECT id, value +FROM outer_table +WHERE (id NOT IN (SELECT id FROM inner_table_with_null)) IS NULL; +---- +logical_plan +01)Projection: outer_table.id, outer_table.value +02)--Filter: NOT __correlated_sq_1.mark IS NULL +03)----LeftMark Join: outer_table.id = __correlated_sq_1.id null_aware +04)------TableScan: outer_table projection=[id, value] +05)------SubqueryAlias: __correlated_sq_1 +06)--------TableScan: inner_table_with_null projection=[id] +physical_plan +01)FilterExec: NOT mark@2 IS NULL, projection=[id@0, value@1] +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(id@0, id@0)], null_aware +04)------DataSourceExec: partitions=1, partition_sizes=[1] +05)------DataSourceExec: partitions=1, partition_sizes=[1] + +query T rowsort +SELECT value +FROM outer_table +WHERE (id NOT IN (SELECT id FROM inner_table_with_null)) IS NULL; +---- +a +c +d +e + +query TT +EXPLAIN SELECT value +FROM outer_table +WHERE (id NOT IN (SELECT id FROM inner_table_no_null)) IS NULL; +---- +logical_plan +01)Projection: outer_table.value +02)--Filter: NOT __correlated_sq_1.mark IS NULL +03)----Projection: outer_table.value, __correlated_sq_1.mark +04)------LeftMark Join: outer_table.id = __correlated_sq_1.id null_aware +05)--------TableScan: outer_table projection=[id, value] +06)--------SubqueryAlias: __correlated_sq_1 +07)----------TableScan: inner_table_no_null projection=[id] +physical_plan +01)FilterExec: NOT mark@1 IS NULL, projection=[value@0] +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(id@0, id@0)], projection=[value@1, mark@2], null_aware +04)------DataSourceExec: partitions=1, partition_sizes=[1] +05)------DataSourceExec: partitions=1, partition_sizes=[1] + +query T rowsort +SELECT value +FROM outer_table +WHERE (id NOT IN (SELECT id FROM inner_table_no_null)) IS NULL; +---- +e + +query T rowsort +SELECT value +FROM outer_table +WHERE (id NOT IN (SELECT id FROM empty_table)) IS TRUE; +---- +a +b +c +d +e + +# Regression test for correlated NOT IN inside a mark join. +query T rowsort +SELECT value +FROM outer_corr_table +WHERE (id NOT IN ( + SELECT id + FROM inner_corr_table + WHERE inner_corr_table.grp = outer_corr_table.grp +)) IS NULL; +---- +a +b +d +g + +query T rowsort +SELECT value +FROM outer_corr_table +WHERE (id NOT IN ( + SELECT id + FROM inner_corr_table + WHERE inner_corr_table.grp = outer_corr_table.grp +)) IS TRUE; +---- +e + +# WHERE EXISTS in a disjunction, the motivating mark join example from +# "The Complete Story of Joins (in HyPer)" (Neumann, Leis, Kemper; BTW 2017), +# Section 3.3. The OR prevents the semi join rewrite, so the EXISTS must run +# as a mark join. Unlike the null-aware NOT IN marks above, an EXISTS mark is +# two-valued: NULLs on either side of the comparison produce FALSE, not NULL. + +query TT +EXPLAIN SELECT value +FROM outer_table +WHERE EXISTS ( + SELECT * + FROM inner_table_with_null + WHERE inner_table_with_null.id = outer_table.id +) OR value = 'e'; +---- +logical_plan +01)Projection: outer_table.value +02)--Filter: __correlated_sq_1.mark OR outer_table.value = Utf8View("e") +03)----Projection: outer_table.value, __correlated_sq_1.mark +04)------LeftMark Join: outer_table.id = __correlated_sq_1.id +05)--------TableScan: outer_table projection=[id, value] +06)--------SubqueryAlias: __correlated_sq_1 +07)----------TableScan: inner_table_with_null projection=[id] +physical_plan +01)FilterExec: mark@1 OR value@0 = e, projection=[value@0] +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----HashJoinExec: mode=CollectLeft, join_type=RightMark, on=[(id@0, id@0)], projection=[value@1, mark@2] +04)------DataSourceExec: partitions=1, partition_sizes=[1] +05)------DataSourceExec: partitions=1, partition_sizes=[1] + +query T rowsort +SELECT value +FROM outer_table +WHERE EXISTS ( + SELECT * + FROM inner_table_with_null + WHERE inner_table_with_null.id = outer_table.id +) OR value = 'e'; +---- +b +e + +# If the EXISTS mark were wrongly null-aware, every unmatched outer row would +# get a NULL mark (the build side contains a NULL), NOT mark would stay NULL, +# and all rows except 'b' would disappear. +query TT +EXPLAIN SELECT value +FROM outer_table +WHERE NOT EXISTS ( + SELECT * + FROM inner_table_with_null + WHERE inner_table_with_null.id = outer_table.id +) OR value = 'b'; +---- +logical_plan +01)Projection: outer_table.value +02)--Filter: NOT __correlated_sq_1.mark OR outer_table.value = Utf8View("b") +03)----Projection: outer_table.value, __correlated_sq_1.mark +04)------LeftMark Join: outer_table.id = __correlated_sq_1.id +05)--------TableScan: outer_table projection=[id, value] +06)--------SubqueryAlias: __correlated_sq_1 +07)----------TableScan: inner_table_with_null projection=[id] +physical_plan +01)FilterExec: NOT mark@1 OR value@0 = b, projection=[value@0] +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----HashJoinExec: mode=CollectLeft, join_type=RightMark, on=[(id@0, id@0)], projection=[value@1, mark@2] +04)------DataSourceExec: partitions=1, partition_sizes=[1] +05)------DataSourceExec: partitions=1, partition_sizes=[1] + +query T rowsort +SELECT value +FROM outer_table +WHERE NOT EXISTS ( + SELECT * + FROM inner_table_with_null + WHERE inner_table_with_null.id = outer_table.id +) OR value = 'b'; +---- +a +b +c +d +e + +# Section 3.3 also gives the genuinely three-valued mark example: +# select Title, ECTS = any (select ECTS from Courses c2 +# where Lecturer = 123) someEqual +# from Courses c1 +# where the mark itself is projected and must surface TRUE, FALSE, and NULL. +# DataFusion cannot decorrelate subqueries in a projection yet, so the +# projected forms are documented errors for now. Expected results once that +# lands (verified against DuckDB): +# +# SELECT value, id = ANY (SELECT id FROM inner_table_no_null) AS some_equal +# FROM outer_table; +# ---- +# a false +# b true +# c false +# d true +# e NULL + +query error This feature is not implemented: Physical plan does not support logical expression Exists +SELECT value, id = ANY (SELECT id FROM inner_table_no_null) AS some_equal +FROM outer_table; + +query error This feature is not implemented: Physical plan does not support logical expression InSubquery +SELECT value, id IN (SELECT id FROM inner_table_no_null) AS some_equal +FROM outer_table; + +# In a filter the paper's = ANY form works today: rewrite_set_comparison +# expands it to a CASE over EXISTS marks, and IS NULL / IS TRUE then observe +# all three mark values. + +query TT +EXPLAIN SELECT value +FROM outer_table +WHERE (id = ANY (SELECT id FROM inner_table_with_null)) IS NULL; +---- +logical_plan +01)Projection: outer_table.value +02)--Filter: __correlated_sq_1.mark OR __correlated_sq_2.mark AND NOT __correlated_sq_3.mark AND Boolean(NULL) IS NULL +03)----Projection: outer_table.value, __correlated_sq_1.mark, __correlated_sq_2.mark, __correlated_sq_3.mark +04)------LeftMark Join: Filter: outer_table.id = __correlated_sq_3.id IS TRUE +05)--------LeftMark Join: Filter: outer_table.id = __correlated_sq_2.id IS NULL +06)----------LeftMark Join: Filter: outer_table.id = __correlated_sq_1.id IS TRUE +07)------------TableScan: outer_table projection=[id, value] +08)------------SubqueryAlias: __correlated_sq_1 +09)--------------TableScan: inner_table_with_null projection=[id] +10)----------SubqueryAlias: __correlated_sq_2 +11)------------TableScan: inner_table_with_null projection=[id] +12)--------SubqueryAlias: __correlated_sq_3 +13)----------TableScan: inner_table_with_null projection=[id] +physical_plan +01)FilterExec: mark@1 OR mark@2 AND NOT mark@3 AND NULL IS NULL, projection=[value@0] +02)--NestedLoopJoinExec: join_type=RightMark, filter=(id@0 = id@1) IS NOT DISTINCT FROM true, projection=[value@1, mark@2, mark@3, mark@4] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)----NestedLoopJoinExec: join_type=RightMark, filter=id@0 = id@1 IS NULL +05)------DataSourceExec: partitions=1, partition_sizes=[1] +06)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +07)--------NestedLoopJoinExec: join_type=RightMark, filter=(id@0 = id@1) IS NOT DISTINCT FROM true +08)----------DataSourceExec: partitions=1, partition_sizes=[1] +09)----------DataSourceExec: partitions=1, partition_sizes=[1] + +query T rowsort +SELECT value +FROM outer_table +WHERE (id = ANY (SELECT id FROM inner_table_with_null)) IS NULL; +---- +a +c +d +e + +query TT +EXPLAIN SELECT value +FROM outer_table +WHERE (id = ANY (SELECT id FROM inner_table_no_null)) IS TRUE; +---- +logical_plan +01)Projection: outer_table.value +02)--Filter: __correlated_sq_1.mark OR __correlated_sq_2.mark AND NOT __correlated_sq_3.mark AND Boolean(NULL) IS TRUE +03)----Projection: outer_table.value, __correlated_sq_1.mark, __correlated_sq_2.mark, __correlated_sq_3.mark +04)------LeftMark Join: Filter: outer_table.id = __correlated_sq_3.id IS TRUE +05)--------LeftMark Join: Filter: outer_table.id = __correlated_sq_2.id IS NULL +06)----------LeftMark Join: Filter: outer_table.id = __correlated_sq_1.id IS TRUE +07)------------TableScan: outer_table projection=[id, value] +08)------------SubqueryAlias: __correlated_sq_1 +09)--------------TableScan: inner_table_no_null projection=[id] +10)----------SubqueryAlias: __correlated_sq_2 +11)------------TableScan: inner_table_no_null projection=[id] +12)--------SubqueryAlias: __correlated_sq_3 +13)----------TableScan: inner_table_no_null projection=[id] +physical_plan +01)FilterExec: (mark@1 OR mark@2 AND NOT mark@3 AND NULL) IS NOT DISTINCT FROM true, projection=[value@0] +02)--NestedLoopJoinExec: join_type=RightMark, filter=(id@0 = id@1) IS NOT DISTINCT FROM true, projection=[value@1, mark@2, mark@3, mark@4] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)----NestedLoopJoinExec: join_type=RightMark, filter=id@0 = id@1 IS NULL +05)------DataSourceExec: partitions=1, partition_sizes=[1] +06)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +07)--------NestedLoopJoinExec: join_type=RightMark, filter=(id@0 = id@1) IS NOT DISTINCT FROM true +08)----------DataSourceExec: partitions=1, partition_sizes=[1] +09)----------DataSourceExec: partitions=1, partition_sizes=[1] + +query T rowsort +SELECT value +FROM outer_table +WHERE (id = ANY (SELECT id FROM inner_table_no_null)) IS TRUE; +---- +b +d + +# Same plan shape as the `= ANY (... with_null) IS NULL` case above; only the +# result differs, so assert just the result here. +query T rowsort +SELECT value +FROM outer_table +WHERE (id = ANY (SELECT id FROM inner_table_no_null)) IS NULL; +---- +e + +# Positive IN goes through InSubquery decorrelation rather than the = ANY +# CASE rewrite; make sure its mark is null-aware too. +query TT +EXPLAIN SELECT value +FROM outer_table +WHERE (id IN (SELECT id FROM inner_table_with_null)) IS NULL; +---- +logical_plan +01)Projection: outer_table.value +02)--Filter: __correlated_sq_1.mark IS NULL +03)----Projection: outer_table.value, __correlated_sq_1.mark +04)------LeftMark Join: outer_table.id = __correlated_sq_1.id null_aware +05)--------TableScan: outer_table projection=[id, value] +06)--------SubqueryAlias: __correlated_sq_1 +07)----------TableScan: inner_table_with_null projection=[id] +physical_plan +01)FilterExec: mark@1 IS NULL, projection=[value@0] +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(id@0, id@0)], projection=[value@1, mark@2], null_aware +04)------DataSourceExec: partitions=1, partition_sizes=[1] +05)------DataSourceExec: partitions=1, partition_sizes=[1] + +query T rowsort +SELECT value +FROM outer_table +WHERE (id IN (SELECT id FROM inner_table_with_null)) IS NULL; +---- +a +c +d +e + +# Null-aware mark joins are pinned to CollectLeft hash joins (mark joins have no +# sort-merge path); flipping prefer_hash_join off must not drop null awareness. + +statement ok +set datafusion.optimizer.prefer_hash_join = false; + +query TT +EXPLAIN SELECT id, value +FROM outer_table +WHERE (id NOT IN (SELECT id FROM inner_table_with_null)) IS NULL; +---- +logical_plan +01)Projection: outer_table.id, outer_table.value +02)--Filter: NOT __correlated_sq_1.mark IS NULL +03)----LeftMark Join: outer_table.id = __correlated_sq_1.id null_aware +04)------TableScan: outer_table projection=[id, value] +05)------SubqueryAlias: __correlated_sq_1 +06)--------TableScan: inner_table_with_null projection=[id] +physical_plan +01)FilterExec: NOT mark@2 IS NULL, projection=[id@0, value@1] +02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +03)----HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(id@0, id@0)], null_aware +04)------DataSourceExec: partitions=1, partition_sizes=[1] +05)------DataSourceExec: partitions=1, partition_sizes=[1] + +#################################### +## Nested loop mark join with NULLs +#################################### + +statement ok +set datafusion.execution.target_partitions = 1; + +statement ok +set datafusion.optimizer.prefer_hash_join = true; + +query TT +EXPLAIN SELECT value +FROM outer_table +WHERE (EXISTS ( + SELECT 1 + FROM inner_table_no_null + WHERE outer_table.id < inner_table_no_null.id +)) IS TRUE; +---- +logical_plan +01)Projection: outer_table.value +02)--Filter: __correlated_sq_1.mark IS TRUE +03)----Projection: outer_table.value, __correlated_sq_1.mark +04)------LeftMark Join: Filter: outer_table.id < __correlated_sq_1.id +05)--------TableScan: outer_table projection=[id, value] +06)--------SubqueryAlias: __correlated_sq_1 +07)----------TableScan: inner_table_no_null projection=[id] +physical_plan +01)FilterExec: mark@1 IS NOT DISTINCT FROM true, projection=[value@0] +02)--NestedLoopJoinExec: join_type=RightMark, filter=id@0 < id@1, projection=[value@1, mark@2] +03)----DataSourceExec: partitions=1, partition_sizes=[1] +04)----DataSourceExec: partitions=1, partition_sizes=[1] + +query T rowsort +SELECT value +FROM outer_table +WHERE (EXISTS ( + SELECT 1 + FROM inner_table_no_null + WHERE outer_table.id < inner_table_no_null.id +)) IS TRUE; +---- +a +b +c + +statement ok +reset datafusion.optimizer.prefer_hash_join; + +statement ok +reset datafusion.optimizer.repartition_joins; + +statement ok +set datafusion.execution.target_partitions = 4; + +statement ok +DROP TABLE empty_table; + +statement ok +DROP TABLE inner_table_with_null; + +statement ok +DROP TABLE inner_table_no_null; + +statement ok +DROP TABLE inner_corr_table; + +statement ok +DROP TABLE outer_corr_table; + +statement ok +DROP TABLE outer_table;