@@ -37,10 +37,17 @@
 public class BucketOffsetsRetrieverImpl implements OffsetsInitializer.BucketOffsetsRetriever {
     private final Admin flussAdmin;
     private final TablePath tablePath;
+    private final Boolean fetchEarliestOffset;
 
     public BucketOffsetsRetrieverImpl(Admin flussAdmin, TablePath tablePath) {
+        this(flussAdmin, tablePath, false);
+    }
+
+    public BucketOffsetsRetrieverImpl(
+            Admin flussAdmin, TablePath tablePath, Boolean fetchEarliestOffset) {
         this.flussAdmin = flussAdmin;
         this.tablePath = tablePath;
+        this.fetchEarliestOffset = fetchEarliestOffset;
     }
 
     @Override
@@ -52,11 +59,15 @@ public Map<Integer, Long> latestOffsets(
     @Override
     public Map<Integer, Long> earliestOffsets(
             @Nullable String partitionName, Collection<Integer> buckets) {
-        Map<Integer, Long> bucketWithOffset = new HashMap<>(buckets.size());
-        for (Integer bucket : buckets) {
-            bucketWithOffset.put(bucket, EARLIEST_OFFSET);
+        if (!fetchEarliestOffset) {
+            Map<Integer, Long> bucketWithOffset = new HashMap<>(buckets.size());
+            for (Integer bucket : buckets) {
+                bucketWithOffset.put(bucket, EARLIEST_OFFSET);
+            }
+            return bucketWithOffset;
+        } else {
+            return listOffsets(partitionName, buckets, new OffsetSpec.EarliestSpec());
         }
-        return bucketWithOffset;
     }
 
     @Override
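The effect of the new flag: with fetchEarliestOffset = false (the old behaviour, and the default through the two-argument constructor) earliestOffsets() simply returns the EARLIEST_OFFSET sentinel for every bucket, while true resolves real offsets from the cluster via listOffsets(EarliestSpec). A minimal Scala caller sketch, assuming an Admin client and TablePath obtained from an existing Fluss connection (the helper name below is made up):

// Sketch only. `admin` and `tablePath` are assumed to come from an existing
// Fluss connection; Admin and BucketOffsetsRetrieverImpl are assumed to be in scope.
def resolveEarliestOffsets(
    admin: Admin,
    tablePath: TablePath,
    buckets: java.util.Collection[Integer]): java.util.Map[Integer, java.lang.Long] = {
  // `true` asks the retriever to query the cluster instead of returning
  // the EARLIEST_OFFSET sentinel for every bucket.
  val retriever = new BucketOffsetsRetrieverImpl(admin, tablePath, true)
  retriever.earliestOffsets(null, buckets) // null = non-partitioned table
}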
@@ -50,4 +50,14 @@ object SparkFlussConf {
       .durationType()
       .defaultValue(Duration.ofMillis(10000L))
       .withDescription("The timeout for log scanner to poll records.")
+
+  val SCAN_MAX_RECORDS_PER_PARTITION: ConfigOption[java.lang.Long] =
+    ConfigBuilder
+      .key("scan.max.records.per.partition")
+      .longType()
+      .noDefaultValue()
+      .withDescription(
+        "The maximum number of records per Spark input partition when reading a log table. " +
+          "When set, each Fluss bucket whose offset range exceeds this value will be split " +
+          "into multiple partitions. Disabled by default (one partition per bucket).")
 }
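The option is evaluated on the driver when input partitions are planned (see FlussAppendBatch below). The test at the bottom of this diff sets it through a Spark SQL conf with the spark.sql.fluss. prefix; a session-level sketch along the same lines, with placeholder catalog and table names:

// Hypothetical usage sketch. The `spark.sql.fluss.` prefix is taken from the test
// below; the catalog, database and table names are placeholders.
spark.conf.set("spark.sql.fluss.scan.max.records.per.partition", "10000")

// Buckets whose start/stop offset range exceeds 10000 records should now be read
// as several Spark input partitions instead of one per bucket.
spark.sql("SELECT * FROM fluss_catalog.my_db.my_log_table").show()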
@@ -25,6 +25,7 @@ import org.apache.fluss.client.table.scanner.log.LogScanner
 import org.apache.fluss.config.Configuration
 import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath}
 import org.apache.fluss.predicate.Predicate
+import org.apache.fluss.spark.SparkFlussConf
 
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
 import org.apache.spark.sql.types.StructType
@@ -91,26 +92,64 @@ class FlussAppendBatch(
   }
 
   override def planInputPartitions(): Array[InputPartition] = {
-    val bucketOffsetsRetrieverImpl = new BucketOffsetsRetrieverImpl(admin, tablePath)
+    val maxRecordsPerPartition: Option[Long] = {
+      val opt = flussConfig.getOptional(SparkFlussConf.SCAN_MAX_RECORDS_PER_PARTITION)
+      if (opt.isPresent) Some(opt.get().longValue()) else None
+    }
+
+    val bucketOffsetsRetrieverImpl = maxRecordsPerPartition match {
+      case Some(_) => new BucketOffsetsRetrieverImpl(admin, tablePath, true)
+      case None => new BucketOffsetsRetrieverImpl(admin, tablePath)
+    }
     val buckets = (0 until tableInfo.getNumBuckets).toSeq
 
+    def splitOffsetRange(
+        tableBucket: TableBucket,
+        startOffset: Long,
+        stopOffset: Long,
+        maxRecords: Long): Seq[InputPartition] = {
+      if (
+        startOffset < 0 || stopOffset <= startOffset || stopOffset <= (startOffset + maxRecords)
+      ) {
+        return Seq(
+          FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
+            .asInstanceOf[InputPartition])
+      }
+      val rangeSize = stopOffset - startOffset
+      val numSplits = ((rangeSize + maxRecords - 1) / maxRecords).toInt
+      val step = (rangeSize + numSplits - 1) / numSplits
+
+      Iterator
+        .from(0)
+        .take(numSplits)
+        .map(i => startOffset + i * step)
+        .map {
+          from =>
+            FlussAppendInputPartition(tableBucket, from, math.min(from + step, stopOffset))
+              .asInstanceOf[InputPartition]
+        }
+        .toSeq
+    }
+
     def createPartitions(
         partitionId: Option[Long],
         startBucketOffsets: Map[Integer, Long],
         stoppingBucketOffsets: Map[Integer, Long]): Array[InputPartition] = {
-      buckets.map {
+      buckets.flatMap {
         bucketId =>
-          val (startBucketOffset, stoppingBucketOffset) =
+          val (startOffset, stopOffset) =
             (startBucketOffsets(bucketId), stoppingBucketOffsets(bucketId))
-          partitionId match {
-            case Some(partitionId) =>
-              val tableBucket = new TableBucket(tableInfo.getTableId, partitionId, bucketId)
-              FlussAppendInputPartition(tableBucket, startBucketOffset, stoppingBucketOffset)
-                .asInstanceOf[InputPartition]
+          val tableBucket = partitionId match {
+            case Some(pid) => new TableBucket(tableInfo.getTableId, pid, bucketId)
+            case None => new TableBucket(tableInfo.getTableId, bucketId)
+          }
+          maxRecordsPerPartition match {
+            case Some(maxRecs) =>
+              splitOffsetRange(tableBucket, startOffset, stopOffset, maxRecs)
             case None =>
-              val tableBucket = new TableBucket(tableInfo.getTableId, bucketId)
-              FlussAppendInputPartition(tableBucket, startBucketOffset, stoppingBucketOffset)
-                .asInstanceOf[InputPartition]
+              Seq(
+                FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
+                  .asInstanceOf[InputPartition])
           }
       }.toArray
     }
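To make the split arithmetic concrete: a bucket's offset range is cut into ceil(rangeSize / maxRecords) pieces of roughly equal size, and the last piece is clamped to stopOffset. Below is a standalone Scala sketch of the same logic on bare offsets (no Fluss types, helper name made up); with startOffset = 0, stopOffset = 5 and maxRecords = 2 it yields [0,2), [2,4), [4,5), which matches the 5-row, max-2-records-per-partition test at the end of this diff.

// Standalone illustration of the ceiling-division split used by splitOffsetRange above;
// it returns plain (from, to) offset pairs instead of FlussAppendInputPartition objects.
def splitRanges(startOffset: Long, stopOffset: Long, maxRecords: Long): Seq[(Long, Long)] = {
  if (startOffset < 0 || stopOffset <= startOffset || stopOffset <= startOffset + maxRecords) {
    // Nothing to split: unknown/empty range, or the whole range fits in one partition.
    Seq((startOffset, stopOffset))
  } else {
    val rangeSize = stopOffset - startOffset
    val numSplits = ((rangeSize + maxRecords - 1) / maxRecords).toInt // ceil(rangeSize / maxRecords)
    val step = (rangeSize + numSplits - 1) / numSplits                // ceil(rangeSize / numSplits)
    (0 until numSplits).map { i =>
      val from = startOffset + i * step
      (from, math.min(from + step, stopOffset)) // last slice is clamped to stopOffset
    }
  }
}

// splitRanges(0, 5, 2) == Seq((0, 2), (2, 4), (4, 5))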
@@ -524,4 +524,13 @@ class SparkLogTableReadTest extends FlussSparkTestBase {
       assert(numRowsRead == 5L, s"Expected 5 rows read, got $numRowsRead")
     }
   }
+
+  test("Spark Read: split partition by config") {
+    withSampleTable {
+      withSQLConf("spark.sql.fluss.scan.max.records.per.partition" -> "2") {
+        val query = sql(s"SELECT amount FROM $DEFAULT_DATABASE.t ORDER BY orderId")
+        checkAnswer(query, Row(601) :: Row(602) :: Row(603) :: Row(604) :: Row(605) :: Nil)
+      }
+    }
+  }
 }