Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.fluss.flink.lake.reader;

import org.apache.fluss.flink.source.reader.LogRecordRowIterator;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.utils.CloseableIterator;

/**
Expand All @@ -39,7 +39,7 @@
* <li>Maintains iterator semantics for sequential data access
* </ul>
*/
public class IndexedLakeSplitRecordIterator implements CloseableIterator<InternalRow> {
public class IndexedLakeSplitRecordIterator implements LogRecordRowIterator {
private final CloseableIterator<LogRecord> logRecordIterators;
private final int currentLakeSplitIndex;

Expand All @@ -64,7 +64,7 @@ public boolean hasNext() {
}

@Override
public InternalRow next() {
return logRecordIterators.next().getRow();
public LogRecord nextLogRecord() {
return logRecordIterators.next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
import org.apache.fluss.flink.source.reader.LogRecordRowIterator;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.record.LogRecord;
Expand Down Expand Up @@ -66,7 +67,7 @@ public void close() throws IOException {
}
}

private static class InternalRowIterator implements CloseableIterator<InternalRow> {
private static class InternalRowIterator implements LogRecordRowIterator {

private final CloseableIterator<LogRecord> recordCloseableIterator;

Expand All @@ -90,8 +91,8 @@ public boolean hasNext() {
}

@Override
public InternalRow next() {
return recordCloseableIterator.next().getRow();
public LogRecord nextLogRecord() {
return recordCloseableIterator.next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,30 @@ public class FlinkSourceReaderMetrics {
// Map for tracking current consuming offsets
private final Map<TableBucket, Long> offsets = new HashMap<>();

// For currentFetchEventTimeLag metric
private volatile long currentFetchEventTimeLag = UNINITIALIZED;
private volatile long maxFetchEventTimeLag = UNINITIALIZED;
// Map for tracking current fetch event time lag by table bucket
private final Map<TableBucket, Long> currentFetchEventTimeLags = new HashMap<>();

public FlinkSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
this.sourceReaderMetricGroup = sourceReaderMetricGroup;
this.flussSourceReaderMetricGroup =
sourceReaderMetricGroup.addGroup(FLUSS_METRIC_GROUP).addGroup(READER_METRIC_GROUP);
}

public void reportRecordEventTime(long lag) {
if (currentFetchEventTimeLag == UNINITIALIZED) {
// Lazily register the currentFetchEventTimeLag
// Set the lag before registering the metric to avoid metric reporter getting
// the uninitialized value
currentFetchEventTimeLag = lag;
sourceReaderMetricGroup.gauge(
MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, () -> currentFetchEventTimeLag);
return;
public void reportRecordEventTime(TableBucket tableBucket, long timestamp) {
if (!currentFetchEventTimeLags.containsKey(tableBucket)) {
registerEventTimeLagMetricsForTableBucket(tableBucket);
}
long lag = System.currentTimeMillis() - timestamp;
currentFetchEventTimeLags.put(tableBucket, lag);

if (lag > maxFetchEventTimeLag) {
if (maxFetchEventTimeLag == UNINITIALIZED) {
sourceReaderMetricGroup.gauge(
MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, () -> maxFetchEventTimeLag);
}
maxFetchEventTimeLag = lag;
}
currentFetchEventTimeLag = lag;
}

public void registerTableBucket(TableBucket tableBucket) {
Expand All @@ -105,16 +109,26 @@ public void recordCurrentOffset(TableBucket tb, long offset) {

// -------- Helper functions --------
private void registerOffsetMetricsForTableBucket(TableBucket tableBucket) {
getMetricGroupForTableBucket(tableBucket)
.gauge(
CURRENT_OFFSET_METRIC_GAUGE,
() -> offsets.getOrDefault(tableBucket, INITIAL_OFFSET));
}

private void registerEventTimeLagMetricsForTableBucket(TableBucket tableBucket) {
getMetricGroupForTableBucket(tableBucket)
.gauge(
MetricNames.CURRENT_FETCH_EVENT_TIME_LAG,
() -> currentFetchEventTimeLags.getOrDefault(tableBucket, UNINITIALIZED));
}

private MetricGroup getMetricGroupForTableBucket(TableBucket tableBucket) {
final MetricGroup metricGroup =
tableBucket.getPartitionId() == null
? this.flussSourceReaderMetricGroup
: this.flussSourceReaderMetricGroup.addGroup(
PARTITION_GROUP, String.valueOf(tableBucket.getPartitionId()));
final MetricGroup bucketGroup =
metricGroup.addGroup(BUCKET_GROUP, String.valueOf(tableBucket.getBucket()));
bucketGroup.gauge(
CURRENT_OFFSET_METRIC_GAUGE,
() -> offsets.getOrDefault(tableBucket, INITIAL_OFFSET));
return metricGroup.addGroup(BUCKET_GROUP, String.valueOf(tableBucket.getBucket()));
}

private void checkTableBucketTracked(TableBucket tableBucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.flink.lake.reader.IndexedLakeSplitRecordIterator;
import org.apache.fluss.flink.source.split.SnapshotSplit;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.MemoryAwareGetters;
import org.apache.fluss.row.ProjectedRow;
Expand Down Expand Up @@ -155,7 +156,16 @@ public boolean hasNext() {

@Override
public ScanRecord next() {
InternalRow row = rowIterator.next();
LogRecord logRecord = null;
InternalRow row;
// Most bounded scanners expose row-only iterators. LogRecord-backed iterators preserve
// metadata such as timestamp for event-time lag reporting.
if (rowIterator instanceof LogRecordRowIterator) {
logRecord = ((LogRecordRowIterator) rowIterator).nextLogRecord();
row = logRecord.getRow();
} else {
row = rowIterator.next();
}
// Extract row size when the underlying row supports it (e.g. BinaryRow in
// KV-snapshot paths); falls back to -1 for other row types.
int sizeInBytes = ScanRecord.UNKNOWN_SIZE_IN_BYTES;
Expand All @@ -164,6 +174,14 @@ public ScanRecord next() {
if (underlyingRow instanceof MemoryAwareGetters) {
sizeInBytes = ((MemoryAwareGetters) underlyingRow).getSizeInBytes();
}
if (logRecord != null) {
return new ScanRecord(
logRecord.logOffset(),
logRecord.timestamp(),
logRecord.getChangeType(),
row,
sizeInBytes);
}
return new ScanRecord(row, sizeInBytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ public RecordAndPos nextRecordFromSplit() {
flinkSourceReaderMetrics.recordCurrentOffset(currentTableBucket, offset);
}

long timestamp = recordAndPos.record().timestamp();
if (timestamp > 0) {
flinkSourceReaderMetrics.reportRecordEventTime(currentTableBucket, timestamp);
}

return recordAndPos;
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,6 @@ private void checkSnapshotSplitOrStartNext() {
}

private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) {
// For calculating the currentFetchEventTimeLag
long fetchTimestamp = System.currentTimeMillis();
long maxConsumerRecordTimestampInFetch = -1;

Map<String, CloseableIterator<RecordAndPos>> splitRecords = new HashMap<>();
Map<TableBucket, Long> stoppingOffsets = new HashMap<>();
Set<String> finishedSplits = new HashSet<>();
Expand All @@ -445,10 +441,6 @@ private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) {
List<ScanRecord> bucketScanRecords = scanRecords.records(scanBucket);
if (!bucketScanRecords.isEmpty()) {
final ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
// We keep the maximum message timestamp in the fetch for calculating lags
maxConsumerRecordTimestampInFetch =
Math.max(maxConsumerRecordTimestampInFetch, lastRecord.timestamp());

// After processing a record with offset of "stoppingOffset - 1", the split reader
// should not continue fetching because the record with stoppingOffset may not
// exist. Keep polling will just block forever
Expand All @@ -474,14 +466,6 @@ public String next() {
}
};

// We use the timestamp on ScanRecord as the event time to calculate the
// currentFetchEventTimeLag. This is not totally accurate as the event time could be
// overridden by user's custom TimestampAssigner configured in source operator.
if (maxConsumerRecordTimestampInFetch > 0) {
flinkSourceReaderMetrics.reportRecordEventTime(
fetchTimestamp - maxConsumerRecordTimestampInFetch);
}

FlinkRecordsWithSplitIds recordsWithSplitIds =
new FlinkRecordsWithSplitIds(
splitRecords,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source.reader;

import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.utils.CloseableIterator;

/**
 * An {@link InternalRow} iterator whose elements come from {@link LogRecord}s, so the full
 * record (and the metadata it carries) stays reachable for callers that need it.
 *
 * <p>Implement this interface only when the underlying records have meaningful {@link LogRecord}
 * metadata. Row-only snapshot iterators should stay plain {@link CloseableIterator}s.
 *
 * <p>{@link #next()} is implemented on top of {@link #nextLogRecord()}, so each call to either
 * method consumes one element; call only one of them per element.
 */
public interface LogRecordRowIterator extends CloseableIterator<InternalRow> {

    /**
     * Advances the iterator and returns the next element as a complete log record.
     *
     * @return the next {@link LogRecord}
     */
    LogRecord nextLogRecord();

    /**
     * Advances the iterator through {@link #nextLogRecord()} and returns only the record's row.
     *
     * @return the row of the next log record
     */
    @Override
    default InternalRow next() {
        final LogRecord record = nextLogRecord();
        return record.getRow();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -64,6 +65,61 @@ void testCurrentOffsetTracking() {
assertCurrentOffset(t3, 15513L, metricListener);
}

/**
 * Verifies the two event-time-lag gauges exposed by {@code FlinkSourceReaderMetrics}: the gauge
 * looked up without a sub-group path must only ever grow (it tracks the maximum reported lag),
 * while each per-bucket gauge follows the most recent lag reported for that bucket.
 */
@Test
void testCurrentFetchEventTimeLagTracksMaxLag() {
MetricListener metricListener = new MetricListener();
FlinkSourceReaderMetrics flinkSourceReaderMetrics =
new FlinkSourceReaderMetrics(
InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
TableBucket tableBucket0 = new TableBucket(0, 0);
TableBucket tableBucket1 = new TableBucket(0, 1);

// Step 1: first report for bucket 0, with a timestamp 100 000 ms in the past. Both the
// reader-level gauge and the bucket-0 gauge are expected to be registered by this call.
long timestamp = System.currentTimeMillis() - 100000L;
flinkSourceReaderMetrics.reportRecordEventTime(tableBucket0, timestamp);

Optional<Gauge<Long>> readerEventTimeLagGauge =
metricListener.getGauge(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
Optional<Gauge<Long>> bucket0EventTimeLagGauge =
metricListener.getGauge(
FLUSS_METRIC_GROUP,
READER_METRIC_GROUP,
BUCKET_GROUP,
String.valueOf(tableBucket0.getBucket()),
MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
assertThat(readerEventTimeLagGauge).isPresent();
assertThat(bucket0EventTimeLagGauge).isPresent();
// Capture the baseline values so later reports can be compared against them.
long readerEventTimeLag = readerEventTimeLagGauge.get().getValue();
long bucket0EventTimeLag = bucket0EventTimeLagGauge.get().getValue();

// Step 2: an even older timestamp for bucket 0 (another 100 000 ms back) must raise both
// the reader-level gauge and the bucket-0 gauge above their baselines.
flinkSourceReaderMetrics.reportRecordEventTime(tableBucket0, timestamp - 100000L);
long maxReaderEventTimeLag = readerEventTimeLagGauge.get().getValue();
assertThat(maxReaderEventTimeLag).isGreaterThan(readerEventTimeLag);
assertThat((long) bucket0EventTimeLagGauge.get().getValue())
.isGreaterThan(bucket0EventTimeLag);

// Step 3: a fresh record on bucket 1 registers the bucket-1 gauge with a small lag, and
// must leave the reader-level gauge at the previously observed maximum.
long newerTimestamp = System.currentTimeMillis();
flinkSourceReaderMetrics.reportRecordEventTime(tableBucket1, newerTimestamp);
Optional<Gauge<Long>> bucket1EventTimeLagGauge =
metricListener.getGauge(
FLUSS_METRIC_GROUP,
READER_METRIC_GROUP,
BUCKET_GROUP,
String.valueOf(tableBucket1.getBucket()),
MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
assertThat(bucket1EventTimeLagGauge).isPresent();
assertThat((long) readerEventTimeLagGauge.get().getValue())
.isEqualTo(maxReaderEventTimeLag);
assertThat((long) bucket1EventTimeLagGauge.get().getValue())
.isLessThan(maxReaderEventTimeLag);

// Step 4: a newer record on bucket 0 (50 000 ms before newerTimestamp) lowers the
// bucket-0 gauge below the maximum, while the reader-level gauge still does not decrease.
long updatedBucket0Timestamp = newerTimestamp - 50000L;
flinkSourceReaderMetrics.reportRecordEventTime(tableBucket0, updatedBucket0Timestamp);
assertThat((long) readerEventTimeLagGauge.get().getValue())
.isEqualTo(maxReaderEventTimeLag);
assertThat((long) bucket0EventTimeLagGauge.get().getValue())
.isLessThan(maxReaderEventTimeLag);
}

// ----------- Assertions --------------

private void assertCurrentOffset(
Expand Down
Loading