[spark] Lazy partition pruning for engine format table#8300
Conversation
d294391 to
c97bb41
Compare
| leafDirToChildrenFiles | ||
| } | ||
|
|
||
| override def refresh(): Unit = fileStatusCache.invalidateAll() |
There was a problem hiding this comment.
This only invalidates the shared FileStatusCache. Once fullIndex has been initialized, it still keeps its own cached leaf files, leaf-dir map, and partition spec, while Spark's InMemoryFileIndex.refresh() also calls refresh0() to rebuild those fields. I reproduced this by listing an index with pt=1, creating pt=2, calling refresh(), and then listFiles(Nil, Nil) still returned only pt=1. Please refresh/recreate fullIndex here so REFRESH TABLE and write refresh paths do not leave unfiltered scans/allFiles/partitionSpec stale.
There was a problem hiding this comment.
Thanks for the review. I investigated the refresh() path:
REFRESH TABLE in Spark V2 goes through RefreshTableExec → catalog.invalidateTable(ident), which causes the next query to call loadTable() and recreate the entire table instance (including a fresh LazyPartitionPruningFileIndex). FileIndex.refresh() is not called in this path. Same pattern as CatalogFileIndex, which also uses an immutable val sizeInBytes and its refresh() only clears fileStatusCache.
Although FileIndex.refresh() is currently only called from V1 paths (InsertIntoHadoopFsRelationCommand, CacheManager.recacheByPath) which engine format tables don't hit, I've made fullIndex resettable via refresh() (using @volatile var + double-checked locking) to avoid potential issues in the future. This ensures refresh() behaves consistently with InMemoryFileIndex.
Added a config spark.paimon.format-table.engine.lazy-partition-pruning (default true). Set to false to fall back to eager listing, which may be better for small tables queried repeatedly without partition filters — eager listing caches all files at construction and avoids per-query directory traversal overhead.
c97bb41 to
495f0c7
Compare
Replace the eager InMemoryFileIndex (which recursively lists all files at construction time) with LazyPartitionPruningFileIndex that defers file listing until listFiles() is called and prunes partition directories level-by-level using partition filters. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
495f0c7 to
92faa84
Compare
Change lazy val fullIndex to @volatile var with double-checked locking. refresh() now resets fullIndex so that subsequent calls see fresh data, consistent with InMemoryFileIndex.refresh() behavior. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When fullIndex is already created (e.g. by sizeInBytes during join planning), filtered listFiles reuses it for in-memory pruning instead of re-traversing the filesystem via discoverPartitions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| override def partitionSpec(): PartitionSpec = | ||
| PartitionSpec(_partitionSchema, Seq.empty) | ||
|
|
||
| override def sizeInBytes: Long = fullIndex.sizeInBytes |
There was a problem hiding this comment.
Spark also asks the scan for statistics while planning some queries. For file scans, estimateStatistics() calls fileIndex.sizeInBytes, so this initializes fullIndex by recursively listing the whole table before listFiles can apply the partition filters. I reproduced this on the latest head with a 20x30 partitioned format table: SELECT * FROM t JOIN d ON t.p1 = d.p1 WHERE t.p1 = 1 discovers 600 files instead of the expected 30 because planning touches sizeInBytes, and then _fullIndex != null makes the filtered read use the eager index too. Could we avoid constructing fullIndex for lazy-path stats, or keep filtered listFiles lazy even after stats are requested?
There was a problem hiding this comment.
The 600 files discovered comes from sizeInBytes triggered by DataSourceV2ScanRelation.computeStats() during planning, not from listFiles. When _fullIndex != null, all files have already been listed and cached in memory — filtered listFiles simply reuses fullIndex for in-memory prunePartitions with zero additional FS calls and zero additional FILES_DISCOVERED. Without the _fullIndex != null check, discoverPartitions would create a prunedIndex whose paths miss FileStatusCache (keyed by root path, not sub-paths), adding more to the metric (verified: 16 vs 15 in a 5×3 table).
To answer your two questions: (1) In Spark's query lifecycle, sizeInBytes is called during planning before listFiles during execution. At that point no listFiles results are available yet, so fullIndex must list all files. If the order were reversed, we could compute sizeInBytes from listFiles results without full listing. This requires optimizing Spark's FileScan to defer or reorder stats computation, which is beyond this PR. (2) Keeping filtered listFiles lazy after stats are requested would actually be worse — it adds redundant FS calls and increases FILES_DISCOVERED due to FileStatusCache key mismatch.
There was a problem hiding this comment.
Thanks for checking. I agree that once fullIndex has already been materialized, reusing it in listFiles is better than doing another filesystem discovery. My remaining concern is the earlier materialization itself: for filtered joins, planning still performs the full-table listing before execution, so the lazy path does not help that common query shape. Also, Spark’s FileScan.estimateStatistics() uses the index-wide sizeInBytes and does not apply the partition filters, so the full listing is not making the filtered scan stats partition-aware either. Could the lazy index return a cheap unknown/default estimate while _fullIndex is still null, and only delegate to fullIndex.sizeInBytes after an unfiltered scan or inputFiles has already materialized it? That would preserve lazy pruning for plans that touch stats without adding the redundant execution listing you mentioned.
Purpose
Replace the eager
InMemoryFileIndex(which recursively lists all files at construction time) withLazyPartitionPruningFileIndexthat defers file listing untillistFiles()is called and prunes partition directories level-by-level using partition filters.For a table with 20×30=600 partitions, querying a single partition (
p1=1 AND p2=1) now discovers 1 file instead of 600. Range queries (p1>15) and non-leading column filters (p2=1) also benefit from per-level pruning.Controlled by
spark.paimon.format-table.engine.lazy-partition-pruning(defaulttrue). Set tofalseto fall back to eager listing.Tests