feat: parallel sort-preserving merge (PSRS) for >2x merge speedup (demo) #23124

Draft
Dandandan wants to merge 1 commit into apache:main from Dandandan:feat/parallel-sort-preserving-merge

@Dandandan Dandandan commented Jun 23, 2026

Contributor

Which issue does this PR close?

  • TODO

Rationale for this change

SortPreservingMergeExec performs the k-way merge of all input partitions on a
single thread. For merge-bound sort queries (e.g. ORDER BY over many sorted
partitions), this leaves most cores idle while one thread does all the work.

This PR adds a parallel order-preserving merge that splits the merge across
target_partitions threads, giving a large speedup on merge-bound queries:

  • ~2.36× on the sort_preserving_merge microbenchmark (1M rows × 3
    partitions, single u64 key): 33.7 ms → 14.3 ms.
  • ~1.7–4× on sort_tpch per the table below (Q1 2.76×, Q2 3.76×, Q10 2.47×).
┏━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃                               HEAD ┃ feat_parallel-sort-preserving-merge ┃        Change ┃
┡━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1    │  166.48 / 167.33 ±0.74 / 168.21 ms │      59.48 / 60.60 ±0.94 / 61.79 ms │ +2.76x faster │
│ Q2    │  147.59 / 149.24 ±1.72 / 152.56 ms │      36.25 / 39.73 ±3.43 / 45.47 ms │ +3.76x faster │
│ Q3    │ 667.18 / 676.09 ±12.07 / 699.81 ms │  222.54 / 239.92 ±10.35 / 250.85 ms │ +2.82x faster │
│ Q4    │  216.89 / 219.83 ±3.86 / 227.11 ms │      51.12 / 55.04 ±3.36 / 60.10 ms │ +3.99x faster │
│ Q5    │  298.30 / 300.06 ±2.05 / 304.08 ms │   158.35 / 169.90 ±6.64 / 178.49 ms │ +1.77x faster │
│ Q6    │  313.50 / 314.43 ±0.77 / 315.73 ms │  163.59 / 180.79 ±13.48 / 202.09 ms │ +1.74x faster │
│ Q7    │  489.92 / 494.97 ±4.13 / 502.27 ms │   269.47 / 277.80 ±6.58 / 287.00 ms │ +1.78x faster │
│ Q8    │  344.36 / 350.08 ±5.29 / 356.61 ms │   126.04 / 130.75 ±3.31 / 134.35 ms │ +2.68x faster │
│ Q9    │ 355.73 / 364.90 ±10.77 / 385.29 ms │   139.04 / 145.44 ±4.20 / 151.56 ms │ +2.51x faster │
│ Q10   │ 526.06 / 548.25 ±23.28 / 582.66 ms │  206.33 / 221.68 ±16.53 / 250.83 ms │ +2.47x faster │
│ Q11   │ 243.37 / 260.26 ±30.63 / 321.46 ms │   94.10 / 108.05 ±11.59 / 121.44 ms │ +2.41x faster │
└───────┴────────────────────────────────────┴─────────────────────────────────────┴───────────────┘

What changes are included in this PR?

  • ParallelSortPreservingMergeExec (sorts/parallel_merge.rs) — a parallel
    order-preserving merge using Parallel Sorting by Regular Sampling (PSRS):

    1. Materialize each locally-sorted input into a run (payload kept zero-copy;
      sort keys encoded on demand with a single shared RowConverter so all keys
      are byte-comparable).
    2. Draw a regular sample of keys across runs and pick P-1 pivots.
    3. Cut every run by the same pivots via binary search (lower_bound).
    4. Merge the P key-range buckets concurrently — one SpawnedTask each,
      reusing the existing optimized loser-tree StreamingMergeBuilder.
    5. Concatenate the buckets in order → one globally-sorted partition, the same
      output contract as SortPreservingMergeExec.

    Correctness does not depend on balanced pivots; regular sampling only affects
    load balance. Output partitioning is UnknownPartitioning(1).

  • ParallelSortMerge physical-optimizer rule (parallel_sort_merge.rs, runs
    after PushdownSort) that replaces an eligible SortPreservingMergeExec with
    the parallel exec. Gated by datafusion.optimizer.enable_parallel_sort_merge
    (default true). Eligible iff: no fetch/limit, bounded input, > 1 input
    partition, and a known row count >= batch_size * target_partitions
    (unknown-size inputs keep the serial merge to avoid regressing small data).

  • A --parallel-merge A/B flag on the sort_tpch benchmark, a parallel variant
    in the sort_preserving_merge criterion bench, and regenerated configs.md.
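
The five PSRS steps listed above can be sketched on plain `u64` keys using only the standard library. This is a minimal illustration under stated assumptions, not the code in this PR: the names `pick_pivots`, `cut_run`, `merge_bucket` and `parallel_merge` are hypothetical, a `BinaryHeap` stands in for the loser-tree `StreamingMergeBuilder`, and scoped OS threads stand in for `SpawnedTask`.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

/// Step 2: take `p` evenly spaced keys from every sorted run, sort the
/// combined sample, and pick `p - 1` evenly spaced pivots from it.
fn pick_pivots(runs: &[Vec<u64>], p: usize) -> Vec<u64> {
    let mut sample = Vec::new();
    for run in runs.iter().filter(|r| !r.is_empty()) {
        for i in 0..p {
            sample.push(run[i * run.len() / p]);
        }
    }
    sample.sort_unstable();
    if sample.is_empty() {
        return Vec::new();
    }
    (1..p).map(|i| sample[i * sample.len() / p]).collect()
}

/// Step 3: cut one sorted run at every pivot with a lower-bound binary search.
fn cut_run<'a>(run: &'a [u64], pivots: &[u64]) -> Vec<&'a [u64]> {
    let mut parts = Vec::with_capacity(pivots.len() + 1);
    let mut start = 0;
    for pivot in pivots {
        // First index whose key is >= pivot.
        let end = run.partition_point(|k| k < pivot);
        parts.push(&run[start..end]);
        start = end;
    }
    parts.push(&run[start..]);
    parts
}

/// Step 4 (per bucket): k-way merge of the slices that fall in one key range.
fn merge_bucket(parts: &[&[u64]]) -> Vec<u64> {
    let mut heap = BinaryHeap::new();
    for (r, part) in parts.iter().enumerate() {
        if let Some(&key) = part.first() {
            heap.push(Reverse((key, r, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(parts.iter().map(|p| p.len()).sum());
    while let Some(Reverse((key, r, i))) = heap.pop() {
        out.push(key);
        if let Some(&next) = parts[r].get(i + 1) {
            heap.push(Reverse((next, r, i + 1)));
        }
    }
    out
}

/// Steps 2-5: pivots, cuts, one thread per bucket, then concatenation in order.
fn parallel_merge(runs: &[Vec<u64>], p: usize) -> Vec<u64> {
    let pivots = pick_pivots(runs, p);
    let cuts: Vec<Vec<&[u64]>> = runs.iter().map(|r| cut_run(r, &pivots)).collect();
    let merged: Vec<Vec<u64>> = thread::scope(|s| {
        let handles: Vec<_> = (0..pivots.len() + 1)
            .map(|b| {
                let parts: Vec<&[u64]> = cuts.iter().map(|c| c[b]).collect();
                s.spawn(move || merge_bucket(&parts))
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    merged.concat()
}

fn main() {
    let runs = vec![vec![1, 4, 7, 10], vec![2, 5, 8, 11], vec![3, 6, 9, 12]];
    println!("{:?}", parallel_merge(&runs, 3));
}
```

Because every run is cut at the same pivots, bucket `b` holds only keys that sort no later than anything in bucket `b + 1`, so plain concatenation is globally sorted even when the pivots are badly balanced or repeated.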

Limitations

It is pipeline-breaking: it materializes all of its input and emits nothing
until every bucket is merged, so it never honors a pushed-down fetch/limit and is
restricted to bounded inputs. Peak memory ≈ full input + output (tracked via
MemoryReservations, not currently spillable). Queries needing early termination
keep using SortPreservingMergeExec. Marked draft to gather feedback on the
default and the memory trade-off.

Are these changes tested?

Yes:

  • Unit tests (sorts::parallel_merge): output matches SortPreservingMergeExec
    for unique keys, heavy low-cardinality ties, descending + nulls, uneven/empty
    partitions, more-buckets-than-rows, and single-partition passthrough.
  • The optimizer-rule path is exercised end-to-end via sqllogictest with the flag
    on by default; EXPLAIN plans in a few files now show
    ParallelSortPreservingMergeExec (results are unchanged — the output is
    identical to the serial merge).
  • Benchmarks above (microbench + sort_tpch).

Are there any user-facing changes?

  • New config datafusion.optimizer.enable_parallel_sort_merge (default true).
  • EXPLAIN shows ParallelSortPreservingMergeExec where the rule applies, and
    such plans avoid the single-threaded merge bottleneck (at the cost of
    materializing the input). No public API breakage.

🤖 Generated with Claude Code

Add `ParallelSortPreservingMergeExec` (`sorts/parallel_merge.rs`), a parallel
order-preserving merge of sorted partitions using Parallel Sorting by Regular
Sampling (PSRS): materialize each locally-sorted input, pick `P-1` pivots by
regular sampling, cut every run by the same pivots (binary search on
byte-comparable row encodings), then merge the `P` key-range buckets
concurrently (one `SpawnedTask` each, reusing the loser-tree
`StreamingMergeBuilder`) and concatenate the buckets in order. Output is a
single globally-sorted partition — same contract as `SortPreservingMergeExec`,
computed with `target_partitions`-way parallelism.
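
To illustrate what "byte-comparable row encodings" buys the binary search, here is a minimal sketch of an order-preserving encoding for a single nullable `i64` key. It is an assumption-laden illustration, not the `RowConverter` layout: `encode_key` and `sort_by_encoded` are hypothetical names, and the 9-byte format (one sentinel byte plus a sign-flipped big-endian value) is chosen only for this example.

```rust
/// Encode a nullable i64 so that plain byte-wise comparison of the result
/// matches the requested sort order.
fn encode_key(value: Option<i64>, descending: bool, nulls_first: bool) -> [u8; 9] {
    let mut out = [0u8; 9];
    match value {
        // Sentinel byte: valid values use 1, nulls use 0 (first) or 2 (last).
        None => out[0] = if nulls_first { 0 } else { 2 },
        Some(v) => {
            out[0] = 1;
            // Flipping the sign bit makes negative values sort before
            // positive ones when compared as unsigned big-endian bytes.
            let mut bytes = ((v as u64) ^ (1 << 63)).to_be_bytes();
            if descending {
                // Inverting every bit reverses the byte-wise order.
                for b in &mut bytes {
                    *b = !*b;
                }
            }
            out[1..].copy_from_slice(&bytes);
        }
    }
    out
}

/// Sort values purely by comparing their encoded bytes.
fn sort_by_encoded(
    mut values: Vec<Option<i64>>,
    descending: bool,
    nulls_first: bool,
) -> Vec<Option<i64>> {
    values.sort_by_key(|v| encode_key(*v, descending, nulls_first));
    values
}

fn main() {
    let values = vec![Some(3), None, Some(-7), Some(0)];
    println!("{:?}", sort_by_encoded(values.clone(), false, true));
    println!("{:?}", sort_by_encoded(values, true, false));
}
```

With keys in this form, a pivot cut reduces to a byte-slice comparison regardless of the column's sort direction or null placement, which is what allows every run to be cut with the same `lower_bound` routine.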

A new `JoinSelection`-style physical-optimizer rule `ParallelSortMerge`
(`parallel_sort_merge.rs`, run after `PushdownSort`) replaces an eligible
`SortPreservingMergeExec` with the parallel exec, gated by
`datafusion.optimizer.enable_parallel_sort_merge` (default true). Eligibility:
no fetch/limit, bounded input, >1 input partition, and a *known* row count
>= `batch_size * target_partitions` (unknown-size inputs keep the serial merge
to avoid regressing small data).
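
The eligibility conditions can be read as a single predicate. The sketch below is hypothetical: `MergeInput` and `use_parallel_merge` are illustrative names rather than DataFusion types, and the values in `main` (a batch size of 8192, 12 target partitions) are assumptions for the example.

```rust
/// Hypothetical summary of the properties the rule inspects on a
/// SortPreservingMergeExec and its input.
struct MergeInput {
    fetch: Option<usize>,      // pushed-down fetch/limit, if any
    bounded: bool,             // false for unbounded (streaming) input
    input_partitions: usize,   // number of sorted input partitions
    known_rows: Option<usize>, // exact row count, if statistics provide one
}

/// Returns true when the serial merge would be swapped for the parallel one.
fn use_parallel_merge(
    input: &MergeInput,
    enabled: bool,
    batch_size: usize,
    target_partitions: usize,
) -> bool {
    let min_rows = batch_size.saturating_mul(target_partitions);
    enabled
        && input.fetch.is_none()
        && input.bounded
        && input.input_partitions > 1
        // An unknown row count keeps the serial merge.
        && input.known_rows.is_some_and(|rows| rows >= min_rows)
}

fn main() {
    let input = MergeInput {
        fetch: None,
        bounded: true,
        input_partitions: 12,
        known_rows: Some(1_000_000),
    };
    println!("parallel: {}", use_parallel_merge(&input, true, 8192, 12));
}
```

Treating an unknown row count as ineligible is the conservative choice described above: small inputs never pay the materialization cost.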

It is pipeline-breaking (materializes all input, no early-stop), so it never
honors a pushed-down limit and is restricted to bounded inputs.

Measured: ~2.36x on the `sort_preserving_merge` bench (1M rows x 3 partitions,
u64); ~2-2.5x on sort_tpch (Q1 2.54x, Q2 2.31x, Q10 1.93x). A `--parallel-merge`
A/B flag is added to the sort_tpch benchmark.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@github-actions github-actions Bot added documentation Improvements or additions to documentation optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) common Related to common crate physical-plan Changes to the physical-plan crate labels Jun 23, 2026
@Dandandan

Contributor Author

run benchmark sort_tpch

@adriangbot


🤖 Benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4779508894-627-kkbts 6.12.68+ #1 SMP Sat May 2 07:49:07 UTC 2026 aarch64 GNU/Linux

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected

Comparing feat/parallel-sort-preserving-merge (611030b) to dc92bb8 (merge-base) diff using: sort_tpch
Results will be posted here when complete



@github-actions


Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

Details
     Cloning apache/main
    Building datafusion-common v54.0.0 (current)
       Built [  25.659s] (current)
     Parsing datafusion-common v54.0.0 (current)
      Parsed [   0.051s] (current)
    Building datafusion-common v54.0.0 (baseline)
       Built [  26.123s] (baseline)
     Parsing datafusion-common v54.0.0 (baseline)
      Parsed [   0.050s] (baseline)
    Checking datafusion-common v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.797s] 223 checks: 222 pass, 1 fail, 0 warn, 30 skip

--- failure constructible_struct_adds_field: externally-constructible struct adds field ---

Description:
A pub struct constructible with a struct literal has a new pub field. Existing struct literals must be updated to include the new field.
        ref: https://doc.rust-lang.org/reference/expressions/struct-expr.html
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.48.0/src/lints/constructible_struct_adds_field.ron

Failed in:
  field OptimizerOptions.enable_parallel_sort_merge in /home/runner/work/datafusion/datafusion/datafusion/common/src/config.rs:1347

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [  53.705s] datafusion-common
    Building datafusion-physical-optimizer v54.0.0 (current)
       Built [  29.906s] (current)
     Parsing datafusion-physical-optimizer v54.0.0 (current)
      Parsed [   0.019s] (current)
    Building datafusion-physical-optimizer v54.0.0 (baseline)
       Built [  30.283s] (baseline)
     Parsing datafusion-physical-optimizer v54.0.0 (baseline)
      Parsed [   0.019s] (baseline)
    Checking datafusion-physical-optimizer v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.130s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  61.156s] datafusion-physical-optimizer
    Building datafusion-physical-plan v54.0.0 (current)
       Built [  28.619s] (current)
     Parsing datafusion-physical-plan v54.0.0 (current)
      Parsed [   0.110s] (current)
    Building datafusion-physical-plan v54.0.0 (baseline)
       Built [  27.769s] (baseline)
     Parsing datafusion-physical-plan v54.0.0 (baseline)
      Parsed [   0.111s] (baseline)
    Checking datafusion-physical-plan v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.690s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [  58.283s] datafusion-physical-plan
    Building datafusion-sqllogictest v54.0.0 (current)
       Built [ 141.178s] (current)
     Parsing datafusion-sqllogictest v54.0.0 (current)
      Parsed [   0.021s] (current)
    Building datafusion-sqllogictest v54.0.0 (baseline)
       Built [ 141.620s] (baseline)
     Parsing datafusion-sqllogictest v54.0.0 (baseline)
      Parsed [   0.020s] (baseline)
    Checking datafusion-sqllogictest v54.0.0 -> v54.0.0 (no change; assume patch)
     Checked [   0.095s] 223 checks: 223 pass, 30 skip
     Summary no semver update required
    Finished [ 285.292s] datafusion-sqllogictest

@github-actions github-actions Bot added the auto detected api change Auto detected API change label Jun 23, 2026
@adriangbot


🤖 Benchmark completed (GKE) | trigger

Instance: c4a-highmem-16 (12 vCPU / 65 GiB)


Comparing HEAD and feat_parallel-sort-preserving-merge
--------------------
Benchmark sort_tpch1.json
--------------------
┏━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃                               HEAD ┃ feat_parallel-sort-preserving-merge ┃        Change ┃
┡━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1    │  166.48 / 167.33 ±0.74 / 168.21 ms │      59.48 / 60.60 ±0.94 / 61.79 ms │ +2.76x faster │
│ Q2    │  147.59 / 149.24 ±1.72 / 152.56 ms │      36.25 / 39.73 ±3.43 / 45.47 ms │ +3.76x faster │
│ Q3    │ 667.18 / 676.09 ±12.07 / 699.81 ms │  222.54 / 239.92 ±10.35 / 250.85 ms │ +2.82x faster │
│ Q4    │  216.89 / 219.83 ±3.86 / 227.11 ms │      51.12 / 55.04 ±3.36 / 60.10 ms │ +3.99x faster │
│ Q5    │  298.30 / 300.06 ±2.05 / 304.08 ms │   158.35 / 169.90 ±6.64 / 178.49 ms │ +1.77x faster │
│ Q6    │  313.50 / 314.43 ±0.77 / 315.73 ms │  163.59 / 180.79 ±13.48 / 202.09 ms │ +1.74x faster │
│ Q7    │  489.92 / 494.97 ±4.13 / 502.27 ms │   269.47 / 277.80 ±6.58 / 287.00 ms │ +1.78x faster │
│ Q8    │  344.36 / 350.08 ±5.29 / 356.61 ms │   126.04 / 130.75 ±3.31 / 134.35 ms │ +2.68x faster │
│ Q9    │ 355.73 / 364.90 ±10.77 / 385.29 ms │   139.04 / 145.44 ±4.20 / 151.56 ms │ +2.51x faster │
│ Q10   │ 526.06 / 548.25 ±23.28 / 582.66 ms │  206.33 / 221.68 ±16.53 / 250.83 ms │ +2.47x faster │
│ Q11   │ 243.37 / 260.26 ±30.63 / 321.46 ms │   94.10 / 108.05 ±11.59 / 121.44 ms │ +2.41x faster │
└───────┴────────────────────────────────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (HEAD)                                  │ 3845.44ms │
│ Total Time (feat_parallel-sort-preserving-merge)   │ 1629.70ms │
│ Average Time (HEAD)                                │  349.59ms │
│ Average Time (feat_parallel-sort-preserving-merge) │  148.15ms │
│ Queries Faster                                     │        11 │
│ Queries Slower                                     │         0 │
│ Queries with No Change                             │         0 │
│ Queries with Failure                               │         0 │
└────────────────────────────────────────────────────┴───────────┘
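
As a cross-check on the summary, the totals and the aggregate speedup can be recomputed from the per-query mean times in the table above. This is a small arithmetic sketch; the constant and function names are illustrative, and the values are the middle (mean) figures of each row.

```rust
/// Mean runtimes in ms for Q1..Q11, copied from the comparison table.
const HEAD_MS: [f64; 11] = [
    167.33, 149.24, 676.09, 219.83, 300.06, 314.43, 494.97, 350.08, 364.90, 548.25, 260.26,
];
const BRANCH_MS: [f64; 11] = [
    60.60, 39.73, 239.92, 55.04, 169.90, 180.79, 277.80, 130.75, 145.44, 221.68, 108.05,
];

/// Sum of the per-query mean times.
fn total(ms: &[f64]) -> f64 {
    ms.iter().sum()
}

/// Ratio of total baseline time to total branch time.
fn aggregate_speedup(base: &[f64], new: &[f64]) -> f64 {
    total(base) / total(new)
}

fn main() {
    println!("HEAD total:   {:.2} ms", total(&HEAD_MS));
    println!("branch total: {:.2} ms", total(&BRANCH_MS));
    println!("aggregate speedup: {:.2}x", aggregate_speedup(&HEAD_MS, &BRANCH_MS));
}
```

The sums reproduce the reported totals (3845.44 ms and 1629.70 ms), giving an aggregate speedup of about 2.36x across the eleven queries.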

Resource Usage

sort_tpch: base (merge-base) vs. branch

┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ Metric      ┃ base (merge-base) ┃  branch ┃
┡━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ Wall time   │             20.0s │   10.0s │
│ Peak memory │           2.4 GiB │ 3.4 GiB │
│ Avg memory  │           1.2 GiB │ 1.5 GiB │
│ CPU user    │             67.7s │   61.9s │
│ CPU sys     │              3.4s │    2.2s │
│ Peak spill  │               0 B │     0 B │
└─────────────┴───────────────────┴─────────┘


@Dandandan Dandandan changed the title feat: parallel sort-preserving merge (PSRS) for >2x merge speedup feat: parallel sort-preserving merge (PSRS) for >2x merge speedup (demo) Jun 23, 2026