Skip to content
Draft
37 changes: 36 additions & 1 deletion paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.annotation.Documentation;
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.annotation.Documentation.Immutable;
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -299,6 +300,40 @@ public InlineElement getDescription() {
.withDescription(
"Specify the message format of data files, currently orc, parquet and avro are supported.");

/**
 * Experimental option {@code file.format.provider}: names the file format provider to use.
 *
 * <p>String option without a default value. It is annotated to be excluded from the generated
 * documentation while the format-provider SPI is experimental.
 */
@Experimental
@ExcludeFromDocumentation("Experimental format-provider SPI")
public static final ConfigOption<String> FILE_FORMAT_PROVIDER =
key("file.format.provider")
.stringType()
.noDefaultValue()
.withDescription(
"Selects an experimental file format provider discovered from the classpath.");

/**
 * Experimental option {@code file.format.read-provider}: names the file format provider to use
 * for readers.
 *
 * <p>String option without a default value. It is annotated to be excluded from the generated
 * documentation while the format-provider SPI is experimental.
 */
@Experimental
@ExcludeFromDocumentation("Experimental format-provider SPI")
public static final ConfigOption<String> FILE_FORMAT_READ_PROVIDER =
key("file.format.read-provider")
.stringType()
.noDefaultValue()
.withDescription("Selects an experimental file format provider for readers.");

/**
 * Experimental option {@code file.format.write-provider}: names the file format provider to use
 * for writers.
 *
 * <p>String option without a default value. It is annotated to be excluded from the generated
 * documentation while the format-provider SPI is experimental.
 */
@Experimental
@ExcludeFromDocumentation("Experimental format-provider SPI")
public static final ConfigOption<String> FILE_FORMAT_WRITE_PROVIDER =
key("file.format.write-provider")
.stringType()
.noDefaultValue()
.withDescription("Selects an experimental file format provider for writers.");

/**
 * Experimental option {@code file.format.validation-provider}: names the file format provider
 * to use for schema validation.
 *
 * <p>String option without a default value. It is annotated to be excluded from the generated
 * documentation while the format-provider SPI is experimental.
 */
@Experimental
@ExcludeFromDocumentation("Experimental format-provider SPI")
public static final ConfigOption<String> FILE_FORMAT_VALIDATION_PROVIDER =
key("file.format.validation-provider")
.stringType()
.noDefaultValue()
.withDescription(
"Selects an experimental file format provider for schema validation.");

public static final ConfigOption<Map<String, String>> FILE_COMPRESSION_PER_LEVEL =
key("file.compression.per.level")
.mapType()
Expand Down Expand Up @@ -2756,7 +2791,7 @@ public Map<Integer, String> statsModePerLevel() {
}

/**
 * Normalizes a file format identifier to lower case.
 *
 * <p>Lower-casing uses {@link Locale#ROOT} so the result does not depend on the JVM's default
 * locale (for example, under a Turkish locale an upper-case {@code I} would otherwise not map
 * to {@code i}).
 *
 * @param fileFormat the identifier to normalize
 * @return {@code fileFormat} itself when {@code StringUtils.isEmpty} reports it as empty,
 *     otherwise its lower-case form
 */
public static String normalizeFileFormat(String fileFormat) {
    if (StringUtils.isEmpty(fileFormat)) {
        return fileFormat;
    }
    return fileFormat.toLowerCase(Locale.ROOT);
}

public String dataFilePrefix() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.paimon.factories;

import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FileFormatProvider;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;

import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

import static org.apache.paimon.factories.FactoryUtil.discoverFactories;
Expand All @@ -34,6 +36,9 @@ public class FormatFactoryUtil {
// Per-class-loader cache of the FileFormatFactory implementations found by discovery; filled
// lazily by getFactories(ClassLoader).
private static final Cache<ClassLoader, List<FileFormatFactory>> FACTORIES =
Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build();

// Per-class-loader cache of the FileFormatProvider implementations found by discovery; filled
// lazily by discoverProviders(ClassLoader). Built with the same settings as FACTORIES: soft
// values, at most 100 entries, and Runnable::run as the executor.
private static final Cache<ClassLoader, List<FileFormatProvider>> PROVIDERS =
Caffeine.newBuilder().softValues().maximumSize(100).executor(Runnable::run).build();

/** Discovers a file format factory. */
@SuppressWarnings("unchecked")
public static <T extends FileFormatFactory> T discoverFactory(
Expand Down Expand Up @@ -64,4 +69,53 @@ private static List<FileFormatFactory> getFactories(ClassLoader classLoader) {
return FACTORIES.get(
classLoader, s -> discoverFactories(classLoader, FileFormatFactory.class));
}

/**
 * Returns the {@link FileFormatProvider} implementations discovered through the given class
 * loader.
 *
 * <p>The result is looked up in the {@code PROVIDERS} cache under the class loader; {@code
 * discoverFactories} is only invoked to compute a value the cache does not hold.
 *
 * @param classLoader class loader used both as cache key and for discovery
 * @return the providers discovered for {@code classLoader}
 */
public static List<FileFormatProvider> discoverProviders(ClassLoader classLoader) {
    return PROVIDERS.get(
            classLoader, loader -> discoverFactories(loader, FileFormatProvider.class));
}

/**
 * Discovers the single {@link FileFormatProvider} registered under the given identifier.
 *
 * <p>Both the requested identifier and each provider's identifier are trimmed and lower-cased
 * with {@link Locale#ROOT} before comparison, so matching ignores case and surrounding
 * whitespace.
 *
 * @param classLoader class loader used to discover providers
 * @param identifier identifier of the wanted provider; must not be {@code null}
 * @return the only provider whose identifier matches
 * @throws FactoryException if no provider matches, or if more than one provider matches
 */
public static FileFormatProvider discoverProvider(ClassLoader classLoader, String identifier) {
    final List<FileFormatProvider> foundProviders = discoverProviders(classLoader);
    final String normalizedIdentifier = identifier.trim().toLowerCase(Locale.ROOT);

    final List<FileFormatProvider> matchingProviders =
            foundProviders.stream()
                    .filter(
                            f ->
                                    f.identifier()
                                            .trim()
                                            .toLowerCase(Locale.ROOT)
                                            .equals(normalizedIdentifier))
                    .collect(Collectors.toList());

    if (matchingProviders.isEmpty()) {
        throw new FactoryException(
                String.format(
                        "Could not find any provider for identifier '%s' that implements FileFormatProvider in the classpath.\n\n"
                                + "Available provider identifiers are:\n\n"
                                + "%s",
                        identifier,
                        foundProviders.stream()
                                .map(FileFormatProvider::identifier)
                                // Sorted, like the ambiguity message below, so the text does
                                // not depend on the order in which providers were discovered.
                                .sorted()
                                .collect(Collectors.joining("\n"))));
    }

    if (matchingProviders.size() > 1) {
        throw new FactoryException(
                String.format(
                        "Multiple providers for identifier '%s' that implement FileFormatProvider found in the classpath.\n\n"
                                + "Ambiguous provider classes are:\n\n"
                                + "%s",
                        identifier,
                        matchingProviders.stream()
                                .map(p -> p.getClass().getName())
                                .sorted()
                                .collect(Collectors.joining("\n"))));
    }

    return matchingProviders.get(0);
}
}
119 changes: 114 additions & 5 deletions paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -74,6 +75,23 @@ public Optional<SimpleStatsExtractor> createStatsExtractor(
}

/**
 * Creates a {@link FileFormat} for the identifier from table options, selecting a provider
 * through the generic {@code FileFormatProvider.FORMAT_PROVIDER} option key.
 */
public static FileFormat fromIdentifier(String identifier, Options options) {
return fromIdentifier(identifier, options, FileFormatProvider.FORMAT_PROVIDER);
}

/**
 * Creates a {@link FileFormat} for the identifier from table options, selecting a provider
 * through the {@code FileFormatProvider.READ_FORMAT_PROVIDER} option key.
 */
public static FileFormat readerFromIdentifier(String identifier, Options options) {
return fromIdentifier(identifier, options, FileFormatProvider.READ_FORMAT_PROVIDER);
}

/**
 * Creates a {@link FileFormat} for the identifier from table options, selecting a provider
 * through the {@code FileFormatProvider.WRITE_FORMAT_PROVIDER} option key.
 */
public static FileFormat writerFromIdentifier(String identifier, Options options) {
return fromIdentifier(identifier, options, FileFormatProvider.WRITE_FORMAT_PROVIDER);
}

/**
 * Creates a {@link FileFormat} for the identifier from table options, selecting a provider
 * through the {@code FileFormatProvider.VALIDATION_FORMAT_PROVIDER} option key.
 */
public static FileFormat validationFromIdentifier(String identifier, Options options) {
return fromIdentifier(identifier, options, FileFormatProvider.VALIDATION_FORMAT_PROVIDER);
}

private static FileFormat fromIdentifier(
String identifier, Options options, String providerOptionKey) {
return fromIdentifier(
normalizeFileFormat(identifier),
new FormatContext(
Expand All @@ -82,21 +100,88 @@ public static FileFormat fromIdentifier(String identifier, Options options) {
options.get(CoreOptions.WRITE_BATCH_SIZE),
options.get(CoreOptions.WRITE_BATCH_MEMORY),
options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL),
options.get(CoreOptions.FILE_BLOCK_SIZE)));
options.get(CoreOptions.FILE_BLOCK_SIZE)),
providerOptionKey);
}

/**
 * Create a {@link FileFormat} from format identifier and format options.
 *
 * <p>A provider, if any, is selected through the generic {@code
 * FileFormatProvider.FORMAT_PROVIDER} option key.
 */
public static FileFormat fromIdentifier(String identifier, FormatContext context) {
return fromIdentifier(identifier, context, FileFormatProvider.FORMAT_PROVIDER);
}

/**
 * Creates a {@link FileFormat} from format identifier and format context, selecting a provider
 * through the {@code FileFormatProvider.READ_FORMAT_PROVIDER} option key.
 */
public static FileFormat readerFromIdentifier(String identifier, FormatContext context) {
return fromIdentifier(identifier, context, FileFormatProvider.READ_FORMAT_PROVIDER);
}

/**
 * Creates a {@link FileFormat} from format identifier and format context, selecting a provider
 * through the {@code FileFormatProvider.WRITE_FORMAT_PROVIDER} option key.
 */
public static FileFormat writerFromIdentifier(String identifier, FormatContext context) {
return fromIdentifier(identifier, context, FileFormatProvider.WRITE_FORMAT_PROVIDER);
}

/**
 * Creates a {@link FileFormat} from format identifier and format context, selecting a provider
 * through the {@code FileFormatProvider.VALIDATION_FORMAT_PROVIDER} option key.
 */
public static FileFormat validationFromIdentifier(String identifier, FormatContext context) {
return fromIdentifier(identifier, context, FileFormatProvider.VALIDATION_FORMAT_PROVIDER);
}

private static FileFormat fromIdentifier(
String identifier, FormatContext context, String providerOptionKey) {
String normalizedIdentifier = normalizeFileFormat(identifier);
ClassLoader classLoader = providerClassLoader();
Optional<FileFormat> operationFormat =
createFromProvider(classLoader, context, providerOptionKey, normalizedIdentifier);
if (operationFormat.isPresent()) {
return operationFormat.get();
}

if (!FileFormatProvider.FORMAT_PROVIDER.equals(providerOptionKey)) {
Optional<FileFormat> genericFormat =
createFromProvider(
classLoader,
context,
FileFormatProvider.FORMAT_PROVIDER,
normalizedIdentifier);
if (genericFormat.isPresent()) {
return genericFormat.get();
}
}

return FormatFactoryUtil.discoverFactory(
FileFormat.class.getClassLoader(), identifier.toLowerCase())
FileFormat.class.getClassLoader(), normalizedIdentifier)
.create(context);
}

/**
 * Asks the provider configured under {@code providerOptionKey} for a file format.
 *
 * @param classLoader class loader used to discover the provider
 * @param context format context whose options hold the provider selection
 * @param providerOptionKey option key naming the provider to use
 * @param normalizedIdentifier normalized file format identifier passed to the provider
 * @return the provider's result, or {@link Optional#empty()} when the option is unset or blank
 */
private static Optional<FileFormat> createFromProvider(
        ClassLoader classLoader,
        FormatContext context,
        String providerOptionKey,
        String normalizedIdentifier) {
    String configured = context.options().getString(providerOptionKey, null);
    boolean providerConfigured = configured != null && !configured.trim().isEmpty();
    if (!providerConfigured) {
        return Optional.empty();
    }

    String providerId = configured.trim().toLowerCase(Locale.ROOT);
    return FormatFactoryUtil.discoverProvider(classLoader, providerId)
            .create(normalizedIdentifier, context);
}

/**
 * Chooses the class loader used to discover {@link FileFormatProvider} implementations.
 *
 * <p>The thread context class loader is used only when it resolves {@code FileFormatProvider}
 * to the very same class object this class sees; otherwise the class loader of {@code
 * FileFormat} is used.
 */
private static ClassLoader providerClassLoader() {
    ClassLoader apiClassLoader = FileFormat.class.getClassLoader();
    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    if (contextClassLoader == null) {
        return apiClassLoader;
    }

    try {
        Class<?> visibleProviderType =
                contextClassLoader.loadClass(FileFormatProvider.class.getName());
        return visibleProviderType == FileFormatProvider.class
                ? contextClassLoader
                : apiClassLoader;
    } catch (ClassNotFoundException ignored) {
        // The context class loader cannot see the provider API at all; use the class loader
        // that loaded Paimon's format API instead.
        return apiClassLoader;
    }
}

protected Options getIdentifierPrefixOptions(Options options) {
Map<String, String> result = new HashMap<>();
String prefix = formatIdentifier.toLowerCase() + ".";
String prefix = formatIdentifier.toLowerCase(Locale.ROOT) + ".";
for (String key : options.keySet()) {
if (key.toLowerCase().startsWith(prefix)) {
if (key.toLowerCase(Locale.ROOT).startsWith(prefix)) {
result.put(prefix + key.substring(prefix.length()), options.get(key));
}
}
Expand All @@ -107,13 +192,37 @@ public static FileFormat fileFormat(CoreOptions options) {
return FileFormat.fromIdentifier(options.fileFormatString(), options.toConfiguration());
}

/**
 * Creates the file format named by {@code options.fileFormatString()} through {@code
 * readerFromIdentifier}, passing {@code options.toConfiguration()} as configuration.
 */
public static FileFormat readerFileFormat(CoreOptions options) {
return FileFormat.readerFromIdentifier(
options.fileFormatString(), options.toConfiguration());
}

/**
 * Creates the file format named by {@code options.fileFormatString()} through {@code
 * writerFromIdentifier}, passing {@code options.toConfiguration()} as configuration.
 */
public static FileFormat writerFileFormat(CoreOptions options) {
return FileFormat.writerFromIdentifier(
options.fileFormatString(), options.toConfiguration());
}

/**
 * Creates the file format named by {@code options.fileFormatString()} through {@code
 * validationFromIdentifier}, passing {@code options.toConfiguration()} as configuration.
 */
public static FileFormat validationFileFormat(CoreOptions options) {
return FileFormat.validationFromIdentifier(
options.fileFormatString(), options.toConfiguration());
}

/**
 * Creates the file format named by {@code options.vectorFileFormatString()}.
 *
 * <p>The format is resolved through {@code writerFromIdentifier}, so the write-provider option
 * is consulted before the generic provider option.
 *
 * <p>NOTE(review): this assumes the returned format is only used on write paths; confirm that
 * no reader obtains its format from this method.
 *
 * @param options core options supplying the identifier and the configuration
 * @return the resolved file format, or {@code null} when {@code
 *     options.vectorFileFormatString()} returns {@code null}
 */
@Nullable
public static FileFormat vectorFileFormat(CoreOptions options) {
    String vectorFileFormat = options.vectorFileFormatString();
    if (vectorFileFormat == null) {
        return null;
    }
    return FileFormat.writerFromIdentifier(vectorFileFormat, options.toConfiguration());
}

/**
 * Creates the file format named by {@code options.vectorFileFormatString()}, resolved through
 * {@code validationFromIdentifier}.
 *
 * @param options core options supplying the identifier and the configuration
 * @return the resolved file format, or {@code null} when {@code
 *     options.vectorFileFormatString()} returns {@code null}
 */
@Nullable
public static FileFormat validationVectorFileFormat(CoreOptions options) {
    String vectorFormatId = options.vectorFileFormatString();
    return vectorFormatId == null
            ? null
            : FileFormat.validationFromIdentifier(vectorFormatId, options.toConfiguration());
}

public static FileFormat manifestFormat(CoreOptions options) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.format.FileFormatFactory.FormatContext;

import java.util.Optional;

/**
 * Provider for engine-specific file format implementations.
 *
 * <p>Engines can use this SPI to provide readers, writers, or validation logic without exposing the
 * implementation dependencies used by Paimon's built-in file formats. For example, an engine may
 * provide ORC or Parquet implementations backed by its own filesystem and format libraries.
 *
 * <p>Providers are selected by identifier through the option keys declared below. The
 * operation-specific options take precedence for their operation. If the operation-specific
 * provider is not configured or returns {@link Optional#empty()}, Paimon falls back to {@link
 * #FORMAT_PROVIDER}. If no configured provider handles the format, Paimon uses the built-in {@link
 * FileFormatFactory} for the requested format.
 */
@Experimental
public interface FileFormatProvider {

/** Option key used to select a provider discovered from the classpath. */
String FORMAT_PROVIDER = CoreOptions.FILE_FORMAT_PROVIDER.key();

/** Option key used to select a provider for file-format readers. */
String READ_FORMAT_PROVIDER = CoreOptions.FILE_FORMAT_READ_PROVIDER.key();

/** Option key used to select a provider for file-format writers. */
String WRITE_FORMAT_PROVIDER = CoreOptions.FILE_FORMAT_WRITE_PROVIDER.key();

/** Option key used to select a provider for file-format validation. */
String VALIDATION_FORMAT_PROVIDER = CoreOptions.FILE_FORMAT_VALIDATION_PROVIDER.key();

/**
 * Identifier used to select this provider.
 *
 * <p>It is compared with the configured provider name after both have been trimmed and
 * lower-cased, so matching is case-insensitive. Provider discovery fails with an exception when
 * several discovered providers match the same configured name.
 *
 * @return the identifier of this provider; provider discovery dereferences it, so it should
 *     not be {@code null}
 */
String identifier();

/**
 * Creates a file format for the identifier.
 *
 * <p>The identifier is the normalized file format identifier, such as {@code parquet} or {@code
 * orc}. Return {@link Optional#empty()} to let Paimon use the default service-loaded format
 * factory for this format.
 *
 * @param identifier normalized identifier of the requested file format
 * @param context format context for the file format to create
 * @return the file format to use, or {@link Optional#empty()} when this provider does not
 *     handle the format; the caller invokes {@code isPresent()} on the result, so it should
 *     not be {@code null}
 */
Optional<FileFormat> create(String identifier, FormatContext context);
}
Loading