From fb5aff78a501b6ac0048403c5188265480ab614b Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Mon, 22 Jun 2026 16:44:50 +0200 Subject: [PATCH 1/2] GH-3628: prevent NPE & unclosed releaser --- .../hadoop/ColumnChunkPageReadStore.java | 11 +- .../hadoop/TestColumnChunkPageReadStore.java | 149 ++++++++++++++++++ 2 files changed, 157 insertions(+), 3 deletions(-) create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index c7fc22b29f..fc6d47f885 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -410,9 +410,14 @@ void setReleaser(ByteBufferReleaser releaser) { @Override public void close() { - for (ColumnChunkPageReader reader : readers.values()) { - reader.releaseBuffers(); + try { + for (ColumnChunkPageReader reader : readers.values()) { + reader.releaseBuffers(); + } + } finally { + if (releaser != null) { + releaser.close(); + } } - releaser.close(); } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java new file mode 100644 index 0000000000..00d205e136 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import static org.apache.parquet.column.Encoding.PLAIN; +import static org.apache.parquet.column.Encoding.RLE; +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.Collections; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.ByteBufferReleaser; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; +import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type.Repetition; +import org.junit.Test; + +public class TestColumnChunkPageReadStore { + + private static final ColumnDescriptor COLUMN = new ColumnDescriptor( + new String[] {"x"}, new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "x"), 0, 0); + + private static final BytesInputDecompressor NOOP_DECOMPRESSOR = new 
BytesInputDecompressor() { + @Override + public BytesInput decompress(BytesInput bytes, int decompressedSize) { + return bytes; + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) {} + + @Override + public void release() {} + }; + + @Test + public void closeWithoutSetReleaserDoesNotThrow() { + try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + ParquetReadOptions options = + ParquetReadOptions.builder().withAllocator(allocator).build(); + + ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(0L); + store.addColumn(COLUMN, newReaderWithoutPages(options)); + + // setReleaser() is intentionally NOT called here. + store.close(); + } + } + + @Test + public void closeReleasesReleaserEvenWhenReaderThrows() throws Exception { + RuntimeException releaseFailure = new RuntimeException("release boom"); + + ByteBufferAllocator throwingAllocator = new ByteBufferAllocator() { + @Override + public ByteBuffer allocate(int size) { + return ByteBuffer.allocateDirect(size); + } + + @Override + public void release(ByteBuffer b) { + throw releaseFailure; + } + + @Override + public boolean isDirect() { + return true; + } + }; + + try (TrackingByteBufferAllocator storeAllocator = + TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { + ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(1L); + store.addColumn(COLUMN, newReaderWithQueuedBuffer(throwingAllocator)); + + // The store-level releaser holds a tracked buffer that must be released by close()'s finally block. 
+ ByteBufferReleaser storeReleaser = new ByteBufferReleaser(storeAllocator); + storeReleaser.releaseLater(storeAllocator.allocate(8)); + store.setReleaser(storeReleaser); + + try { + store.close(); + throw new AssertionError("Expected close() to propagate the reader failure"); + } catch (RuntimeException e) { + assertEquals(releaseFailure, e); + } + } + } + + private static ColumnChunkPageReader newReaderWithoutPages(ParquetReadOptions options) { + return new ColumnChunkPageReader( + NOOP_DECOMPRESSOR, Collections.emptyList(), null, null, 0L, null, null, 0, 0, options); + } + + private static ColumnChunkPageReader newReaderWithQueuedBuffer(ByteBufferAllocator allocator) { + ParquetReadOptions options = ParquetReadOptions.builder() + .withAllocator(allocator) + .useOffHeapDecryptBuffer(true) + .build(); + + ByteBuffer pageBytes = ByteBuffer.allocateDirect(4); + pageBytes.putInt(0, 42); + DataPageV1 page = new DataPageV1(BytesInput.from(pageBytes), 1, 4, null, RLE, RLE, PLAIN); + + ColumnChunkPageReader reader = new ColumnChunkPageReader( + NOOP_DECOMPRESSOR, + Collections.singletonList(page), + null, + null, + 1L, + null, + null, + 0, + 0, + options); + + // Reading the page through the off-heap path queues a buffer into the reader's internal releaser, so + // releaseBuffers() will later invoke the throwing allocator's release(). 
+ if (reader.readPage() == null) { + throw new IllegalStateException("Expected a page to be read"); + } + return reader; + } +} From 90ecae43d97fedf9e2ad8528facd1707304f4ceb Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Wed, 24 Jun 2026 09:37:23 +0200 Subject: [PATCH 2/2] address review feedback --- .../hadoop/ColumnChunkPageReadStore.java | 17 +++-- .../hadoop/TestColumnChunkPageReadStore.java | 71 ++++++++++++++----- 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index fc6d47f885..5ddd6e443d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -46,6 +47,7 @@ import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.filter2.columnindex.RowRanges; import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.util.AutoCloseables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -410,14 +412,11 @@ void setReleaser(ByteBufferReleaser releaser) { @Override public void close() { - try { - for (ColumnChunkPageReader reader : readers.values()) { - reader.releaseBuffers(); - } - } finally { - if (releaser != null) { - releaser.close(); - } - } + // Wrap each reader + the releaser as an AutoCloseable so AutoCloseables.uncheckedClose() + // releases every resource even if one fails, and aggregates failures via suppressed exceptions + List<AutoCloseable> toClose = new ArrayList<>(readers.size() + 1); + readers.values().forEach(reader -> toClose.add(reader::releaseBuffers)); + 
toClose.add(releaser); + AutoCloseables.uncheckedClose(toClose); } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java index 00d205e136..42d753b292 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageReadStore.java @@ -21,6 +21,7 @@ import static org.apache.parquet.column.Encoding.PLAIN; import static org.apache.parquet.column.Encoding.RLE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import java.nio.ByteBuffer; import java.util.Collections; @@ -75,30 +76,15 @@ public void closeWithoutSetReleaserDoesNotThrow() { @Test public void closeReleasesReleaserEvenWhenReaderThrows() throws Exception { RuntimeException releaseFailure = new RuntimeException("release boom"); - - ByteBufferAllocator throwingAllocator = new ByteBufferAllocator() { - @Override - public ByteBuffer allocate(int size) { - return ByteBuffer.allocateDirect(size); - } - - @Override - public void release(ByteBuffer b) { - throw releaseFailure; - } - - @Override - public boolean isDirect() { - return true; - } - }; + ByteBufferAllocator throwingAllocator = throwingAllocator(releaseFailure); try (TrackingByteBufferAllocator storeAllocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) { ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(1L); store.addColumn(COLUMN, newReaderWithQueuedBuffer(throwingAllocator)); - // The store-level releaser holds a tracked buffer that must be released by close()'s finally block. + // The store-level releaser holds a tracked buffer that must still be released even though the reader + // fails first. 
ByteBufferReleaser storeReleaser = new ByteBufferReleaser(storeAllocator); storeReleaser.releaseLater(storeAllocator.allocate(8)); store.setReleaser(storeReleaser); @@ -107,11 +93,58 @@ public boolean isDirect() { store.close(); throw new AssertionError("Expected close() to propagate the reader failure"); } catch (RuntimeException e) { - assertEquals(releaseFailure, e); + assertSame(releaseFailure, e.getCause()); } } } + @Test + public void closeReportsBothReaderAndReleaserFailures() { + RuntimeException readerFailure = new RuntimeException("reader boom"); + RuntimeException releaserFailure = new RuntimeException("releaser boom"); + + ColumnChunkPageReadStore store = new ColumnChunkPageReadStore(1L); + store.addColumn(COLUMN, newReaderWithQueuedBuffer(throwingAllocator(readerFailure))); + + // The store-level releaser also fails to release its queued buffer. + ByteBufferAllocator throwingReleaserAllocator = throwingAllocator(releaserFailure); + ByteBufferReleaser storeReleaser = new ByteBufferReleaser(throwingReleaserAllocator); + storeReleaser.releaseLater(throwingReleaserAllocator.allocate(8)); + store.setReleaser(storeReleaser); + + try { + store.close(); + throw new AssertionError("Expected close() to propagate the failures"); + } catch (RuntimeException e) { + // Readers are released before the releaser, so the reader failure is the primary (root) cause and the + // releaser failure is attached to it as a suppressed exception, ensuring both are reported. 
+ Throwable root = e.getCause(); + assertSame(readerFailure, root); + Throwable[] suppressed = root.getSuppressed(); + assertEquals(1, suppressed.length); + assertSame(releaserFailure, suppressed[0]); + } + } + + private static ByteBufferAllocator throwingAllocator(RuntimeException releaseFailure) { + return new ByteBufferAllocator() { + @Override + public ByteBuffer allocate(int size) { + return ByteBuffer.allocateDirect(size); + } + + @Override + public void release(ByteBuffer b) { + throw releaseFailure; + } + + @Override + public boolean isDirect() { + return true; + } + }; + } + private static ColumnChunkPageReader newReaderWithoutPages(ParquetReadOptions options) { return new ColumnChunkPageReader( NOOP_DECOMPRESSOR, Collections.emptyList(), null, null, 0L, null, null, 0, 0, options);