Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ public interface Scan {
<T> TypedLogScanner<T> createTypedLogScanner(Class<T> pojoClass);

/**
* Creates a {@link BatchScanner} to read current data in the given table bucket for this scan.
* Creates a {@link BatchScanner} to read current data in the given table bucket.
*
* <p>Note: this API doesn't support pre-configured with {@link #project}.
* <p>For Primary Key Tables, this performs a full RocksDB-backed KV scan of the bucket and does
* not require {@link #limit(int)}. For Log Tables, {@link #limit(int)} must be set.
*/
BatchScanner createBatchScanner(TableBucket tableBucket);

Expand All @@ -102,6 +103,9 @@ public interface Scan {
*/
BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId);

/** Creates a {@link BatchScanner} to read current data in the given table for this scan. */
/**
* Creates a {@link BatchScanner} that scans across all buckets of the table, expanding all
* partitions for partitioned tables. For Log Tables, {@link #limit(int)} must be set.
*/
BatchScanner createBatchScanner() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.client.table.scanner.batch.CompositeBatchScanner;
import org.apache.fluss.client.table.scanner.batch.KvBatchScanner;
import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
import org.apache.fluss.client.table.scanner.log.LogScanner;
Expand All @@ -47,6 +48,7 @@

/** API for configuring and creating {@link LogScanner} and {@link BatchScanner}. */
public class TableScan implements Scan {

private final FlussConnection conn;
private final TableInfo tableInfo;
private final SchemaGetter schemaGetter;
Expand Down Expand Up @@ -159,10 +161,19 @@ public BatchScanner createBatchScanner(TableBucket tableBucket) {
"BatchScanner doesn't support filter pushdown. Table: %s, bucket: %s",
tableInfo.getTablePath(), tableBucket));
}
if (tableInfo.hasPrimaryKey() && limit == null) {
return new KvBatchScanner(
tableInfo,
tableBucket,
schemaGetter,
conn.getMetadataUpdater(),
kvBatchSizeBytes(),
projectedColumns);
}
if (limit == null) {
throw new UnsupportedOperationException(
String.format(
"Currently, BatchScanner is only available when limit is set. Table: %s, bucket: %s",
"BatchScanner over a Log Table requires limit to be set. Table: %s, bucket: %s",
tableInfo.getTablePath(), tableBucket));
}
return new LimitBatchScanner(
Expand All @@ -174,6 +185,13 @@ public BatchScanner createBatchScanner(TableBucket tableBucket) {
limit);
}

/**
 * Resolves the maximum KV fetch size for a batch scan, in bytes, from the client
 * configuration ({@code ConfigOptions.CLIENT_SCANNER_KV_FETCH_MAX_BYTES}).
 *
 * <p>The configured value is a memory size; it is narrowed to {@code int} because the
 * scanner's batch-size parameter is an {@code int}.
 */
private int kvBatchSizeBytes() {
    // Read the configured memory size once, then narrow to int for the scanner API.
    long configuredBytes =
            conn.getConfiguration()
                    .get(ConfigOptions.CLIENT_SCANNER_KV_FETCH_MAX_BYTES)
                    .getBytes();
    return (int) configuredBytes;
}

@Override
public BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId) {
if (recordBatchFilter != null) {
Expand Down
Loading