Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions docs/docs/multimodal-table/global-index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,20 @@ ALTER TABLE my_table SET (

Global index files cover row-id ranges. If more rows are appended after an index is built, those
new rows are not automatically covered by the existing index files. Run `create_global_index` again
to build index files for newly uncovered data. A query that can be answered by a matching global
index reads indexed row ranges; rows in uncovered ranges are not returned for that indexed query.
to build index files for newly uncovered data. By default, queries use fast search and only read
indexed row ranges; rows in uncovered ranges are not returned for that indexed query.

To improve freshness for query types that support raw-data search, set:

```sql
ALTER TABLE my_table SET ('global-index.search-mode' = 'full');
```

With `full` search, supported global-index queries first use the snapshot `nextRowId` and global
index row-id coverage to detect whether any row range is missing from the index. Raw data is scanned
only when such a gap exists. Use `detail` search when data files may have been rewritten or updated
after index creation; it scans data file metadata to find the exact unindexed row ranges and can
handle index invalidation caused by updates or rewrites.

To temporarily disable global-index scan acceleration while keeping the index files, set:

Expand All @@ -147,6 +159,7 @@ These table options affect global index build and read behavior:
| Option | Default | Description |
|---|---|---|
| `global-index.enabled` | `true` | Whether scans can use global indexes. |
| `global-index.search-mode` | `fast` | Search mode for global-index queries. `fast` searches indexed data only. `full` checks snapshot `nextRowId` against global index row-id coverage and scans raw data only if a gap exists. `detail` scans data file metadata to find exact unindexed rows and can handle index invalidation caused by updates or rewrites. |
| `global-index.external-path` | Not set | Root directory for global index files. If not set, files are stored under the table index directory. |
| `sorted-index.records-per-range` | `10000000` | Expected number of records per sorted global index file for BTree and Bitmap builds. |
| `sorted-index.build.max-parallelism` | `4096` | Maximum Flink or Spark parallelism for building sorted global indexes. |
Expand Down
6 changes: 6 additions & 0 deletions docs/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,12 @@
<td>String</td>
<td>Global index root directory, if not set, the global index files will be stored under the &lt;table-root-directory&gt;/index.</td>
</tr>
<tr>
<td><h5>global-index.search-mode</h5></td>
<td style="word-wrap: break-word;">fast</td>
<td><p>Enum</p></td>
<td>Search mode for global index queries. Supported values are 'fast', 'full', and 'detail'.<br /><br />Possible values:<ul><li>"fast": Only search indexed data.</li><li>"full": Use snapshot next row id and global index coverage to detect missing row ids, and scan raw data only when a gap exists.</li><li>"detail": Scan data files to find exact unindexed rows. This can handle index invalidation caused by updates or rewrites.</li></ul></td>
</tr>
<tr>
<td><h5>global-index.row-count-per-shard</h5></td>
<td style="word-wrap: break-word;">100000</td>
Expand Down
43 changes: 43 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2543,6 +2543,14 @@ public InlineElement getDescription() {
.defaultValue(true)
.withDescription("Whether to enable global index for scan.");

public static final ConfigOption<GlobalIndexSearchMode> GLOBAL_INDEX_SEARCH_MODE =
key("global-index.search-mode")
.enumType(GlobalIndexSearchMode.class)
.defaultValue(GlobalIndexSearchMode.FAST)
.withDescription(
"Search mode for global index queries. "
+ "Supported values are 'fast', 'full', and 'detail'.");

public static final ConfigOption<Integer> GLOBAL_INDEX_THREAD_NUM =
key("global-index.thread-num")
.intType()
Expand Down Expand Up @@ -4049,6 +4057,10 @@ public boolean globalIndexEnabled() {
return options.get(GLOBAL_INDEX_ENABLED);
}

public GlobalIndexSearchMode globalIndexSearchMode() {
return options.get(GLOBAL_INDEX_SEARCH_MODE);
}

public Integer globalIndexThreadNum() {
return options.get(GLOBAL_INDEX_THREAD_NUM);
}
Expand Down Expand Up @@ -4846,4 +4858,35 @@ public enum GlobalIndexColumnUpdateAction {
/** Drop all global index entries for the whole partitions affected by the update. */
DROP_PARTITION_INDEX
}

/** Search mode for global index queries. */
public enum GlobalIndexSearchMode implements DescribedEnum {
FAST("fast", "Only search indexed data."),
FULL(
"full",
"Use snapshot next row id and global index coverage to detect missing row ids, "
+ "and scan raw data only when a gap exists."),
DETAIL(
"detail",
"Scan data files to find exact unindexed rows. "
+ "This can handle index invalidation caused by updates or rewrites.");

private final String value;
private final String description;

GlobalIndexSearchMode(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.globalindex;

/** A {@link GlobalIndexer} that supports vector similarity search. */
public interface VectorGlobalIndexer extends GlobalIndexer {

/** Returns the metric name used to convert vector distances to comparable scores. */
String metric();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
import org.apache.paimon.globalindex.GlobalIndexWriter;
import org.apache.paimon.globalindex.GlobalIndexer;
import org.apache.paimon.globalindex.VectorGlobalIndexer;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.options.Options;
Expand All @@ -32,12 +32,13 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* A test-only {@link GlobalIndexer} for vector similarity search. Uses brute-force linear scan for
* ANN queries. No native library dependency required.
* A test-only {@link VectorGlobalIndexer} for vector similarity search. Uses brute-force linear
* scan for ANN queries. No native library dependency required.
*
* <p>Supported distance metrics (configured via option {@code test.vector.metric}):
*
Expand All @@ -47,7 +48,7 @@
* <li>{@code inner_product} - Inner product similarity (directly used as score)
* </ul>
*/
public class TestVectorGlobalIndexer implements GlobalIndexer {
public class TestVectorGlobalIndexer implements VectorGlobalIndexer {

/** Option key for vector dimension. */
public static final String OPT_DIMENSION = "test.vector.dimension";
Expand All @@ -59,6 +60,8 @@ public class TestVectorGlobalIndexer implements GlobalIndexer {

public static final String OPT_REQUIRED_OPTION_VALUE = "test.vector.required-option.value";

private static final AtomicInteger METRIC_CALLS = new AtomicInteger();

private final DataType fieldType;
private final int dimension;
private final String metric;
Expand Down Expand Up @@ -96,7 +99,17 @@ public int dimension() {
return dimension;
}

@Override
public String metric() {
METRIC_CALLS.incrementAndGet();
return metric;
}

public static void resetMetricCalls() {
METRIC_CALLS.set(0);
}

public static int metricCalls() {
return METRIC_CALLS.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.paimon.globalindex;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.GlobalIndexSearchMode;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
Expand All @@ -39,6 +41,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RoaringNavigableMap64;
import org.apache.paimon.utils.RowRangeIndex;

import org.slf4j.Logger;
Expand All @@ -55,6 +58,7 @@
import java.util.function.Function;

import static org.apache.paimon.table.SpecialFields.ROW_ID;
import static org.apache.paimon.table.source.snapshot.TimeTravelUtil.tryTravelOrLatest;
import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;

/** Scan for data evolution table. */
Expand Down Expand Up @@ -241,10 +245,30 @@ public Plan plan() {
ScoreGetter scoreGetter = null;

if (rowRangeIndex == null) {
Optional<GlobalIndexResult> indexResult = evalGlobalIndex();
Snapshot snapshot = null;
Optional<GlobalIndexResult> indexResult;
if (globalIndexResult != null) {
indexResult = Optional.of(globalIndexResult);
} else if (filter == null || !table.coreOptions().globalIndexEnabled()) {
indexResult = Optional.empty();
} else {
snapshot = tryTravelOrLatest(table);
indexResult = evalGlobalIndex(snapshot);
}
if (indexResult.isPresent()) {
GlobalIndexResult result = indexResult.get();
rowRangeIndex = RowRangeIndex.create(result.results().toRangeList());
RoaringNavigableMap64 rowIds = result.results();
List<Range> rowRanges = rowIds.toRangeList();
boolean scanUnindexedRanges =
globalIndexResult == null
&& !(result instanceof ScoredGlobalIndexResult)
&& table.coreOptions().globalIndexSearchMode()
!= GlobalIndexSearchMode.FAST;
if (scanUnindexedRanges) {
rowIds = unindexedRowsScanner(snapshot).withUnindexedRows(rowIds);
rowRanges = rowIds.toRangeList();
}
rowRangeIndex = RowRangeIndex.create(rowRanges);
if (result instanceof ScoredGlobalIndexResult) {
scoreGetter = ((ScoredGlobalIndexResult) result).scoreGetter();
}
Expand All @@ -259,7 +283,15 @@ public Plan plan() {
return wrapToIndexSplits(splits, rowRangeIndex, scoreGetter);
}

private Optional<GlobalIndexResult> evalGlobalIndex() {
private GlobalIndexUnindexedRowsScanner unindexedRowsScanner(Snapshot snapshot) {
return new GlobalIndexUnindexedRowsScanner(
table,
snapshot,
batchScan.snapshotReader().manifestsReader().partitionFilter(),
filter);
}

private Optional<GlobalIndexResult> evalGlobalIndex(Snapshot snapshot) {
if (this.globalIndexResult != null) {
return Optional.of(globalIndexResult);
}
Expand All @@ -273,7 +305,7 @@ private Optional<GlobalIndexResult> evalGlobalIndex() {
PartitionPredicate partitionFilter =
batchScan.snapshotReader().manifestsReader().partitionFilter();
Optional<GlobalIndexScanner> optionalScanner =
GlobalIndexScanner.create(table, partitionFilter, filter);
GlobalIndexScanner.create(table, partitionFilter, filter, snapshot);
if (!optionalScanner.isPresent()) {
return Optional.empty();
}
Expand All @@ -296,7 +328,9 @@ public static Plan wrapToIndexSplits(
Function<Split, List<IndexedSplit>> process =
split ->
Collections.singletonList(
wrap((DataSplit) split, rowRangeIndex, scoreGetter));
split instanceof IndexedSplit
? (IndexedSplit) split
: wrap((DataSplit) split, rowRangeIndex, scoreGetter));
randomlyExecuteSequentialReturn(process, splits, null).forEachRemaining(indexedSplits::add);
return () -> indexedSplits;
}
Expand Down
Loading
Loading