Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,22 @@
<td>Write blob field using blob descriptor rather than blob bytes.</td>
</tr>
<tr>
<td><h5>blob-external-storage-field</h5></td>
<td><h5>blob-descriptor-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Comma-separated BLOB field names (must be a subset of 'blob-descriptor-field') whose raw data will be written to external storage at write time. The external storage path is configured via 'blob-external-storage-path'. Orphan file cleanup is not applied to that path.</td>
<td>Comma-separated BLOB field names to store as serialized BlobDescriptor bytes inline in data files.</td>
</tr>
<tr>
<td><h5>blob-external-storage-path</h5></td>
<td><h5>blob-external-storage-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The external storage path where raw BLOB data from fields configured by 'blob-external-storage-field' is written at write time. Orphan file cleanup is not applied to this path.</td>
<td>Comma-separated BLOB field names (must be a subset of 'blob-descriptor-field') whose raw data will be written to external storage at write time. The external storage path is configured via 'blob-external-storage-path'. Orphan file cleanup is not applied to that path.</td>
</tr>
<tr>
<td><h5>blob-descriptor-field</h5></td>
<td><h5>blob-external-storage-path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Comma-separated BLOB field names to store as serialized BlobDescriptor bytes inline in data files.</td>
<td>The external storage path where raw BLOB data from fields configured by 'blob-external-storage-field' is written at write time. Orphan file cleanup is not applied to this path.</td>
</tr>
<tr>
<td><h5>blob-field</h5></td>
Expand Down Expand Up @@ -410,6 +410,12 @@
<td>Boolean</td>
<td>Whether enable data evolution for row tracking table.</td>
</tr>
<tr>
<td><h5>data-evolution.force-split-row-range-contiguous</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to force each split group to contain only row-range-contiguous files for data evolution table scan splitting.</td>
</tr>
<tr>
<td><h5>data-file.external-paths</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2092,6 +2092,13 @@ public InlineElement getDescription() {
.defaultValue(false)
.withDescription("Whether enable data evolution for row tracking table.");

/**
 * When enabled, each split group produced for a data evolution table scan contains only files
 * whose row-id ranges are contiguous (no gap between adjacent ranges).
 *
 * <p>NOTE(review): "CONTIGOUS" in the constant name is a typo for "CONTIGUOUS". The option key
 * string itself is spelled correctly, so renaming the Java constant (and its accessor) now,
 * while the API is new, would be invisible to users — consider doing so before release.
 */
public static final ConfigOption<Boolean> DATA_EVOLUTION_FORCE_SPLIT_ROW_RANGE_CONTIGOUS =
        key("data-evolution.force-split-row-range-contiguous")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to force each split group to contain only row-range-contiguous files for data evolution table scan splitting.");

public static final ConfigOption<Boolean> SNAPSHOT_IGNORE_EMPTY_COMMIT =
key("snapshot.ignore-empty-commit")
.booleanType()
Expand Down Expand Up @@ -3413,6 +3420,10 @@ public boolean dataEvolutionEnabled() {
return options.get(DATA_EVOLUTION_ENABLED);
}

/**
 * Returns whether data evolution scan splitting must keep each split group restricted to
 * row-range-contiguous files.
 *
 * <p>NOTE(review): "Contigous" in the method/constant name is a typo for "Contiguous" —
 * consider renaming while this option is still new and unreferenced outside the project.
 */
public boolean dataEvolutionForceSplitRowRangeContigous() {
    return options.get(DATA_EVOLUTION_FORCE_SPLIT_ROW_RANGE_CONTIGOUS);
}

public boolean prepareCommitWaitCompaction() {
if (!needLookup()) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.globalindex;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
Expand All @@ -35,7 +37,9 @@
public class GlobalIndexBuilderUtils {

public static List<IndexFileMeta> toIndexFileMetas(
FileStoreTable table,
FileIO fileIO,
IndexPathFactory indexPathFactory,
CoreOptions options,
Range range,
int indexFieldId,
String indexType,
Expand All @@ -44,12 +48,11 @@ public static List<IndexFileMeta> toIndexFileMetas(
List<IndexFileMeta> results = new ArrayList<>();
for (ResultEntry entry : entries) {
String fileName = entry.fileName();
GlobalIndexFileReadWrite readWrite = createGlobalIndexFileReadWrite(table);
long fileSize = readWrite.fileSize(fileName);
long fileSize = fileIO.getFileSize(indexPathFactory.toPath(fileName));
GlobalIndexMeta globalIndexMeta =
new GlobalIndexMeta(range.from, range.to, indexFieldId, null, entry.meta());

Path externalPathDir = table.coreOptions().globalIndexExternalPath();
Path externalPathDir = options.globalIndexExternalPath();
String externalPathString = null;
if (externalPathDir != null) {
Path externalPath = new Path(externalPathDir, fileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.paimon.globalindex.btree;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
import org.apache.paimon.globalindex.GlobalIndexWriter;
Expand All @@ -30,6 +32,7 @@
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.reader.RecordReader;
Expand Down Expand Up @@ -87,8 +90,14 @@ public class BTreeGlobalIndexBuilder implements Serializable {
@Nullable private PartitionPredicate partitionPredicate;

public BTreeGlobalIndexBuilder(Table table) {
this.table = (FileStoreTable) table;
this.rowType = table.rowType();
this.table =
((FileStoreTable) table)
.copy(
Collections.singletonMap(
CoreOptions.DATA_EVOLUTION_FORCE_SPLIT_ROW_RANGE_CONTIGOUS
.key(),
"true"));
this.rowType = this.table.rowType();
this.options = this.table.coreOptions().toConfiguration();
this.recordsPerRange =
(long) (options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) * FLOATING);
Expand Down Expand Up @@ -132,10 +141,46 @@ public List<DataSplit> scan() {
if (partitionPredicate != null) {
snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate);
}
Snapshot latestSnapshot = snapshotReader.snapshotManager().latestSnapshot();
if (latestSnapshot == null) {
return Collections.emptyList();
}
snapshotReader = snapshotReader.withSnapshot(latestSnapshot);

Preconditions.checkArgument(indexType != null, "indexType must be set before scan.");
Preconditions.checkArgument(indexField != null, "indexField must be set before scan.");

Range dataRange = new Range(0, latestSnapshot.nextRowId() - 1);
List<Range> indexedRanges = indexedRowRanges(latestSnapshot);
List<Range> nonIndexedRanges = dataRange.exclude(indexedRanges);
if (nonIndexedRanges.isEmpty()) {
return Collections.emptyList();
}
snapshotReader = snapshotReader.withRowRanges(nonIndexedRanges);
return snapshotReader.read().dataSplits();
}

/**
 * Collects the row-id ranges already covered by existing index files of the target index type
 * and field, restricted to partitions accepted by the configured partition predicate.
 *
 * @param snapshot the snapshot whose index manifest entries are scanned
 * @return the already-indexed row ranges, sorted and merged where they overlap
 */
private List<Range> indexedRowRanges(Snapshot snapshot) {
    List<Range> covered = new ArrayList<>();
    for (IndexManifestEntry entry :
            table.store().newIndexFileHandler().scan(snapshot, indexType)) {
        boolean partitionAccepted =
                partitionPredicate == null || partitionPredicate.test(entry.partition());
        if (!partitionAccepted || entry.indexFile().globalIndexMeta() == null) {
            continue;
        }
        if (entry.indexFile().globalIndexMeta().indexFieldId() != indexField.id()) {
            // Index file belongs to a different indexed field; not relevant here.
            continue;
        }
        covered.add(
                new Range(
                        entry.indexFile().globalIndexMeta().rowRangeStart(),
                        entry.indexFile().globalIndexMeta().rowRangeEnd()));
    }
    return Range.sortAndMergeOverlap(covered, true);
}

public List<CommitMessage> build(List<DataSplit> splits, IOManager ioManager)
throws IOException {
Range rowRange = calcRowRange(splits);
Expand Down Expand Up @@ -226,6 +271,43 @@ public List<CommitMessage> build(Range rowRange, Iterator<InternalRow> data)
return commitMessages;
}

/**
 * Builds index files for rows of a single partition, flushing a new index file (and commit
 * message) every time roughly {@code recordsPerRange} records have been written.
 *
 * @param rowRange the global row-id range covered by the input; row ids are rebased to it
 * @param partition the partition all produced commit messages are attributed to
 * @param data iterator of rows containing the index field and the row id
 * @param indexFieldPos position of the index field inside each row
 * @param rowIdPos position of the row-id field inside each row
 * @return one commit message per flushed index file
 * @throws IOException if writing or flushing an index file fails
 */
public List<CommitMessage> buildForSinglePartition(
        Range rowRange,
        BinaryRow partition,
        Iterator<InternalRow> data,
        int indexFieldPos,
        int rowIdPos)
        throws IOException {
    List<CommitMessage> messages = new ArrayList<>();
    FieldGetter keyGetter = InternalRow.createFieldGetter(indexField.type(), indexFieldPos);

    GlobalIndexParallelWriter writer = null;
    long writtenInRange = 0;

    while (data.hasNext()) {
        InternalRow row = data.next();

        // Roll over to a fresh writer once the current one has absorbed a full range.
        if (writer != null && writtenInRange >= recordsPerRange) {
            messages.add(flushIndex(rowRange, writer.finish(), partition));
            writer = null;
            writtenInRange = 0;
        }

        if (writer == null) {
            writer = createWriter();
        }
        writtenInRange++;

        // Row ids are stored relative to the start of the overall range.
        long localRowId = row.getLong(rowIdPos) - rowRange.from;
        writer.write(keyGetter.getFieldOrNull(row), localRowId);
    }

    // Flush the trailing, partially-filled writer, if any rows were written to it.
    if (writtenInRange > 0) {
        messages.add(flushIndex(rowRange, writer.finish(), partition));
    }
    return messages;
}

public GlobalIndexParallelWriter createWriter() throws IOException {
GlobalIndexParallelWriter currentWriter;
GlobalIndexWriter indexWriter = createIndexWriter(table, indexType, indexField, options);
Expand All @@ -242,22 +324,34 @@ public CommitMessage flushIndex(
Range rowRange, List<ResultEntry> resultEntries, BinaryRow partition)
throws IOException {
List<IndexFileMeta> indexFileMetas =
toIndexFileMetas(table, rowRange, indexField.id(), indexType, resultEntries);
toIndexFileMetas(
table.fileIO(),
table.store().pathFactory().globalIndexFileFactory(),
table.coreOptions(),
rowRange,
indexField.id(),
indexType,
resultEntries);
DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
return new CommitMessageImpl(
partition, 0, null, dataIncrement, CompactIncrement.emptyIncrement());
}

/**
 * Computes the overall row-id range spanned by the data files of the given splits.
 *
 * <p>NOTE(review): the diff left the removed implementation's accumulator lines
 * ({@code long start/end = ...}) interleaved with the new body; reconstructed to the
 * post-change implementation, which delegates to {@link #calcRowRanges}.
 *
 * @param dataSplits splits whose data files are inspected
 * @return the covering range, or {@code null} when the splits contain no data files
 */
public static Range calcRowRange(List<DataSplit> dataSplits) {
    List<Range> ranges = calcRowRanges(dataSplits);
    if (ranges.isEmpty()) {
        return null;
    }
    // calcRowRanges returns sorted, merged ranges, so the global span runs from the
    // first range's start to the last range's end.
    return new Range(ranges.get(0).from, ranges.get(ranges.size() - 1).to);
}

/**
 * Collects the non-null row-id ranges of every data file in the given splits.
 *
 * <p>NOTE(review): the diff left removed min/max-accumulation lines interleaved with the new
 * body; reconstructed to the post-change implementation.
 *
 * @param dataSplits splits whose data files are inspected
 * @return the per-file ranges, sorted and merged where they overlap
 */
public static List<Range> calcRowRanges(List<DataSplit> dataSplits) {
    List<Range> ranges = new ArrayList<>();
    for (DataSplit dataSplit : dataSplits) {
        for (DataFileMeta file : dataSplit.dataFiles()) {
            ranges.add(file.nonNullRowIdRange());
        }
    }
    return Range.sortAndMergeOverlap(ranges, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ protected SplitGenerator splitGenerator() {
boolean blobFileSizeCountInSplitting = options.blobSplitByFileSize();
return coreOptions().dataEvolutionEnabled()
? new DataEvolutionSplitGenerator(
targetSplitSize, openFileCost, blobFileSizeCountInSplitting)
targetSplitSize,
openFileCost,
blobFileSizeCountInSplitting,
options.dataEvolutionForceSplitRowRangeContigous())
: new AppendOnlySplitGenerator(targetSplitSize, openFileCost, bucketMode());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.utils.BinPacking;
import org.apache.paimon.utils.RangeHelper;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
Expand All @@ -34,12 +35,17 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
private final long targetSplitSize;
private final long openFileCost;
private final boolean countBlobSize;
private final boolean forceSplitRowRangeContigous;

public DataEvolutionSplitGenerator(
long targetSplitSize, long openFileCost, boolean countBlobSize) {
long targetSplitSize,
long openFileCost,
boolean countBlobSize,
boolean forceSplitRowRangeContigous) {
this.targetSplitSize = targetSplitSize;
this.openFileCost = openFileCost;
this.countBlobSize = countBlobSize;
this.forceSplitRowRangeContigous = forceSplitRowRangeContigous;
}

@Override
Expand All @@ -64,23 +70,77 @@ public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
: meta.fileSize())
.sum(),
openFileCost);
if (forceSplitRowRangeContigous) {
return packByContiguousRanges(ranges, weightFunc);
}
return BinPacking.packForOrdered(ranges, weightFunc, targetSplitSize).stream()
.map(
f -> {
boolean rawConvertible = f.stream().allMatch(file -> file.size() == 1);
List<DataFileMeta> groupFiles =
f.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
return rawConvertible
? SplitGroup.rawConvertibleGroup(groupFiles)
: SplitGroup.nonRawConvertibleGroup(groupFiles);
})
.map(this::toPackedSplitGroup)
.collect(Collectors.toList());
}

/**
 * Streaming splitting uses the same grouping strategy as batch splitting.
 *
 * @param files data files to group into splits
 * @return the split groups, identical to {@link #splitForBatch}
 */
@Override
public List<SplitGroup> splitForStreaming(List<DataFileMeta> files) {
    return splitForBatch(files);
}

/**
 * Flattens the packed file groups into one split group. The group is raw-convertible only when
 * every packed group holds exactly one file.
 */
private SplitGroup toPackedSplitGroup(List<List<DataFileMeta>> fileGroups) {
    boolean rawConvertible = true;
    List<DataFileMeta> flattened = new ArrayList<>();
    for (List<DataFileMeta> group : fileGroups) {
        if (group.size() != 1) {
            rawConvertible = false;
        }
        flattened.addAll(group);
    }
    return rawConvertible
            ? SplitGroup.rawConvertibleGroup(flattened)
            : SplitGroup.nonRawConvertibleGroup(flattened);
}

/**
 * Packs the file ranges into split groups while guaranteeing that each group only covers
 * row-range-contiguous files: whenever a gap in row ids is found, the current segment is
 * bin-packed and flushed before a new segment is started.
 *
 * @param ranges file groups ordered by their row-id ranges
 * @param weightFunc weight function used for bin packing within a contiguous segment
 * @return the resulting split groups
 */
private List<SplitGroup> packByContiguousRanges(
        List<List<DataFileMeta>> ranges, Function<List<DataFileMeta>, Long> weightFunc) {
    List<SplitGroup> result = new ArrayList<>();
    List<List<DataFileMeta>> currentSegment = new ArrayList<>();
    long currentMaxRowId = Long.MIN_VALUE;

    for (List<DataFileMeta> rangeFiles : ranges) {
        // A gap before this range ends the current contiguous segment.
        if (!currentSegment.isEmpty()
                && !areContiguous(currentMaxRowId, minRowId(rangeFiles))) {
            result.addAll(packSegment(currentSegment, weightFunc));
            currentSegment = new ArrayList<>();
        }
        currentSegment.add(rangeFiles);
        currentMaxRowId = maxRowId(rangeFiles);
    }

    // Flush the trailing segment (empty only when the input itself was empty).
    if (!currentSegment.isEmpty()) {
        result.addAll(packSegment(currentSegment, weightFunc));
    }
    return result;
}

/** Bin-packs one contiguous segment and converts each pack into a split group. */
private List<SplitGroup> packSegment(
        List<List<DataFileMeta>> segment, Function<List<DataFileMeta>, Long> weightFunc) {
    return BinPacking.packForOrdered(segment, weightFunc, targetSplitSize).stream()
            .map(this::toPackedSplitGroup)
            .collect(Collectors.toList());
}

/** Smallest non-null row id covered by the files, or {@code Long.MAX_VALUE} if none. */
private long minRowId(List<DataFileMeta> files) {
    long min = Long.MAX_VALUE;
    for (DataFileMeta file : files) {
        min = Math.min(min, file.nonNullRowIdRange().from);
    }
    return min;
}

/** Largest non-null row id covered by the files, or {@code Long.MIN_VALUE} if none. */
private long maxRowId(List<DataFileMeta> files) {
    long max = Long.MIN_VALUE;
    for (DataFileMeta file : files) {
        max = Math.max(max, file.nonNullRowIdRange().to);
    }
    return max;
}

/**
 * Two adjacent ranges are contiguous when there is no gap between them: the next range may
 * start anywhere at or before {@code previousMaxRowId + 1} (overlap also counts).
 */
private boolean areContiguous(long previousMaxRowId, long currentMinRowId) {
    return currentMinRowId - 1 <= previousMaxRowId;
}
}
Loading