diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 81bf1fe0e0d0..7b1afa254dad 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -21,6 +21,7 @@ import org.apache.paimon.annotation.Documentation; import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation; import org.apache.paimon.annotation.Documentation.Immutable; +import org.apache.paimon.annotation.Experimental; import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.fs.Path; @@ -299,6 +300,40 @@ public InlineElement getDescription() { .withDescription( "Specify the message format of data files, currently orc, parquet and avro are supported."); + @Experimental + @ExcludeFromDocumentation("Experimental format-provider SPI") + public static final ConfigOption FILE_FORMAT_PROVIDER = + key("file.format.provider") + .stringType() + .noDefaultValue() + .withDescription( + "Selects an experimental file format provider discovered from the classpath."); + + @Experimental + @ExcludeFromDocumentation("Experimental format-provider SPI") + public static final ConfigOption FILE_FORMAT_READ_PROVIDER = + key("file.format.read-provider") + .stringType() + .noDefaultValue() + .withDescription("Selects an experimental file format provider for readers."); + + @Experimental + @ExcludeFromDocumentation("Experimental format-provider SPI") + public static final ConfigOption FILE_FORMAT_WRITE_PROVIDER = + key("file.format.write-provider") + .stringType() + .noDefaultValue() + .withDescription("Selects an experimental file format provider for writers."); + + @Experimental + @ExcludeFromDocumentation("Experimental format-provider SPI") + public static final ConfigOption FILE_FORMAT_VALIDATION_PROVIDER = + key("file.format.validation-provider") + .stringType() + .noDefaultValue() + .withDescription( + "Selects an experimental file format provider for schema validation."); + public static final ConfigOption> FILE_COMPRESSION_PER_LEVEL = key("file.compression.per.level") .mapType() @@ -2756,7 +2791,7 @@ public Map statsModePerLevel() { } public static String normalizeFileFormat(String fileFormat) { - return StringUtils.isEmpty(fileFormat) ? fileFormat : fileFormat.toLowerCase(); + return StringUtils.isEmpty(fileFormat) ? fileFormat : fileFormat.toLowerCase(Locale.ROOT); } public String dataFilePrefix() { diff --git a/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java b/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java index 083b775461aa..252a770ab731 100644 --- a/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java +++ b/paimon-common/src/main/java/org/apache/paimon/factories/FormatFactoryUtil.java @@ -19,11 +19,13 @@ package org.apache.paimon.factories; import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FileFormatProvider; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; import java.util.List; +import java.util.Locale; import java.util.stream.Collectors; import static org.apache.paimon.factories.FactoryUtil.discoverFactories; @@ -34,6 +36,9 @@ public class FormatFactoryUtil { private static final Cache> FACTORIES = Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build(); + private static final Cache> PROVIDERS = + Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build(); + /** Discovers a file format factory. */ @SuppressWarnings("unchecked") public static T discoverFactory( @@ -64,4 +69,53 @@ private static List getFactories(ClassLoader classLoader) { return FACTORIES.get( classLoader, s -> discoverFactories(classLoader, FileFormatFactory.class)); } + + /** Discovers file format providers. */ + public static List discoverProviders(ClassLoader classLoader) { + return PROVIDERS.get( + classLoader, s -> discoverFactories(classLoader, FileFormatProvider.class)); + } + + /** Discovers a file format provider. */ + public static FileFormatProvider discoverProvider(ClassLoader classLoader, String identifier) { + final List foundProviders = discoverProviders(classLoader); + final String normalizedIdentifier = identifier.trim().toLowerCase(Locale.ROOT); + + final List matchingProviders = + foundProviders.stream() + .filter( + f -> + f.identifier() + .trim() + .toLowerCase(Locale.ROOT) + .equals(normalizedIdentifier)) + .collect(Collectors.toList()); + + if (matchingProviders.isEmpty()) { + throw new FactoryException( + String.format( + "Could not find any provider for identifier '%s' that implements FileFormatProvider in the classpath.\n\n" + + "Available provider identifiers are:\n\n" + + "%s", + identifier, + foundProviders.stream() + .map(FileFormatProvider::identifier) + .collect(Collectors.joining("\n")))); + } + + if (matchingProviders.size() > 1) { + throw new FactoryException( + String.format( + "Multiple providers for identifier '%s' that implement FileFormatProvider found in the classpath.\n\n" + + "Ambiguous provider classes are:\n\n" + + "%s", + identifier, + matchingProviders.stream() + .map(p -> p.getClass().getName()) + .sorted() + .collect(Collectors.joining("\n")))); + } + + return matchingProviders.get(0); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index 430fc1404cd2..7c087228d5df 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -74,6 +75,23 @@ public Optional createStatsExtractor( } public static FileFormat fromIdentifier(String identifier, Options options) { + return fromIdentifier(identifier, options, FileFormatProvider.FORMAT_PROVIDER); + } + + public static FileFormat readerFromIdentifier(String identifier, Options options) { + return fromIdentifier(identifier, options, FileFormatProvider.READ_FORMAT_PROVIDER); + } + + public static FileFormat writerFromIdentifier(String identifier, Options options) { + return fromIdentifier(identifier, options, FileFormatProvider.WRITE_FORMAT_PROVIDER); + } + + public static FileFormat validationFromIdentifier(String identifier, Options options) { + return fromIdentifier(identifier, options, FileFormatProvider.VALIDATION_FORMAT_PROVIDER); + } + + private static FileFormat fromIdentifier( + String identifier, Options options, String providerOptionKey) { return fromIdentifier( normalizeFileFormat(identifier), new FormatContext( @@ -82,21 +100,88 @@ public static FileFormat fromIdentifier(String identifier, Options options) { options.get(CoreOptions.WRITE_BATCH_SIZE), options.get(CoreOptions.WRITE_BATCH_MEMORY), options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL), - options.get(CoreOptions.FILE_BLOCK_SIZE))); + options.get(CoreOptions.FILE_BLOCK_SIZE)), + providerOptionKey); } /** Create a {@link FileFormat} from format identifier and format options. */ public static FileFormat fromIdentifier(String identifier, FormatContext context) { + return fromIdentifier(identifier, context, FileFormatProvider.FORMAT_PROVIDER); + } + + public static FileFormat readerFromIdentifier(String identifier, FormatContext context) { + return fromIdentifier(identifier, context, FileFormatProvider.READ_FORMAT_PROVIDER); + } + + public static FileFormat writerFromIdentifier(String identifier, FormatContext context) { + return fromIdentifier(identifier, context, FileFormatProvider.WRITE_FORMAT_PROVIDER); + } + + public static FileFormat validationFromIdentifier(String identifier, FormatContext context) { + return fromIdentifier(identifier, context, FileFormatProvider.VALIDATION_FORMAT_PROVIDER); + } + + private static FileFormat fromIdentifier( + String identifier, FormatContext context, String providerOptionKey) { + String normalizedIdentifier = normalizeFileFormat(identifier); + ClassLoader classLoader = providerClassLoader(); + Optional operationFormat = + createFromProvider(classLoader, context, providerOptionKey, normalizedIdentifier); + if (operationFormat.isPresent()) { + return operationFormat.get(); + } + + if (!FileFormatProvider.FORMAT_PROVIDER.equals(providerOptionKey)) { + Optional genericFormat = + createFromProvider( + classLoader, + context, + FileFormatProvider.FORMAT_PROVIDER, + normalizedIdentifier); + if (genericFormat.isPresent()) { + return genericFormat.get(); + } + } + return FormatFactoryUtil.discoverFactory( - FileFormat.class.getClassLoader(), identifier.toLowerCase()) + FileFormat.class.getClassLoader(), normalizedIdentifier) .create(context); } + private static Optional createFromProvider( + ClassLoader classLoader, + FormatContext context, + String providerOptionKey, + String normalizedIdentifier) { + String providerIdentifier = context.options().getString(providerOptionKey, null); + if (providerIdentifier == null || providerIdentifier.trim().isEmpty()) { + return Optional.empty(); + } + return FormatFactoryUtil.discoverProvider( + classLoader, providerIdentifier.trim().toLowerCase(Locale.ROOT)) + .create(normalizedIdentifier, context); + } + + private static ClassLoader providerClassLoader() { + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + if (contextClassLoader != null) { + try { + if (contextClassLoader.loadClass(FileFormatProvider.class.getName()) + == FileFormatProvider.class) { + return contextClassLoader; + } + } catch (ClassNotFoundException ignored) { + // Fall back to the class loader that loaded Paimon's format API. + } + } + return FileFormat.class.getClassLoader(); + } + protected Options getIdentifierPrefixOptions(Options options) { Map result = new HashMap<>(); - String prefix = formatIdentifier.toLowerCase() + "."; + String prefix = formatIdentifier.toLowerCase(Locale.ROOT) + "."; for (String key : options.keySet()) { - if (key.toLowerCase().startsWith(prefix)) { + if (key.toLowerCase(Locale.ROOT).startsWith(prefix)) { result.put(prefix + key.substring(prefix.length()), options.get(key)); } } @@ -107,13 +192,37 @@ public static FileFormat fileFormat(CoreOptions options) { return FileFormat.fromIdentifier(options.fileFormatString(), options.toConfiguration()); } + public static FileFormat readerFileFormat(CoreOptions options) { + return FileFormat.readerFromIdentifier( + options.fileFormatString(), options.toConfiguration()); + } + + public static FileFormat writerFileFormat(CoreOptions options) { + return FileFormat.writerFromIdentifier( + options.fileFormatString(), options.toConfiguration()); + } + + public static FileFormat validationFileFormat(CoreOptions options) { + return FileFormat.validationFromIdentifier( + options.fileFormatString(), options.toConfiguration()); + } + @Nullable public static FileFormat vectorFileFormat(CoreOptions options) { String vectorFileFormat = options.vectorFileFormatString(); if (vectorFileFormat == null) { return null; } - return FileFormat.fromIdentifier(vectorFileFormat, options.toConfiguration()); + return FileFormat.writerFromIdentifier(vectorFileFormat, options.toConfiguration()); + } + + @Nullable + public static FileFormat validationVectorFileFormat(CoreOptions options) { + String vectorFileFormat = options.vectorFileFormatString(); + if (vectorFileFormat == null) { + return null; + } + return FileFormat.validationFromIdentifier(vectorFileFormat, options.toConfiguration()); } public static FileFormat manifestFormat(CoreOptions options) { diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatProvider.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatProvider.java new file mode 100644 index 000000000000..46abf1c7d834 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatProvider.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.annotation.Experimental; +import org.apache.paimon.format.FileFormatFactory.FormatContext; + +import java.util.Optional; + +/** + * Provider for engine-specific file format implementations. + * + *

Engines can use this SPI to provide readers, writers, or validation logic without exposing the + * implementation dependencies used by Paimon's built-in file formats. For example, an engine may + * provide ORC or Parquet implementations backed by its own filesystem and format libraries. + * + *

The operation-specific options take precedence for their operation. If the operation-specific + * provider is not configured or returns {@link Optional#empty()}, Paimon falls back to {@link + * #FORMAT_PROVIDER}. If no configured provider handles the format, Paimon uses the built-in {@link + * FileFormatFactory} for the requested format. + */ +@Experimental +public interface FileFormatProvider { + + /** Option key used to select a provider discovered from the classpath. */ + String FORMAT_PROVIDER = CoreOptions.FILE_FORMAT_PROVIDER.key(); + + /** Option key used to select a provider for file-format readers. */ + String READ_FORMAT_PROVIDER = CoreOptions.FILE_FORMAT_READ_PROVIDER.key(); + + /** Option key used to select a provider for file-format writers. */ + String WRITE_FORMAT_PROVIDER = CoreOptions.FILE_FORMAT_WRITE_PROVIDER.key(); + + /** Option key used to select a provider for file-format validation. */ + String VALIDATION_FORMAT_PROVIDER = CoreOptions.FILE_FORMAT_VALIDATION_PROVIDER.key(); + + /** Identifier used to select this provider. */ + String identifier(); + + /** + * Creates a file format for the identifier. + * + *

The identifier is the normalized file format identifier, such as {@code parquet} or {@code + * orc}. Return {@link Optional#empty()} to let Paimon use the default service-loaded format + * factory for this format. + */ + Optional create(String identifier, FormatContext context); +} diff --git a/paimon-common/src/test/java/org/apache/paimon/format/FileFormatProviderTest.java b/paimon-common/src/test/java/org/apache/paimon/format/FileFormatProviderTest.java new file mode 100644 index 000000000000..2f000b513341 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/format/FileFormatProviderTest.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.io.File; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link FileFormatProvider}. */ +public class FileFormatProviderTest { + + private static final String TEST_PROVIDER = "test-provider"; + private static final String WRITE_ONLY_PROVIDER = "write-only-provider"; + private static final String DUPLICATE_PROVIDER = "duplicate-provider"; + private static final String PROVIDER_ONLY_FORMAT = "provider-only"; + private static final String WRITE_ONLY_FORMAT = "write-only"; + private static final String DEFAULT_FORMAT = "default-format"; + + @Test + public void testProviderIsExplicitlySelectedWithoutHadoopClasses() throws Exception { + try (URLClassLoader classLoader = + new NoHadoopClassLoader( + classpathUrls(), FileFormatProviderTest.class.getClassLoader())) { + Class fileFormatClass = classLoader.loadClass(FileFormat.class.getName()); + Class optionsClass = classLoader.loadClass(Options.class.getName()); + Method fromIdentifier = + fileFormatClass.getMethod("fromIdentifier", String.class, optionsClass); + + Object options = optionsClass.newInstance(); + optionsClass + .getMethod("set", String.class, String.class) + .invoke(options, FileFormatProvider.FORMAT_PROVIDER, TEST_PROVIDER); + Object fileFormat = fromIdentifier.invoke(null, PROVIDER_ONLY_FORMAT, options); + + assertThat(fileFormat.getClass().getName()) + .isEqualTo(ProviderFileFormat.class.getName()); + } + } + + @Test + public void testProviderIsNotUsedWithoutExplicitSelection() throws Exception { + try (URLClassLoader classLoader = + new NoHadoopClassLoader( + classpathUrls(), FileFormatProviderTest.class.getClassLoader())) { + Class fileFormatClass = classLoader.loadClass(FileFormat.class.getName()); + Class optionsClass = classLoader.loadClass(Options.class.getName()); + Method fromIdentifier = + fileFormatClass.getMethod("fromIdentifier", String.class, optionsClass); + + Object options = optionsClass.newInstance(); + + try { + fromIdentifier.invoke(null, PROVIDER_ONLY_FORMAT, options); + } catch (ReflectiveOperationException e) { + assertThat(e.getCause()).isNotNull(); + assertThat(e.getCause()).hasMessageContaining("Could not find any factory"); + return; + } + } + + throw new AssertionError("Provider should not be used without explicit selection."); + } + + @Test + public void testProviderFallsBackToDefaultFactoryWhenFormatIsNotHandled() { + Options options = new Options(); + options.setString(FileFormatProvider.FORMAT_PROVIDER, TEST_PROVIDER); + + FileFormat fileFormat = FileFormat.fromIdentifier(DEFAULT_FORMAT, options); + + assertThat(fileFormat).isInstanceOf(FactoryFileFormat.class); + } + + @Test + public void testOperationSpecificProvidersDoNotAffectOtherOperations() { + Options options = new Options(); + options.setString(FileFormatProvider.WRITE_FORMAT_PROVIDER, TEST_PROVIDER); + + FileFormat readerFormat = FileFormat.readerFromIdentifier(DEFAULT_FORMAT, options); + FileFormat writerFormat = FileFormat.writerFromIdentifier(PROVIDER_ONLY_FORMAT, options); + + assertThat(readerFormat).isInstanceOf(FactoryFileFormat.class); + assertThat(writerFormat).isInstanceOf(ProviderFileFormat.class); + + options = new Options(); + options.setString(FileFormatProvider.READ_FORMAT_PROVIDER, TEST_PROVIDER); + + readerFormat = FileFormat.readerFromIdentifier(PROVIDER_ONLY_FORMAT, options); + writerFormat = FileFormat.writerFromIdentifier(DEFAULT_FORMAT, options); + + assertThat(readerFormat).isInstanceOf(ProviderFileFormat.class); + assertThat(writerFormat).isInstanceOf(FactoryFileFormat.class); + } + + @Test + public void testGenericProviderAppliesToOperationSpecificLookups() { + Options options = new Options(); + options.setString(FileFormatProvider.FORMAT_PROVIDER, TEST_PROVIDER); + + FileFormat readerFormat = FileFormat.readerFromIdentifier(PROVIDER_ONLY_FORMAT, options); + FileFormat writerFormat = FileFormat.writerFromIdentifier(PROVIDER_ONLY_FORMAT, options); + FileFormat validationFormat = + FileFormat.validationFromIdentifier(PROVIDER_ONLY_FORMAT, options); + + assertThat(readerFormat).isInstanceOf(ProviderFileFormat.class); + assertThat(writerFormat).isInstanceOf(ProviderFileFormat.class); + assertThat(validationFormat).isInstanceOf(ProviderFileFormat.class); + } + + @Test + public void testOperationSpecificProviderTakesPrecedenceOverGenericProvider() { + Options options = new Options(); + options.setString(FileFormatProvider.FORMAT_PROVIDER, TEST_PROVIDER); + options.setString(FileFormatProvider.WRITE_FORMAT_PROVIDER, WRITE_ONLY_PROVIDER); + + FileFormat readerFormat = FileFormat.readerFromIdentifier(PROVIDER_ONLY_FORMAT, options); + FileFormat writerFormat = FileFormat.writerFromIdentifier(WRITE_ONLY_FORMAT, options); + + assertThat(readerFormat).isInstanceOf(ProviderFileFormat.class); + assertThat(writerFormat).isInstanceOf(WriteOnlyProviderFileFormat.class); + } + + @Test + public void testOperationSpecificProviderFallsBackToGenericProviderWhenFormatIsNotHandled() { + Options options = new Options(); + options.setString(FileFormatProvider.FORMAT_PROVIDER, TEST_PROVIDER); + options.setString(FileFormatProvider.WRITE_FORMAT_PROVIDER, WRITE_ONLY_PROVIDER); + + FileFormat writerFormat = FileFormat.writerFromIdentifier(PROVIDER_ONLY_FORMAT, options); + + assertThat(writerFormat).isInstanceOf(ProviderFileFormat.class); + } + + @Test + public void testOperationSpecificProviderCanBeSelectedFromFormatContext() { + Options options = new Options(); + options.setString(FileFormatProvider.READ_FORMAT_PROVIDER, TEST_PROVIDER); + FormatContext context = new FormatContext(options, 1024, 1024); + + FileFormat readerFormat = + FileFormat.readerFromIdentifier(PROVIDER_ONLY_FORMAT.toUpperCase(), context); + FileFormat genericFormat = FileFormat.fromIdentifier(DEFAULT_FORMAT, context); + + assertThat(readerFormat).isInstanceOf(ProviderFileFormat.class); + assertThat(readerFormat.getFormatIdentifier()).isEqualTo(PROVIDER_ONLY_FORMAT); + assertThat(genericFormat).isInstanceOf(FactoryFileFormat.class); + } + + @Test + public void testProviderIdentifierSelectionIsCaseInsensitive() { + Options options = new Options(); + options.setString(FileFormatProvider.FORMAT_PROVIDER, " MIXED-CASE-PROVIDER "); + + FileFormat fileFormat = FileFormat.fromIdentifier(PROVIDER_ONLY_FORMAT, options); + + assertThat(fileFormat).isInstanceOf(ProviderFileFormat.class); + } + + @Test + public void testValidationProviderCanBeSelectedSeparately() { + Options options = new Options(); + options.setString(FileFormatProvider.VALIDATION_FORMAT_PROVIDER, TEST_PROVIDER); + + FileFormat validationFormat = + FileFormat.validationFromIdentifier(PROVIDER_ONLY_FORMAT, options); + FileFormat readerFormat = FileFormat.readerFromIdentifier(DEFAULT_FORMAT, options); + + assertThat(validationFormat).isInstanceOf(ProviderFileFormat.class); + assertThat(readerFormat).isInstanceOf(FactoryFileFormat.class); + } + + @Test + public void testDuplicateProviderIdentifiersFailClearly() { + Options options = new Options(); + options.setString(FileFormatProvider.FORMAT_PROVIDER, DUPLICATE_PROVIDER); + + assertThatThrownBy(() -> FileFormat.fromIdentifier(PROVIDER_ONLY_FORMAT, options)) + .hasMessageContaining("Multiple providers for identifier") + .hasMessageContaining(DuplicateFileFormatProvider.class.getName()) + .hasMessageContaining(OtherDuplicateFileFormatProvider.class.getName()) + .satisfies( + e -> + assertThat( + e.getMessage() + .indexOf( + DuplicateFileFormatProvider.class + .getName())) + .isLessThan( + e.getMessage() + .indexOf( + OtherDuplicateFileFormatProvider + .class + .getName()))); + } + + private static URL[] classpathUrls() throws Exception { + String[] entries = System.getProperty("java.class.path").split(File.pathSeparator); + URL[] urls = new URL[entries.length]; + for (int i = 0; i < entries.length; i++) { + urls[i] = new File(entries[i]).toURI().toURL(); + } + return urls; + } + + private static class NoHadoopClassLoader extends URLClassLoader { + + private static final List PARENT_FIRST_PREFIXES = + Collections.singletonList("org.junit."); + + private NoHadoopClassLoader(URL[] urls, ClassLoader parent) { + super(urls, parent); + } + + @Override + public Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + if (name.startsWith("org.apache.hadoop.")) { + throw new ClassNotFoundException(name); + } + + for (String prefix : PARENT_FIRST_PREFIXES) { + if (name.startsWith(prefix)) { + return super.loadClass(name, resolve); + } + } + + synchronized (getClassLoadingLock(name)) { + Class loadedClass = findLoadedClass(name); + if (loadedClass == null) { + try { + loadedClass = findClass(name); + } catch (ClassNotFoundException ignored) { + loadedClass = super.loadClass(name, false); + } + } + if (resolve) { + resolveClass(loadedClass); + } + return loadedClass; + } + } + } + + /** Test provider that handles only {@link #PROVIDER_ONLY_FORMAT}. */ + public static class TestFileFormatProvider implements FileFormatProvider { + + @Override + public String identifier() { + return TEST_PROVIDER; + } + + @Override + public Optional create(String identifier, FormatContext context) { + if (PROVIDER_ONLY_FORMAT.equals(identifier)) { + return Optional.of(new ProviderFileFormat(identifier)); + } + return Optional.empty(); + } + } + + /** Test provider used to verify operation-specific provider precedence. */ + public static class WriteOnlyFileFormatProvider implements FileFormatProvider { + + @Override + public String identifier() { + return WRITE_ONLY_PROVIDER; + } + + @Override + public Optional create(String identifier, FormatContext context) { + if (WRITE_ONLY_FORMAT.equals(identifier)) { + return Optional.of(new WriteOnlyProviderFileFormat(identifier)); + } + return Optional.empty(); + } + } + + /** Test provider with a mixed-case identifier. */ + public static class MixedCaseFileFormatProvider extends TestFileFormatProvider { + + @Override + public String identifier() { + return "Mixed-Case-Provider"; + } + } + + /** Test provider with a duplicate identifier. */ + public static class DuplicateFileFormatProvider implements FileFormatProvider { + + @Override + public String identifier() { + return DUPLICATE_PROVIDER; + } + + @Override + public Optional create(String identifier, FormatContext context) { + return Optional.of(new TestFileFormat(identifier)); + } + } + + /** Second test provider with the same duplicate identifier. */ + public static class OtherDuplicateFileFormatProvider extends DuplicateFileFormatProvider {} + + /** Test factory used when no provider handles the requested format. */ + public static class TestFileFormatFactory implements FileFormatFactory { + + @Override + public String identifier() { + return DEFAULT_FORMAT; + } + + @Override + public FileFormat create(FormatContext formatContext) { + return new FactoryFileFormat(DEFAULT_FORMAT); + } + } + + private static class ProviderFileFormat extends TestFileFormat { + + private ProviderFileFormat(String formatIdentifier) { + super(formatIdentifier); + } + } + + private static class WriteOnlyProviderFileFormat extends TestFileFormat { + + private WriteOnlyProviderFileFormat(String formatIdentifier) { + super(formatIdentifier); + } + } + + private static class FactoryFileFormat extends TestFileFormat { + + private FactoryFileFormat(String formatIdentifier) { + super(formatIdentifier); + } + } + + /** Base test file format implementation. */ + public static class TestFileFormat extends FileFormat { + + private TestFileFormat(String formatIdentifier) { + super(formatIdentifier); + } + + @Override + public FormatReaderFactory createReaderFactory( + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List filters) { + throw new UnsupportedOperationException(); + } + + @Override + public FormatWriterFactory createWriterFactory(RowType type) { + throw new UnsupportedOperationException(); + } + + @Override + public void validateDataFields(RowType rowType) {} + + @Override + public Optional createStatsExtractor( + RowType type, SimpleColStatsCollector.Factory[] statsCollectors) { + return Optional.empty(); + } + } +} diff --git a/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory new file mode 100644 index 000000000000..57dfb970dfd0 --- /dev/null +++ b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.paimon.format.FileFormatProviderTest$TestFileFormatFactory diff --git a/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatProvider b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatProvider new file mode 100644 index 000000000000..9d9f274c3456 --- /dev/null +++ b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatProvider @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.paimon.format.FileFormatProviderTest$TestFileFormatProvider +org.apache.paimon.format.FileFormatProviderTest$WriteOnlyFileFormatProvider +org.apache.paimon.format.FileFormatProviderTest$MixedCaseFileFormatProvider +org.apache.paimon.format.FileFormatProviderTest$OtherDuplicateFileFormatProvider +org.apache.paimon.format.FileFormatProviderTest$DuplicateFileFormatProvider diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 4a8a8b1b91b6..728747f3b168 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -74,6 +74,7 @@ import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; +import static org.apache.paimon.catalog.CatalogUtils.tableRuntimeOptions; import static org.apache.paimon.catalog.CatalogUtils.validateCreateTable; import static org.apache.paimon.catalog.Identifier.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; @@ -86,12 +87,14 @@ public abstract class AbstractCatalog implements Catalog { protected final FileIO fileIO; protected final Map tableDefaultOptions; + protected final Map tableRuntimeOptions; protected final CatalogContext context; protected final @Nullable LocalCacheManager cacheManager; protected AbstractCatalog(FileIO fileIO) { this.fileIO = fileIO; this.tableDefaultOptions = new HashMap<>(); + this.tableRuntimeOptions = new HashMap<>(); this.context = CatalogContext.create(new Options()); this.cacheManager = null; } @@ -100,6 +103,7 @@ protected AbstractCatalog(FileIO fileIO, CatalogContext context) { this.cacheManager = CachingFileIO.createCacheManager(context); this.fileIO = CachingFileIO.wrapWithCachingIfNeeded(fileIO, context, cacheManager); this.tableDefaultOptions = CatalogUtils.tableDefaultOptions(context.options().toMap()); + this.tableRuntimeOptions = tableRuntimeOptions(context.options().toMap()); this.context = context; } @@ -573,7 +577,7 @@ protected long appendNewSchema(FileStoreTable existingTable, Schema newSchema) while (true) { TableSchema latest = sm.latestOrThrow("Cannot replace: schema chain is empty."); TableSchema staged = TableSchema.create(latest.id() + 1, newSchema); - if (sm.commit(staged)) { + if (sm.commit(staged, tableRuntimeOptions)) { return staged.id(); } } @@ -601,6 +605,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException { lockFactory().orElse(null), lockContext().orElse(null), context, + tableRuntimeOptions, false); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 57fa040a2acd..4cdbe2a29871 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -1241,6 +1241,7 @@ TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List tableDefaultOptions(Map option return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX); } + public static Map tableRuntimeOptions(Map options) { + return convertToPropertiesPrefixKey(options, TABLE_RUNTIME_OPTION_PREFIX); + } + public static boolean isSystemDatabase(String database) { return SYSTEM_DATABASE_NAME.equals(database); } @@ -253,6 +258,7 @@ public static Table loadTable( @Nullable CatalogLockFactory lockFactory, @Nullable CatalogLockContext lockContext, @Nullable CatalogContext catalogContext, + Map tableRuntimeOptions, boolean isRestCatalog) throws Catalog.TableNotExistException { if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) { @@ -266,7 +272,8 @@ public static Table loadTable( Function dataFileIO = metadata.isExternal() ? externalFileIO : internalFileIO; if (options.type() == TableType.FORMAT_TABLE) { - return toFormatTable(identifier, schema, dataFileIO, catalogContext); + return toFormatTable( + identifier, schema, dataFileIO, catalogContext, tableRuntimeOptions); } if (options.type() == TableType.OBJECT_TABLE) { @@ -302,7 +309,12 @@ public static Table loadTable( catalog.supportsPartitionModification()); Path path = new Path(schema.options().get(PATH.key())); FileStoreTable table = - FileStoreTableFactory.create(dataFileIO.apply(path), path, schema, catalogEnv); + FileStoreTableFactory.create( + dataFileIO.apply(path), + path, + schema, + Options.fromMap(tableRuntimeOptions), + catalogEnv); if (identifier.isSystemTable()) { return CatalogUtils.createSystemTable(identifier, table); @@ -410,14 +422,17 @@ private static FormatTable toFormatTable( Identifier identifier, TableSchema schema, Function fileIO, - CatalogContext catalogContext) { - Map options = schema.options(); + CatalogContext catalogContext, + Map tableRuntimeOptions) { + Map options = new HashMap<>(schema.options()); + options.putAll(tableRuntimeOptions); + String location = schema.options().get(CoreOptions.PATH.key()); + options.put(CoreOptions.PATH.key(), location); FormatTable.Format format = FormatTable.parseFormat( options.getOrDefault( CoreOptions.FILE_FORMAT.key(), CoreOptions.FILE_FORMAT.defaultValue())); - String location = options.get(CoreOptions.PATH.key()); return FormatTable.builder() .fileIO(fileIO.apply(new Path(location))) .identifier(identifier) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index b0b8b1ac7f30..52b9b84becdc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -130,7 +130,9 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { public void createTableImpl(Identifier identifier, Schema schema) { SchemaManager schemaManager = schemaManager(identifier); try { - runWithLock(identifier, () -> uncheck(() -> schemaManager.createTable(schema))); + runWithLock( + identifier, + () -> uncheck(() -> schemaManager.createTable(schema, tableRuntimeOptions))); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -169,7 +171,8 @@ protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { SchemaManager schemaManager = schemaManager(identifier); try { - runWithLock(identifier, () -> schemaManager.commitChanges(changes)); + runWithLock( + identifier, () -> schemaManager.commitChanges(changes, tableRuntimeOptions)); } catch (TableNotExistException | ColumnAlreadyExistException | ColumnNotExistException diff --git a/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java b/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java index a692e3a5eed9..f34231dc3b33 100644 --- a/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java +++ b/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java @@ -39,7 +39,7 @@ public FileFormat discover(String identifier) { } private FileFormat create(String identifier) { - return FileFormat.fromIdentifier(identifier, options.toConfiguration()); + return FileFormat.readerFromIdentifier(identifier, options.toConfiguration()); } }; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index b1ae451e5974..923d8c17a58b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -416,7 +416,8 @@ private FormatWriterFactory writerFactory(WriteFormatKey key) { private FileFormat fileFormat(String format) { return formatFactory.computeIfAbsent( - format, k -> FileFormat.fromIdentifier(format, options.toConfiguration())); + format, + k -> FileFormat.writerFromIdentifier(format, options.toConfiguration())); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index f765e5f88db5..9979160cc4c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -337,7 +337,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) { // create table file SchemaManager schemaManager = getSchemaManager(identifier); TableSchema tableSchema = - runWithLock(identifier, () -> schemaManager.createTable(schema)); + runWithLock( + identifier, + () -> schemaManager.createTable(schema, tableRuntimeOptions)); // Update schema metadata Path path = getTableLocation(identifier); if (JdbcUtils.insertTable( @@ -412,7 +414,8 @@ protected void alterTableImpl(Identifier identifier, List changes) assertMainBranch(identifier); SchemaManager schemaManager = getSchemaManager(identifier); try { - runWithLock(identifier, () -> schemaManager.commitChanges(changes)); + runWithLock( + identifier, () -> schemaManager.commitChanges(changes, tableRuntimeOptions)); if (syncTableProperties()) { TableSchema updatedSchema = schemaManager.latest().get(); execute( diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index de51a91da24f..71b38816c80f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -157,7 +157,7 @@ public static DataFileMeta constructFileMeta( options.statsMode(), options, rowTypeWithSchemaId.getFieldNames()); SimpleStatsExtractor simpleStatsExtractor = - FileFormat.fromIdentifier(format, options.toConfiguration()) + FileFormat.readerFromIdentifier(format, options.toConfiguration()) .createStatsExtractor(rowTypeWithSchemaId, factories) .orElseThrow( () -> @@ -295,7 +295,7 @@ public static SimpleStatsExtractor createSimpleStatsExtractor(Table table, Strin StatsCollectorFactories.createStatsFactories( options.statsMode(), options, table.rowType().getFieldNames()); - return FileFormat.fromIdentifier(format, options.toConfiguration()) + return FileFormat.readerFromIdentifier(format, options.toConfiguration()) .createStatsExtractor(table.rowType(), factories) .orElseThrow( () -> diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index 94620205a232..8131896a6fcb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -62,7 +62,7 @@ import java.util.function.Function; import java.util.function.Supplier; -import static org.apache.paimon.format.FileFormat.fileFormat; +import static org.apache.paimon.format.FileFormat.writerFileFormat; import static org.apache.paimon.utils.StatsCollectorFactories.createStatsFactories; /** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */ @@ -103,7 +103,7 @@ public BaseAppendFileStoreWrite( this.rowType = rowType; this.writeType = rowType; this.writeCols = null; - this.fileFormat = fileFormat(options); + this.fileFormat = writerFileFormat(options); this.pathFactory = pathFactory; this.blobContext = BlobFileContext.create(rowType, options); this.fileIndexOptions = options.indexColumnsOptions(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 41ecc6d0aa4e..db082c1c8bac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -61,7 +61,7 @@ import java.util.function.Function; import java.util.function.Supplier; -import static org.apache.paimon.format.FileFormat.fileFormat; +import static org.apache.paimon.format.FileFormat.writerFileFormat; import static org.apache.paimon.utils.FileStorePathFactory.createFormatPathFactories; /** {@link FileStoreWrite} for {@link KeyValueFileStore}. */ @@ -128,7 +128,7 @@ public KeyValueFileStoreWrite( schema.id(), keyType, valueType, - fileFormat(options), + writerFileFormat(options), createFormatPathFactories(options, formatPathFactory), options.targetFileSize(true)); this.keyComparatorSupplier = keyComparatorSupplier; diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java index a6ba8febe567..9a47f6a1aa1f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java @@ -62,7 +62,7 @@ import java.util.function.BiFunction; import java.util.function.Function; -import static org.apache.paimon.format.FileFormat.fileFormat; +import static org.apache.paimon.format.FileFormat.writerFileFormat; import static org.apache.paimon.utils.FileStorePathFactory.createFormatPathFactories; /** {@link FileStoreWrite} for {@code bucket = -2} tables. */ @@ -126,7 +126,7 @@ public PostponeBucketFileStoreWrite( : writeId)); this.options = new CoreOptions(newOptions); - FileFormat fileFormat = fileFormat(this.options); + FileFormat fileFormat = writerFileFormat(this.options); this.writerFactoryBuilder = KeyValueFileWriterFactory.builder( fileIO, diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 9d76cfdf4fef..ae9b6cd90c9e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -92,6 +92,7 @@ import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; +import static org.apache.paimon.catalog.CatalogUtils.tableRuntimeOptions; import static org.apache.paimon.catalog.CatalogUtils.validateCreateTable; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; @@ -102,6 +103,7 @@ public class RESTCatalog implements Catalog { private final CatalogContext context; private final boolean dataTokenEnabled; protected final Map tableDefaultOptions; + private final Map tableRuntimeOptions; private final @Nullable LocalCacheManager cacheManager; public RESTCatalog(CatalogContext context) { @@ -118,6 +120,7 @@ public RESTCatalog(CatalogContext context, boolean configRequired) { context.fallbackIO()); this.dataTokenEnabled = api.options().get(RESTTokenFileIO.DATA_TOKEN_ENABLED); this.tableDefaultOptions = CatalogUtils.tableDefaultOptions(this.context.options().toMap()); + this.tableRuntimeOptions = tableRuntimeOptions(this.context.options().toMap()); this.cacheManager = CachingFileIO.createCacheManager(this.context); } @@ -316,6 +319,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException { null, null, context, + tableRuntimeOptions, true); } @@ -504,6 +508,7 @@ private Table toTable(String db, GetTableResponse response) { null, null, context, + tableRuntimeOptions, true); } catch (TableNotExistException e) { throw new RuntimeException(e); @@ -1249,7 +1254,7 @@ private Schema inferSchemaIfExternalPaimonTable(Schema schema) throws Exception Optional latest = schemaManager.latest(); if (latest.isPresent()) { // Note we just validate schema here, will not create a new table - schemaManager.createTable(schema, true); + schemaManager.createTable(schema, true, tableRuntimeOptions); Schema existsSchema = latest.get().toSchema(); // use `owner` and `path` from the user provide schema if (Objects.nonNull(schema.options().get(Catalog.OWNER_PROP))) { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index bfe3b7de84d7..ec527be8f7d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -25,6 +25,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.ColumnDirectiveUtils.ConvertedColumn; import org.apache.paimon.schema.SchemaChange.AddColumn; import org.apache.paimon.schema.SchemaChange.DropColumn; @@ -37,6 +38,7 @@ import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition; import org.apache.paimon.schema.SchemaChange.UpdateColumnType; import org.apache.paimon.schema.SchemaChange.UpdateComment; +import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.SchemaModification; import org.apache.paimon.types.ArrayType; @@ -186,6 +188,17 @@ public TableSchema createTable(Schema schema) throws Exception { } public TableSchema createTable(Schema schema, boolean externalTable) throws Exception { + return createTable(schema, externalTable, Collections.emptyMap()); + } + + public TableSchema createTable(Schema schema, Map dynamicOptions) + throws Exception { + return createTable(schema, false, dynamicOptions); + } + + public TableSchema createTable( + Schema schema, boolean externalTable, Map dynamicOptions) + throws Exception { while (true) { Optional latest = latest(); if (latest.isPresent()) { @@ -203,9 +216,15 @@ public TableSchema createTable(Schema schema, boolean externalTable) throws Exce TableSchema newSchema = TableSchema.create(0, schema); // validate table from creating table - FileStoreTableFactory.create(fileIO, tableRoot, newSchema).store(); - - boolean success = commit(newSchema); + FileStoreTableFactory.create( + fileIO, + tableRoot, + newSchema, + Options.fromMap(dynamicOptions), + CatalogEnvironment.empty()) + .store(); + + boolean success = commit(newSchema, dynamicOptions); if (success) { return newSchema; } @@ -256,6 +275,13 @@ public TableSchema commitChanges(SchemaChange... changes) throws Exception { public TableSchema commitChanges(List changes) throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { + return commitChanges(changes, Collections.emptyMap()); + } + + /** Update {@link SchemaChange}s. */ + public TableSchema commitChanges(List changes, Map dynamicOptions) + throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, + Catalog.ColumnNotExistException { SnapshotManager snapshotManager = new SnapshotManager(fileIO, tableRoot, branch, null, null); LazyField hasSnapshots = @@ -271,9 +297,10 @@ public TableSchema commitChanges(List changes) LazyField lazyIdentifier = new LazyField<>(() -> identifierFromPath(tableRoot.toString(), true, branch)); TableSchema newTableSchema = - generateTableSchema(oldTableSchema, changes, hasSnapshots, lazyIdentifier); + generateTableSchema( + oldTableSchema, changes, hasSnapshots, lazyIdentifier, dynamicOptions); try { - boolean success = commit(newTableSchema); + boolean success = commit(newTableSchema, dynamicOptions); if (success) { return newTableSchema; } @@ -289,6 +316,17 @@ public static TableSchema generateTableSchema( LazyField hasSnapshots, LazyField lazyIdentifier) throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { + return generateTableSchema( + oldTableSchema, changes, hasSnapshots, lazyIdentifier, Collections.emptyMap()); + } + + public static TableSchema generateTableSchema( + TableSchema oldTableSchema, + List changes, + LazyField hasSnapshots, + LazyField lazyIdentifier, + Map dynamicOptions) + throws Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { Map oldOptions = new HashMap<>(oldTableSchema.options()); Map newOptions = new HashMap<>(oldTableSchema.options()); boolean disableNullToNotNull = @@ -618,7 +656,7 @@ protected void updateLastColumn( newSchema.primaryKeys(), newSchema.options(), newSchema.comment()); - SchemaValidation.validateTableSchema(newTableSchema); + SchemaValidation.validateTableSchema(newTableSchema, dynamicOptions); return newTableSchema; } @@ -1151,7 +1189,12 @@ protected void updateLastColumn(int depth, List newFields, String fie @VisibleForTesting public boolean commit(TableSchema newSchema) throws Exception { - SchemaValidation.validateTableSchema(newSchema); + return commit(newSchema, Collections.emptyMap()); + } + + public boolean commit(TableSchema newSchema, Map dynamicOptions) + throws Exception { + SchemaValidation.validateTableSchema(newSchema, dynamicOptions); SchemaValidation.validateFallbackBranch(this, newSchema); Path schemaPath = toSchemaPath(newSchema.id()); return fileIO.tryToWriteAtomic(schemaPath, newSchema.toString()); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 7733b6a080b5..846b46256539 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -69,6 +69,7 @@ import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP; import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG; +import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.CoreOptions.PRIMARY_KEY; import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS; import static org.apache.paimon.CoreOptions.SCAN_MODE; @@ -80,7 +81,7 @@ import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN; import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE; -import static org.apache.paimon.format.FileFormat.vectorFileFormat; +import static org.apache.paimon.format.FileFormat.validationVectorFileFormat; import static org.apache.paimon.schema.TableSchema.PAIMON_07_VERSION; import static org.apache.paimon.table.PrimaryKeyTableUtils.createMergeFunctionFactory; import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX; @@ -120,31 +121,64 @@ public static void validateTableSchema(TableSchema schema) { * validations such as the {@code write-only} requirement for snapshot ordering */ public static void validateTableSchema(TableSchema schema, Set dynamicOptionKeys) { - CoreOptions options = new CoreOptions(schema.options()); + validateTableSchema(schema, Collections.emptyMap(), dynamicOptionKeys); + } - validateOnlyContainPrimitiveType(schema.fields(), schema.primaryKeys(), "primary key"); - validateOnlyContainPrimitiveType(schema.fields(), schema.partitionKeys(), "partition"); - validateOnlyContainPrimitiveType(schema.fields(), options.upsertKey(), "upsert key"); + /** + * Validate the {@link TableSchema} and {@link CoreOptions}. + * + * @param schema the schema to be validated + * @param dynamicOptions options that are overridden dynamically at runtime. These options + * participate in validation but are not required to be persisted in the table schema. + */ + public static void validateTableSchema(TableSchema schema, Map dynamicOptions) { + validateTableSchema(schema, dynamicOptions, dynamicOptions.keySet()); + } - if (!options.upsertKey().isEmpty() && !schema.primaryKeys().isEmpty()) { + private static void validateTableSchema( + TableSchema schema, Map dynamicOptions, Set dynamicOptionKeys) { + Map validationOptions = new HashMap<>(schema.options()); + dynamicOptions.forEach( + (key, value) -> { + if (PATH.key().equals(key)) { + return; + } + if (value == null) { + validationOptions.remove(key); + } else { + validationOptions.put(key, value); + } + }); + TableSchema validationSchema = schema.copy(validationOptions); + CoreOptions options = new CoreOptions(validationOptions); + + validateOnlyContainPrimitiveType( + validationSchema.fields(), validationSchema.primaryKeys(), "primary key"); + validateOnlyContainPrimitiveType( + validationSchema.fields(), validationSchema.partitionKeys(), "partition"); + validateOnlyContainPrimitiveType( + validationSchema.fields(), options.upsertKey(), "upsert key"); + + if (!options.upsertKey().isEmpty() && !validationSchema.primaryKeys().isEmpty()) { throw new RuntimeException( String.format( "Cannot define 'upsert-key' %s with 'primary-key' %s.", - options.upsertKey(), schema.primaryKeys())); + options.upsertKey(), validationSchema.primaryKeys())); } - validateBucket(schema, options); + validateBucket(validationSchema, options); validateStartupMode(options); - validateFieldsPrefix(schema, options); + validateFieldsPrefix(validationSchema, options); - validateSequenceField(schema, options); + validateSequenceField(validationSchema, options); - validateMergeFunction(schema); + validateMergeFunction(validationSchema); ChangelogProducer changelogProducer = options.changelogProducer(); - if (schema.primaryKeys().isEmpty() && changelogProducer != ChangelogProducer.NONE) { + if (validationSchema.primaryKeys().isEmpty() + && changelogProducer != ChangelogProducer.NONE) { throw new UnsupportedOperationException( String.format( "Can not set %s on table without primary keys, please define primary keys.", @@ -195,8 +229,9 @@ public static void validateTableSchema(TableSchema schema, Set dynamicOp + CHANGELOG_NUM_RETAINED_MAX.key()); FileFormat fileFormat = - FileFormat.fromIdentifier(options.formatType(), new Options(schema.options())); - RowType tableRowType = new RowType(schema.fields()); + FileFormat.validationFromIdentifier( + options.formatType(), Options.fromMap(validationOptions)); + RowType tableRowType = new RowType(validationSchema.fields()); validateBlobFields(tableRowType, options); Set blobDescriptorFields = validateBlobDescriptorFields(tableRowType, options); Set blobViewFields = @@ -247,7 +282,8 @@ public static void validateTableSchema(TableSchema schema, Set dynamicOp } // Check column names in schema - schema.fieldNames() + validationSchema + .fieldNames() .forEach( f -> { checkState( @@ -262,7 +298,7 @@ public static void validateTableSchema(TableSchema schema, Set dynamicOp f, KEY_FIELD_PREFIX)); }); - if (schema.primaryKeys().isEmpty() && options.streamingReadOverwrite()) { + if (validationSchema.primaryKeys().isEmpty() && options.streamingReadOverwrite()) { throw new RuntimeException( String.format( "Doesn't support streaming read the changes from overwrite when the primary keys are " @@ -273,7 +309,7 @@ public static void validateTableSchema(TableSchema schema, Set dynamicOp String recordLevelTimeField = options.recordLevelTimeField(); if (recordLevelTimeField != null) { Optional field = - schema.fields().stream() + validationSchema.fields().stream() .filter(dataField -> dataField.name().equals(recordLevelTimeField)) .findFirst(); if (!field.isPresent()) { @@ -306,7 +342,7 @@ public static void validateTableSchema(TableSchema schema, Set dynamicOp .ifPresent( field -> checkArgument( - schema.fieldNames().contains(field), + validationSchema.fieldNames().contains(field), "Rowkind field: '%s' can not be found in table schema.", field)); @@ -319,12 +355,13 @@ public static void validateTableSchema(TableSchema schema, Set dynamicOp } if (options.snapshotSequenceOrdering()) { - validateSnapshotSequenceOrdering(schema, options, dynamicOptionKeys); + validateSnapshotSequenceOrdering(validationSchema, options, dynamicOptionKeys); } // vector field names must point to vector type Set fieldNamesSpecifiedAsVector = options.vectorField(); - schema.fields() + validationSchema + .fields() .forEach( f -> checkState( @@ -335,26 +372,26 @@ public static void validateTableSchema(TableSchema schema, Set dynamicOp + " the type must be vector, but it is %s", f.name(), f.type()))); // vector field names must exist in table schema - schema.fieldNames().forEach(fieldNamesSpecifiedAsVector::remove); + validationSchema.fieldNames().forEach(fieldNamesSpecifiedAsVector::remove); checkArgument( fieldNamesSpecifiedAsVector.isEmpty(), "Some of the columns specified as vector-field are unknown."); - validateMergeFunctionFactory(schema); + validateMergeFunctionFactory(validationSchema); - validateFileIndex(schema); + validateFileIndex(validationSchema); - validateRowTracking(schema, options); + validateRowTracking(validationSchema, options); - validateIncrementalClustering(schema, options); + validateIncrementalClustering(validationSchema, options); - validateChainTable(schema, options); + validateChainTable(validationSchema, options); - validateChangelogReadSequenceNumber(schema, options); + validateChangelogReadSequenceNumber(validationSchema, options); validatePkClusteringOverride(options); - validateManifestSort(schema, options); + validateManifestSort(validationSchema, options); } public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) { @@ -864,7 +901,7 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) "The BLOB type column can not be part of partition keys."); } - FileFormat vectorFileFormat = vectorFileFormat(options); + FileFormat vectorFileFormat = validationVectorFileFormat(options); if (vectorFileFormat != null) { Set vectorStoreNames = fieldNamesInVectorFile(schema.logicalRowType(), true); checkArgument( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 0249a8e6b397..33669868d4fc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.fs.FileIO; @@ -374,7 +375,7 @@ protected FileStoreTable copyInternal( } // validate schema with new options - SchemaValidation.validateTableSchema(newTableSchema, dynamicOptions.keySet()); + SchemaValidation.validateTableSchema(newTableSchema, dynamicOptions); return copy(newTableSchema); } @@ -785,10 +786,19 @@ public FileStoreTable switchToBranch(String branchName) { fileIO(), location(), branchSchema, - new Options(), + catalogRuntimeOptions(catalogEnvironment), newCatalogEnvironment(targetBranch)); } + static Options catalogRuntimeOptions(CatalogEnvironment catalogEnvironment) { + if (catalogEnvironment.catalogContext() == null) { + return new Options(); + } + return Options.fromMap( + CatalogUtils.tableRuntimeOptions( + catalogEnvironment.catalogContext().options().toMap())); + } + private RollbackHelper rollbackHelper() { return new RollbackHelper(snapshotManager(), changelogManager(), tagManager(), fileIO); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index 5a1a5b5a89c1..5b62fd8b5878 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -172,7 +172,11 @@ protected FileStoreTable switchWrappedToBranch(String branchName) { } return FileStoreTableFactory.createWithoutFallbackBranch( - wrapped.fileIO(), wrapped.location(), branchSchema, new Options(), branchEnv); + wrapped.fileIO(), + wrapped.location(), + branchSchema, + AbstractFileStoreTable.catalogRuntimeOptions(branchEnv), + branchEnv); } protected Map rewriteOtherOptions(Map options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index b07465a25828..98d0c264a1d0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -132,7 +132,7 @@ default Optional comment() { /** * Get {@link DataTable} with branch identified by {@code branchName}. Note that this method - * does not keep dynamic options in current table. + * does not keep dynamic options in current table, except catalog runtime options. */ @Override FileStoreTable switchToBranch(String branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java index 23061e0d1437..5c6cacd2447d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java @@ -35,7 +35,7 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.format.FileFormat.fileFormat; +import static org.apache.paimon.format.FileFormat.writerFileFormat; import static org.apache.paimon.utils.PartitionPathUtils.generatePartitionPathUtil; /** File writer for format table. */ @@ -52,7 +52,7 @@ public FormatTableFileWriter( FileIO fileIO, RowType writeRowType, CoreOptions options, RowType partitionType) { this.fileIO = fileIO; this.writeRowType = writeRowType; - this.fileFormat = fileFormat(options); + this.fileFormat = writerFileFormat(options); this.writers = new HashMap<>(); this.options = options; this.pathFactory = diff --git a/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java b/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java index 359b9f25e817..a797ef8ac0d4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/CoreOptionsTest.java @@ -22,6 +22,10 @@ import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.Locale; +import java.util.stream.Collectors; + import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link org.apache.paimon.CoreOptions}. */ @@ -96,6 +100,41 @@ public void testSequenceFieldTrim() { assertThat(options.sequenceField()).containsExactly("f1", "f2", "f3"); } + @Test + public void testNormalizeFileFormatUsesRootLocale() { + Locale originalLocale = Locale.getDefault(); + try { + Locale.setDefault(Locale.forLanguageTag("tr")); + assertThat(CoreOptions.normalizeFileFormat("MOSAIC")) + .isEqualTo(CoreOptions.FILE_FORMAT_MOSAIC); + } finally { + Locale.setDefault(originalLocale); + } + } + + @Test + public void testFileFormatProviderOptionsAreRegistered() { + assertThat(org.apache.paimon.format.FileFormatProvider.FORMAT_PROVIDER) + .isEqualTo(CoreOptions.FILE_FORMAT_PROVIDER.key()); + assertThat(org.apache.paimon.format.FileFormatProvider.READ_FORMAT_PROVIDER) + .isEqualTo(CoreOptions.FILE_FORMAT_READ_PROVIDER.key()); + assertThat(org.apache.paimon.format.FileFormatProvider.WRITE_FORMAT_PROVIDER) + .isEqualTo(CoreOptions.FILE_FORMAT_WRITE_PROVIDER.key()); + assertThat(org.apache.paimon.format.FileFormatProvider.VALIDATION_FORMAT_PROVIDER) + .isEqualTo(CoreOptions.FILE_FORMAT_VALIDATION_PROVIDER.key()); + + assertThat( + CoreOptions.getOptions().stream() + .map(option -> option.key()) + .collect(Collectors.toSet())) + .containsAll( + Arrays.asList( + CoreOptions.FILE_FORMAT_PROVIDER.key(), + CoreOptions.FILE_FORMAT_READ_PROVIDER.key(), + CoreOptions.FILE_FORMAT_WRITE_PROVIDER.key(), + CoreOptions.FILE_FORMAT_VALIDATION_PROVIDER.key())); + } + @Test public void testBlobSplitByFileSizeDefault() { Options conf = new Options(); diff --git a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java index 1981c13a0c19..6604281318a8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/FileFormatTest.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -105,7 +106,7 @@ public void testCreateFileFormat(String identifier) { Options tableOptions = new Options(); tableOptions.set(CoreOptions.FILE_FORMAT, identifier); tableOptions.set(CoreOptions.READ_BATCH_SIZE, 1024); - tableOptions.setString(identifier.toLowerCase() + ".hello", "world"); + tableOptions.setString(identifier.toLowerCase(Locale.ROOT) + ".hello", "world"); FileFormat fileFormat = FileFormat.fromIdentifier(identifier, tableOptions); assertThat(fileFormat instanceof OrcFileFormat).isTrue(); diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java index 04f67c018d1b..6f57ca74cd72 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java @@ -18,21 +18,52 @@ package org.apache.paimon.catalog; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.TableType; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.format.FileFormatProvider; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleStatsExtractor; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FormatTable; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.when; /** Tests for {@link FileSystemCatalog}. */ public class FileSystemCatalogTest extends CatalogTestBase { + private static final AtomicInteger RUNTIME_PROVIDER_VALIDATIONS = new AtomicInteger(); + private static final Queue RUNTIME_PROVIDER_CONTEXT_PATHS = + new ConcurrentLinkedQueue<>(); + @BeforeEach public void setUp() throws Exception { super.setUp(); @@ -65,6 +96,141 @@ public void testCreateTableCaseSensitive() throws Exception { catalog.createTable(identifier, schema, false); } + @Test + public void testCreateTableWithRuntimeCatalogOptions() throws Exception { + RUNTIME_PROVIDER_VALIDATIONS.set(0); + RUNTIME_PROVIDER_CONTEXT_PATHS.clear(); + Options options = new Options(); + options.set( + Catalog.TABLE_RUNTIME_OPTION_PREFIX + FileFormatProvider.VALIDATION_FORMAT_PROVIDER, + RuntimeOptionFileFormatProvider.IDENTIFIER); + String runtimePath = new Path(new Path(warehouse), "runtime-path-ignored").toString(); + options.set(Catalog.TABLE_RUNTIME_OPTION_PREFIX + CoreOptions.PATH.key(), runtimePath); + catalog = + new FileSystemCatalog(fileIO, new Path(warehouse), CatalogContext.create(options)); + + catalog.createDatabase("test_db", false); + Identifier identifier = Identifier.create("test_db", "new_table"); + String schemaPath = new Path(new Path(warehouse), "test_db.db/new_table").toString(); + Schema schema = + Schema.newBuilder() + .column("pk", DataTypes.INT()) + .column("value", DataTypes.STRING()) + .primaryKey("pk") + .option(CoreOptions.BUCKET.key(), "-1") + .option(CoreOptions.FILE_FORMAT.key(), CoreOptions.FILE_FORMAT_AVRO) + .build(); + catalog.createTable(identifier, schema, false); + + assertThat(RUNTIME_PROVIDER_VALIDATIONS.get()).isGreaterThan(0); + assertThat(RUNTIME_PROVIDER_CONTEXT_PATHS).contains(schemaPath).doesNotContain(runtimePath); + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + assertThat(table.options()) + .containsEntry(CoreOptions.FILE_FORMAT.key(), CoreOptions.FILE_FORMAT_AVRO) + .containsEntry(CoreOptions.PATH.key(), schemaPath) + .containsEntry( + FileFormatProvider.VALIDATION_FORMAT_PROVIDER, + RuntimeOptionFileFormatProvider.IDENTIFIER); + assertThat(table.location().toString()).isEqualTo(schemaPath); + assertThat( + new SchemaManager( + fileIO, + new Path(new Path(warehouse), "test_db.db/new_table")) + .latestOrThrow("Table schema should exist") + .options()) + .containsEntry(CoreOptions.FILE_FORMAT.key(), CoreOptions.FILE_FORMAT_AVRO) + .doesNotContainKey(CoreOptions.PATH.key()) + .doesNotContainKey(FileFormatProvider.VALIDATION_FORMAT_PROVIDER); + + RUNTIME_PROVIDER_VALIDATIONS.set(0); + catalog.alterTable( + identifier, SchemaChange.addColumn("new_value", DataTypes.STRING()), false); + + assertThat(RUNTIME_PROVIDER_VALIDATIONS.get()).isGreaterThan(0); + assertThat(RUNTIME_PROVIDER_CONTEXT_PATHS).doesNotContain(runtimePath); + assertThat( + new SchemaManager( + fileIO, + new Path(new Path(warehouse), "test_db.db/new_table")) + .latestOrThrow("Table schema should exist") + .options()) + .containsEntry(CoreOptions.FILE_FORMAT.key(), CoreOptions.FILE_FORMAT_AVRO) + .doesNotContainKey(CoreOptions.PATH.key()) + .doesNotContainKey(FileFormatProvider.VALIDATION_FORMAT_PROVIDER); + + table.createBranch("audit"); + FileStoreTable branchTable = + (FileStoreTable) catalog.getTable(new Identifier("test_db", "new_table", "audit")); + assertThat(branchTable.options()) + .containsEntry( + FileFormatProvider.VALIDATION_FORMAT_PROVIDER, + RuntimeOptionFileFormatProvider.IDENTIFIER); + assertThat(branchTable.location().toString()).isEqualTo(schemaPath); + assertThat(RUNTIME_PROVIDER_CONTEXT_PATHS).doesNotContain(runtimePath); + assertThat( + new SchemaManager( + fileIO, + new Path(new Path(warehouse), "test_db.db/new_table"), + "audit") + .latestOrThrow("Branch schema should exist") + .options()) + .doesNotContainKey(CoreOptions.PATH.key()) + .doesNotContainKey(FileFormatProvider.VALIDATION_FORMAT_PROVIDER); + } + + @Test + public void testLoadFormatTableWithRuntimeCatalogOptions() throws Exception { + Options options = new Options(); + options.set( + Catalog.TABLE_RUNTIME_OPTION_PREFIX + FileFormatProvider.READ_FORMAT_PROVIDER, + RuntimeOptionFileFormatProvider.IDENTIFIER); + String schemaPath = new Path(new Path(warehouse), "format-table-data").toString(); + String runtimePath = new Path(new Path(warehouse), "runtime-path-ignored").toString(); + options.set(Catalog.TABLE_RUNTIME_OPTION_PREFIX + CoreOptions.PATH.key(), runtimePath); + catalog = + new FileSystemCatalog(fileIO, new Path(warehouse), CatalogContext.create(options)); + + Identifier identifier = Identifier.create("test_db", "format_table"); + Schema schema = + Schema.newBuilder() + .column("value", DataTypes.STRING()) + .option(CoreOptions.TYPE.key(), TableType.FORMAT_TABLE.toString()) + .option(CoreOptions.FILE_FORMAT.key(), CoreOptions.FILE_FORMAT_PARQUET) + .option(CoreOptions.PATH.key(), schemaPath) + .build(); + + Catalog loadCatalog = Mockito.mock(Catalog.class); + when(loadCatalog.supportsVersionManagement()).thenReturn(false); + when(loadCatalog.supportsPartitionModification()).thenReturn(false); + FormatTable table = + (FormatTable) + CatalogUtils.loadTable( + loadCatalog, + identifier, + path -> fileIO, + path -> fileIO, + ignored -> + new TableMetadata( + TableSchema.create(0, schema), false, null), + null, + null, + CatalogContext.create(options), + CatalogUtils.tableRuntimeOptions(options.toMap()), + false); + + assertThat(table.options()) + .containsEntry(CoreOptions.FILE_FORMAT.key(), CoreOptions.FILE_FORMAT_PARQUET) + .containsEntry(CoreOptions.PATH.key(), schemaPath) + .containsEntry( + FileFormatProvider.READ_FORMAT_PROVIDER, + RuntimeOptionFileFormatProvider.IDENTIFIER); + assertThat(table.location()).isEqualTo(schemaPath); + assertThat(TableSchema.create(0, schema).options()) + .containsEntry(CoreOptions.FILE_FORMAT.key(), CoreOptions.FILE_FORMAT_PARQUET) + .containsEntry(CoreOptions.PATH.key(), schemaPath) + .doesNotContainKey(FileFormatProvider.READ_FORMAT_PROVIDER); + } + @Test public void testAlterDatabase() throws Exception { String databaseName = "test_alter_db"; @@ -77,4 +243,58 @@ public void testAlterDatabase() throws Exception { false)) .isInstanceOf(UnsupportedOperationException.class); } + + /** Test provider selected through catalog runtime options. */ + public static class RuntimeOptionFileFormatProvider implements FileFormatProvider { + + static final String IDENTIFIER = "catalog-runtime-provider"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(String identifier, FormatContext context) { + if (CoreOptions.FILE_FORMAT_AVRO.equals(identifier)) { + String contextPath = context.options().get(CoreOptions.PATH.key()); + if (contextPath != null) { + RUNTIME_PROVIDER_CONTEXT_PATHS.add(contextPath); + } + return Optional.of(new RuntimeOptionFileFormat(identifier)); + } + return Optional.empty(); + } + } + + private static class RuntimeOptionFileFormat extends FileFormat { + + private RuntimeOptionFileFormat(String formatIdentifier) { + super(formatIdentifier); + } + + @Override + public FormatReaderFactory createReaderFactory( + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List filters) { + throw new UnsupportedOperationException(); + } + + @Override + public FormatWriterFactory createWriterFactory(RowType type) { + throw new UnsupportedOperationException(); + } + + @Override + public void validateDataFields(RowType rowType) { + RUNTIME_PROVIDER_VALIDATIONS.incrementAndGet(); + } + + @Override + public Optional createStatsExtractor( + RowType type, SimpleColStatsCollector.Factory[] statsCollectors) { + return Optional.empty(); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/migrate/FileMetaUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/migrate/FileMetaUtilsTest.java new file mode 100644 index 000000000000..a3ffd9d2b571 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/migrate/FileMetaUtilsTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.migrate; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.format.FileFormatProvider; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.format.SimpleStatsExtractor; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** Tests for {@link FileMetaUtils}. */ +public class FileMetaUtilsTest { + + private static final String READ_PROVIDER = "file-meta-read-provider"; + private static final String PROVIDER_FORMAT = "file-meta-provider-format"; + private static final RowType ROW_TYPE = RowType.of(DataTypes.INT()); + + @Test + public void testCreateSimpleStatsExtractorUsesReadProvider() { + Options options = new Options(); + options.set(FileFormatProvider.READ_FORMAT_PROVIDER, READ_PROVIDER); + + FileStoreTable table = mock(FileStoreTable.class); + when(table.coreOptions()).thenReturn(new CoreOptions(options)); + when(table.rowType()).thenReturn(ROW_TYPE); + + SimpleStatsExtractor extractor = + FileMetaUtils.createSimpleStatsExtractor(table, PROVIDER_FORMAT); + + assertThat(options.toMap()).doesNotContainKey(FileFormatProvider.FORMAT_PROVIDER); + assertThat(extractor).isInstanceOf(ReadStatsExtractor.class); + } + + /** Test provider selected only for external-file stats reads. */ + public static class ReadStatsFileFormatProvider implements FileFormatProvider { + + @Override + public String identifier() { + return READ_PROVIDER; + } + + @Override + public Optional create(String identifier, FormatContext context) { + if (PROVIDER_FORMAT.equals(identifier)) { + return Optional.of(new ReadStatsFileFormat(identifier)); + } + return Optional.empty(); + } + } + + private static class ReadStatsFileFormat extends FileFormat { + + private ReadStatsFileFormat(String formatIdentifier) { + super(formatIdentifier); + } + + @Override + public FormatReaderFactory createReaderFactory( + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List filters) { + throw new UnsupportedOperationException(); + } + + @Override + public FormatWriterFactory createWriterFactory(RowType type) { + throw new UnsupportedOperationException(); + } + + @Override + public void validateDataFields(RowType rowType) {} + + @Override + public Optional createStatsExtractor( + RowType type, SimpleColStatsCollector.Factory[] statsCollectors) { + return Optional.of(new ReadStatsExtractor()); + } + } + + private static class ReadStatsExtractor implements SimpleStatsExtractor { + + @Override + public SimpleColStats[] extract(FileIO fileIO, Path path, long length) throws IOException { + return new SimpleColStats[0]; + } + + @Override + public Pair extractWithFileInfo( + FileIO fileIO, Path path, long length) throws IOException { + return Pair.of(new SimpleColStats[0], new FileInfo(0)); + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 4b7e6f3b315b..9d7815555abf 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -160,6 +160,54 @@ public void testCreateTableIllegal() { .hasMessageContaining("Sequence field: 'f4' can not be found in table schema."); } + @Test + public void testCreateTableWithDynamicOptions() throws Exception { + Map schemaOptions = new HashMap<>(); + schemaOptions.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true"); + Schema schema = + new Schema(rowType.getFields(), partitionKeys, primaryKeys, schemaOptions, ""); + + Map dynamicOptions = + Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"); + TableSchema tableSchema = + retryArtificialException(() -> manager.createTable(schema, dynamicOptions)); + + assertThat(tableSchema.options()) + .containsEntry(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true") + .doesNotContainKey(CoreOptions.WRITE_ONLY.key()); + assertThat(retryArtificialException(() -> manager.latest()).get().options()) + .containsEntry(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true") + .doesNotContainKey(CoreOptions.WRITE_ONLY.key()); + } + + @Test + public void testCommitChangesWithDynamicOptions() throws Exception { + Map schemaOptions = new HashMap<>(); + schemaOptions.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true"); + Schema schema = + new Schema(rowType.getFields(), partitionKeys, primaryKeys, schemaOptions, ""); + + Map dynamicOptions = + Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"); + retryArtificialException(() -> manager.createTable(schema, dynamicOptions)); + TableSchema tableSchema = + retryArtificialException( + () -> + manager.commitChanges( + Collections.singletonList( + SchemaChange.setOption("new_k", "new_v")), + dynamicOptions)); + + assertThat(tableSchema.options()) + .containsEntry(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true") + .containsEntry("new_k", "new_v") + .doesNotContainKey(CoreOptions.WRITE_ONLY.key()); + assertThat(retryArtificialException(() -> manager.latest()).get().options()) + .containsEntry(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true") + .containsEntry("new_k", "new_v") + .doesNotContainKey(CoreOptions.WRITE_ONLY.key()); + } + @Test public void testUpdateOptions() throws Exception { retryArtificialException(() -> manager.createTable(this.schema)); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index a306adc67005..8dc65bb467f0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -28,6 +28,7 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -583,6 +584,33 @@ public void testSnapshotSequenceOrderingRejectsNonWriteOnly() { .hasMessageContaining(CoreOptions.WRITE_ONLY.key()); } + @Test + public void testSnapshotSequenceOrderingHonorsDynamicWriteOnlyValue() { + Map options = new HashMap<>(); + options.put(CoreOptions.SEQUENCE_SNAPSHOT_ORDERING.key(), "true"); + options.put(BUCKET.key(), String.valueOf(-1)); + + TableSchema schema = + new TableSchema( + 1, + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.INT())), + 10, + emptyList(), + singletonList("f1"), + options, + ""); + + assertThatNoException() + .isThrownBy( + () -> + validateTableSchema( + schema, + Collections.singletonMap( + CoreOptions.WRITE_ONLY.key(), "true"))); + } + @Test public void testSnapshotSequenceOrderingRejectsSequenceField() { Map options = new HashMap<>(); @@ -851,4 +879,27 @@ public void testFullCompactionDeltaCommitsWithLookupChangelogProducer() { options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "input"); assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); } + + @Test + public void testDynamicOptionsCanRemoveSchemaOptionsDuringValidation() { + Map options = new HashMap<>(); + options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "lookup"); + options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1"); + options.put(BUCKET.key(), String.valueOf(-1)); + + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.INT()), + new DataField(2, "f2", DataTypes.INT()), + new DataField(3, "f3", DataTypes.STRING())); + TableSchema schema = + new TableSchema( + 1, fields, 10, singletonList("f0"), singletonList("f1"), options, ""); + Map dynamicOptions = new HashMap<>(); + dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), null); + + assertThatCode(() -> validateTableSchema(schema, dynamicOptions)) + .doesNotThrowAnyException(); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java index e0841df4d0e4..e01bab9df5af 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java @@ -19,9 +19,12 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormatProvider; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; @@ -407,8 +410,23 @@ void testSwitchToBranch() throws Exception { String branchName = "bc"; Identifier mainId = Identifier.create("mydb", "mytable"); + Options catalogOptions = new Options(); + catalogOptions.set( + Catalog.TABLE_RUNTIME_OPTION_PREFIX + FileFormatProvider.VALIDATION_FORMAT_PROVIDER, + "catalog-runtime-provider"); + catalogOptions.set( + Catalog.TABLE_RUNTIME_OPTION_PREFIX + CoreOptions.PATH.key(), + new Path(tablePath, "ignored-runtime-path").toString()); CatalogEnvironment env = - new CatalogEnvironment(mainId, "uuid-1", null, null, null, null, false, false); + new CatalogEnvironment( + mainId, + "uuid-1", + null, + null, + null, + CatalogContext.create(catalogOptions), + false, + false); TableSchema tableSchema = SchemaUtils.forceCommit( @@ -431,13 +449,23 @@ void testSwitchToBranch() throws Exception { FallbackReadFileStoreTable fallbackTable = new FallbackReadFileStoreTable(mainTable, branchTable, true); - FileStoreTable switched = fallbackTable.switchToBranch(branchName); + Options dynamicOptions = new Options(); + dynamicOptions.set(CoreOptions.SCAN_SNAPSHOT_ID, 1L); + + FileStoreTable switched = + fallbackTable.copy(dynamicOptions.toMap()).switchToBranch(branchName); Identifier switchedId = switched.catalogEnvironment().identifier(); assertThat(switchedId).isNotNull(); assertThat(switchedId.getDatabaseName()).isEqualTo("mydb"); assertThat(switchedId.getBranchName()).isEqualTo(branchName); assertThat(switchedId.getObjectName()).isEqualTo("mytable$branch_bc"); + assertThat(switched.options()) + .containsEntry(CoreOptions.PATH.key(), tablePath.toString()) + .containsEntry( + FileFormatProvider.VALIDATION_FORMAT_PROVIDER, "catalog-runtime-provider") + .doesNotContainKey(CoreOptions.SCAN_SNAPSHOT_ID.key()); + assertThat(switched.location()).isEqualTo(tablePath); } private void writeDataIntoTable( diff --git a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatProvider b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatProvider new file mode 100644 index 000000000000..cff7ab76038d --- /dev/null +++ b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatProvider @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.paimon.catalog.FileSystemCatalogTest$RuntimeOptionFileFormatProvider +org.apache.paimon.migrate.FileMetaUtilsTest$ReadStatsFileFormatProvider diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java index f7388f359142..6c6125d82458 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogReadOnlyFormat.java @@ -78,7 +78,7 @@ public String identifier() { @Override public FileFormat create(FormatContext formatContext) { return new CompactedChangelogReadOnlyFormat( - getIdentifier(format), FileFormat.fromIdentifier(format, formatContext)); + getIdentifier(format), FileFormat.readerFromIdentifier(format, formatContext)); } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/FormatProviderNoHadoopTest.java b/paimon-format/src/test/java/org/apache/paimon/format/FormatProviderNoHadoopTest.java new file mode 100644 index 000000000000..6cbcc58dc874 --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/FormatProviderNoHadoopTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests that {@link FileFormatProvider} can bypass Hadoop-backed format constructors. */ +public class FormatProviderNoHadoopTest { + + private static final String TEST_PROVIDER = "format-test-provider"; + + @Test + public void testProviderBypassesHadoopBackedOrcAndParquetFormatPaths() throws Exception { + try (URLClassLoader classLoader = new NoHadoopClassLoader(testClasspathWithoutHadoop())) { + Class runner = + Class.forName(NoHadoopFormatProviderRunner.class.getName(), true, classLoader); + runner.getMethod("run", ClassLoader.class).invoke(null, classLoader); + } catch (InvocationTargetException e) { + Throwable cause = e.getCause(); + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw (Error) cause; + } + } + + private static URL[] testClasspathWithoutHadoop() { + return Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator)) + .filter(path -> !path.contains("/hadoop-")) + .filter(path -> !path.contains("/htrace-core")) + .filter(path -> !path.contains("/woodstox-core")) + .filter(path -> !path.contains("/stax2-api")) + .map( + path -> { + try { + return new File(path).toURI().toURL(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .toArray(URL[]::new); + } + + private static class NoHadoopClassLoader extends URLClassLoader { + + private NoHadoopClassLoader(URL[] urls) { + super(urls, ClassLoader.getSystemClassLoader().getParent()); + } + + @Override + protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + if (name.startsWith("org.apache.hadoop.")) { + throw new ClassNotFoundException(name); + } + return super.loadClass(name, resolve); + } + } + + /** Runner loaded by {@link NoHadoopClassLoader}. */ + public static class NoHadoopFormatProviderRunner { + + public static void run(ClassLoader classLoader) throws Exception { + assertThatThrownBy(() -> classLoader.loadClass("org.apache.hadoop.conf.Configuration")) + .isInstanceOf(ClassNotFoundException.class); + + RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "id", DataTypes.INT())); + + Options options = new Options(); + options.setString(FileFormatProvider.WRITE_FORMAT_PROVIDER, TEST_PROVIDER); + options.setString(FileFormatProvider.READ_FORMAT_PROVIDER, TEST_PROVIDER); + options.setString(FileFormatProvider.VALIDATION_FORMAT_PROVIDER, TEST_PROVIDER); + + FileFormat orc = FileFormat.writerFromIdentifier("orc", options); + FileFormat parquet = FileFormat.writerFromIdentifier("parquet", options); + FileFormat orcReader = FileFormat.readerFromIdentifier("orc", options); + FileFormat orcValidation = FileFormat.validationFromIdentifier("orc", options); + + assertThat(orc).isInstanceOf(TestFileFormat.class); + assertThat(parquet).isInstanceOf(TestFileFormat.class); + assertThat(orcReader).isInstanceOf(TestFileFormat.class); + assertThat(orcValidation).isInstanceOf(TestFileFormat.class); + assertThat(orc.createWriterFactory(rowType)) + .isInstanceOf(TestFormatWriterFactory.class); + assertThat(parquet.createWriterFactory(rowType)) + .isInstanceOf(TestFormatWriterFactory.class); + assertThat(orcReader.createReaderFactory(rowType, rowType, null)) + .isInstanceOf(TestFormatReaderFactory.class); + assertThatCode(() -> orcValidation.validateDataFields(rowType)) + .doesNotThrowAnyException(); + + Options writerOnlyOptions = new Options(); + writerOnlyOptions.setString(FileFormatProvider.WRITE_FORMAT_PROVIDER, TEST_PROVIDER); + assertThatThrownBy(() -> FileFormat.readerFromIdentifier("orc", writerOnlyOptions)) + .hasRootCauseInstanceOf(ClassNotFoundException.class); + + Options writerAndGenericOptions = new Options(); + writerAndGenericOptions.setString( + FileFormatProvider.WRITE_FORMAT_PROVIDER, TEST_PROVIDER); + writerAndGenericOptions.setString(FileFormatProvider.FORMAT_PROVIDER, TEST_PROVIDER); + assertThat(FileFormat.readerFromIdentifier("orc", writerAndGenericOptions)) + .isInstanceOf(TestFileFormat.class); + } + } + + /** Test provider used by the no-Hadoop classloader runner. */ + public static class TestFileFormatProvider implements FileFormatProvider { + + @Override + public String identifier() { + return TEST_PROVIDER; + } + + @Override + public Optional create(String identifier, FormatContext context) { + if ("orc".equals(identifier) || "parquet".equals(identifier)) { + return Optional.of(new TestFileFormat(identifier)); + } + return Optional.empty(); + } + } + + /** Test format implementation that avoids Hadoop-backed constructors. */ + public static class TestFileFormat extends FileFormat { + + private TestFileFormat(String formatIdentifier) { + super(formatIdentifier); + } + + @Override + public FormatReaderFactory createReaderFactory( + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List filters) { + return new TestFormatReaderFactory(); + } + + @Override + public FormatWriterFactory createWriterFactory(RowType type) { + return new TestFormatWriterFactory(); + } + + @Override + public void validateDataFields(RowType rowType) {} + + @Override + public Optional createStatsExtractor( + RowType type, SimpleColStatsCollector.Factory[] statsCollectors) { + return Optional.empty(); + } + } + + /** Test reader factory returned by {@link TestFileFormat}. */ + public static class TestFormatReaderFactory implements FormatReaderFactory { + + @Override + public FileRecordReader createReader(Context context) { + throw new UnsupportedOperationException(); + } + } + + /** Test writer factory returned by {@link TestFileFormat}. */ + public static class TestFormatWriterFactory implements FormatWriterFactory { + + @Override + public FormatWriter create(PositionOutputStream out, String compression) { + return new TestFormatWriter(); + } + } + + private static class TestFormatWriter implements FormatWriter { + + @Override + public void addElement(InternalRow element) {} + + @Override + public boolean reachTargetSize(boolean suggestedCheck, long targetSize) { + return false; + } + + @Override + public void close() {} + } +} diff --git a/paimon-format/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatProvider b/paimon-format/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatProvider new file mode 100644 index 000000000000..4b83156e904e --- /dev/null +++ b/paimon-format/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.paimon.format.FormatProviderNoHadoopTest$TestFileFormatProvider diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 889850bf1131..f5534ee24009 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -768,6 +768,7 @@ public org.apache.paimon.table.Table getTable(Identifier identifier) lockFactory().orElse(null), lockContext().orElse(null), context, + tableRuntimeOptions, false); } @@ -1113,7 +1114,8 @@ protected void createTableImpl(Identifier identifier, Schema schema) { identifier, () -> schemaManager(identifier, location) - .createTable(schema, externalTable)); + .createTable( + schema, externalTable, tableRuntimeOptions)); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -1259,7 +1261,10 @@ protected void alterTableImpl(Identifier identifier, List changes) TableSchema schema; try { // first commit changes to underlying files - schema = runWithLock(identifier, () -> schemaManager.commitChanges(changes)); + schema = + runWithLock( + identifier, + () -> schemaManager.commitChanges(changes, tableRuntimeOptions)); } catch (TableNotExistException | ColumnAlreadyExistException | ColumnNotExistException