4 changes: 4 additions & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
@@ -655,12 +655,15 @@ void update_streaming_job_meta(MetaServiceCode& code, std::string& msg,
}
new_job_info.set_scanned_rows(new_job_info.scanned_rows() +
commit_attachment.scanned_rows());
new_job_info.set_filtered_rows(new_job_info.filtered_rows() +
commit_attachment.filtered_rows());
new_job_info.set_load_bytes(new_job_info.load_bytes() + commit_attachment.load_bytes());
new_job_info.set_num_files(new_job_info.num_files() + commit_attachment.num_files());
new_job_info.set_file_bytes(new_job_info.file_bytes() + commit_attachment.file_bytes());
} else {
new_job_info.set_job_id(commit_attachment.job_id());
new_job_info.set_scanned_rows(commit_attachment.scanned_rows());
new_job_info.set_filtered_rows(commit_attachment.filtered_rows());
new_job_info.set_load_bytes(commit_attachment.load_bytes());
new_job_info.set_num_files(commit_attachment.num_files());
new_job_info.set_file_bytes(commit_attachment.file_bytes());
@@ -981,6 +984,7 @@ void MetaServiceImpl::reset_streaming_job_offset(::google::protobuf::RpcControll
// Preserve existing statistics if they exist
if (prev_existed) {
new_job_info.set_scanned_rows(prev_job_info.scanned_rows());
new_job_info.set_filtered_rows(prev_job_info.filtered_rows());
new_job_info.set_load_bytes(prev_job_info.load_bytes());
new_job_info.set_num_files(prev_job_info.num_files());
new_job_info.set_file_bytes(prev_job_info.file_bytes());
5 changes: 5 additions & 0 deletions cloud/test/meta_service_job_test.cpp
@@ -5213,6 +5213,7 @@ TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) {
streaming_attach->set_job_id(1002);
streaming_attach->set_offset("test_offset_3");
streaming_attach->set_scanned_rows(2000);
streaming_attach->set_filtered_rows(150);
streaming_attach->set_load_bytes(10000);
streaming_attach->set_num_files(20);
streaming_attach->set_file_bytes(15000);
@@ -5241,6 +5242,7 @@ TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) {
EXPECT_TRUE(response.has_commit_attach());
EXPECT_EQ(response.commit_attach().job_id(), 1002);
EXPECT_EQ(response.commit_attach().scanned_rows(), 2000);
EXPECT_EQ(response.commit_attach().filtered_rows(), 150);
EXPECT_EQ(response.commit_attach().load_bytes(), 10000);
EXPECT_EQ(response.commit_attach().num_files(), 20);
EXPECT_EQ(response.commit_attach().file_bytes(), 15000);
@@ -5363,6 +5365,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
streaming_attach->set_job_id(job_id);
streaming_attach->set_offset("original_offset");
streaming_attach->set_scanned_rows(1000);
streaming_attach->set_filtered_rows(50);
streaming_attach->set_load_bytes(5000);
streaming_attach->set_num_files(10);
streaming_attach->set_file_bytes(8000);
@@ -5391,6 +5394,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
EXPECT_TRUE(response.has_commit_attach());
EXPECT_EQ(response.commit_attach().offset(), "original_offset");
EXPECT_EQ(response.commit_attach().scanned_rows(), 1000);
EXPECT_EQ(response.commit_attach().filtered_rows(), 50);
EXPECT_EQ(response.commit_attach().load_bytes(), 5000);
}

@@ -5427,6 +5431,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
EXPECT_EQ(response.commit_attach().offset(), "reset_offset");
// Other fields should remain unchanged
EXPECT_EQ(response.commit_attach().scanned_rows(), 1000);
EXPECT_EQ(response.commit_attach().filtered_rows(), 50);
EXPECT_EQ(response.commit_attach().load_bytes(), 5000);
EXPECT_EQ(response.commit_attach().num_files(), 10);
EXPECT_EQ(response.commit_attach().file_bytes(), 8000);
@@ -285,7 +285,8 @@ public static TxnCommitAttachmentPB streamingTaskTxnCommitAttachmentToPb(Streami
.setScannedRows(streamingTaskTxnCommitAttachment.getScannedRows())
.setLoadBytes(streamingTaskTxnCommitAttachment.getLoadBytes())
.setNumFiles(streamingTaskTxnCommitAttachment.getNumFiles())
.setFileBytes(streamingTaskTxnCommitAttachment.getFileBytes());
.setFileBytes(streamingTaskTxnCommitAttachment.getFileBytes())
.setFilteredRows(streamingTaskTxnCommitAttachment.getFilteredRows());

if (streamingTaskTxnCommitAttachment.getOffset() != null) {
builder.setOffset(streamingTaskTxnCommitAttachment.getOffset());
@@ -720,6 +720,7 @@ private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attach
this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + attachment.getLoadBytes());
this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + attachment.getNumFiles());
this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + attachment.getFileBytes());
this.jobStatistic.setFilteredRows(this.jobStatistic.getFilteredRows() + attachment.getFilteredRows());
Offset endOffset = offsetProvider.deserializeOffset(attachment.getOffset());
offsetProvider.updateOffset(endOffset);
// Sync offsetProviderPersist after each offset update so the checkpoint thread
@@ -735,6 +736,7 @@ private void updateJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attach
//update metric
if (MetricRepo.isInit && !isReplay) {
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(attachment.getFilteredRows());
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
}
}
@@ -747,11 +749,13 @@ private void updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment a
this.jobStatistic.setLoadBytes(attachment.getLoadBytes());
this.jobStatistic.setFileNumber(attachment.getNumFiles());
this.jobStatistic.setFileSize(attachment.getFileBytes());
this.jobStatistic.setFilteredRows(attachment.getFilteredRows());
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));

//update metric
if (MetricRepo.isInit && !isReplay) {
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.update(attachment.getFilteredRows());
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
}
}
@@ -1134,6 +1138,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
loadStatistic.getLoadBytes(),
loadStatistic.getFileNumber(),
loadStatistic.getTotalFileSizeB(),
loadStatistic.getFilteredRows(),
offsetJson));
passCheck = true;
} finally {
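The two update paths above aggregate differently: updateJobStatisticAndOffset receives a per-commit delta and accumulates (`+=` on the statistic, `.increase` on the counter), while updateCloudJobStatisticAndOffset receives totals that the meta-service has already summed in update_streaming_job_meta, so it overwrites (`=` on the statistic, `.update` on the counter). A minimal sketch of the distinction, using hypothetical names rather than the real Doris classes:

public class FilteredRowsAggregationSketch {
    private static long jobFilteredRows = 0;

    // Non-cloud path: each committed task reports a per-task delta, so the
    // job-level statistic accumulates (mirrors updateJobStatisticAndOffset).
    static void applyLocalCommit(long filteredRowsDelta) {
        jobFilteredRows += filteredRowsDelta;
    }

    // Cloud path: update_streaming_job_meta has already summed the deltas in
    // the meta-service, so the FE overwrites its local copy with the total
    // (mirrors updateCloudJobStatisticAndOffset).
    static void applyCloudCommit(long filteredRowsTotal) {
        jobFilteredRows = filteredRowsTotal;
    }

    public static void main(String[] args) {
        applyLocalCommit(30);
        applyLocalCommit(20);
        System.out.println(jobFilteredRows); // 50
        applyCloudCommit(150); // absolute total from the meta-service
        System.out.println(jobFilteredRows); // 150
    }
}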
@@ -32,14 +32,16 @@ public StreamingTaskTxnCommitAttachment() {
}

public StreamingTaskTxnCommitAttachment(long jobId, long taskId,
long scannedRows, long loadBytes, long numFiles, long fileBytes, String offset) {
long scannedRows, long loadBytes, long numFiles, long fileBytes,
long filteredRows, String offset) {
super(TransactionState.LoadJobSourceType.STREAMING_JOB);
this.jobId = jobId;
this.taskId = taskId;
this.scannedRows = scannedRows;
this.loadBytes = loadBytes;
this.numFiles = numFiles;
this.fileBytes = fileBytes;
this.filteredRows = filteredRows;
this.offset = offset;
}

@@ -49,6 +51,7 @@ public StreamingTaskTxnCommitAttachment(StreamingTaskCommitAttachmentPB pb) {
this.loadBytes = pb.getLoadBytes();
this.numFiles = pb.getNumFiles();
this.fileBytes = pb.getFileBytes();
this.filteredRows = pb.getFilteredRows();
this.offset = pb.getOffset();
}

@@ -69,6 +72,9 @@ public StreamingTaskTxnCommitAttachment(StreamingTaskCommitAttachmentPB pb) {
@SerializedName(value = "fs")
@Getter
private long fileBytes;
@SerializedName(value = "fr")
@Getter
private long filteredRows;
@SerializedName(value = "of")
@Getter
private String offset;
@@ -80,6 +86,7 @@ public String toString() {
+ ", loadBytes=" + loadBytes
+ ", numFiles=" + numFiles
+ ", fileBytes=" + fileBytes
+ ", filteredRows=" + filteredRows
+ ", offset=" + offset
+ "]";
}
@@ -58,6 +58,9 @@ public class LoadStatistic {
public int fileNum = 0;
public long totalFileSizeB = 0;

// number of rows filtered by BE (DPP_ABNORMAL_ALL), set once after coordinator finishes
private long filteredRows = 0;

// init the statistic of specified load task
public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> fragmentIds, List<Long> relatedBackendIds) {
counterTbl.rowMap().remove(loadId);
@@ -133,6 +136,14 @@ public long getTotalFileSizeB() {
return totalFileSizeB;
}

public long getFilteredRows() {
return filteredRows;
}

public void setFilteredRows(long filteredRows) {
this.filteredRows = filteredRows;
}

public synchronized String toJson() {
long total = 0;
for (long rows : counterTbl.values()) {
@@ -156,6 +167,7 @@ public synchronized String toJson() {
details.put("LoadBytes", totalBytes);
details.put("FileNumber", fileNum);
details.put("FileSize", totalFileSizeB);
details.put("FilteredRows", filteredRows);
details.put("TaskNumber", counterTbl.rowMap().size());
details.put("Unfinished backends", unfinishedBackendIdsList);
details.put("All backends", allBackendIdsList);
@@ -222,6 +222,9 @@ protected final void execImpl(StmtExecutor executor) throws Exception {
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
}
if (insertLoadJob != null) {
insertLoadJob.getLoadStatistic().setFilteredRows(filteredRows);
}
}

private void checkStrictModeAndFilterRatio() throws Exception {
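The filtered-row count originates on the BE as the DPP_ABNORMAL_ALL load counter; the hunk above reads it from the coordinator after execution, leaving it at 0 when the counter is absent, and pushes it into the job's LoadStatistic. A minimal sketch of that parse-with-default step, against a plain map rather than the real Coordinator API (the counter key literal is an assumption):

import java.util.HashMap;
import java.util.Map;

public class FilteredRowsCounterSketch {
    // Key published by the BE; LoadEtlTask.DPP_ABNORMAL_ALL in the hunk above.
    // The literal value here is an assumption for illustration.
    private static final String DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";

    static long parseFilteredRows(Map<String, String> loadCounters) {
        long filteredRows = 0;
        if (loadCounters.get(DPP_ABNORMAL_ALL) != null) {
            filteredRows = Long.parseLong(loadCounters.get(DPP_ABNORMAL_ALL));
        }
        return filteredRows;
    }

    public static void main(String[] args) {
        Map<String, String> counters = new HashMap<>();
        System.out.println(parseFilteredRows(counters)); // 0: counter absent
        counters.put(DPP_ABNORMAL_ALL, "20");
        System.out.println(parseFilteredRows(counters)); // 20
    }
}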
1 change: 1 addition & 0 deletions gensrc/proto/cloud.proto
@@ -418,6 +418,7 @@ message StreamingTaskCommitAttachmentPB {
optional int64 load_bytes = 4;
optional int64 num_files = 5;
optional int64 file_bytes = 6;
optional int64 filtered_rows = 7;
}

message TxnCommitAttachmentPB {
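Because filtered_rows is declared optional int64 with no explicit default, attachments persisted before this change read back as 0, so the accumulate and preserve paths above need no migration. A round-trip sketch with the protoc-generated Java bindings (the import path is an assumption; the setter/getter names match the converter hunk earlier):

// Assumed import; the generated package for gensrc/proto/cloud.proto may differ.
import org.apache.doris.cloud.proto.Cloud.StreamingTaskCommitAttachmentPB;

public class FilteredRowsProtoSketch {
    public static void main(String[] args) {
        // Written by an old FE: filtered_rows never set, reads back as 0.
        StreamingTaskCommitAttachmentPB legacy = StreamingTaskCommitAttachmentPB.newBuilder()
                .setScannedRows(2000)
                .build();
        System.out.println(legacy.getFilteredRows()); // 0

        // Written after this PR: the new field round-trips.
        StreamingTaskCommitAttachmentPB current = StreamingTaskCommitAttachmentPB.newBuilder()
                .setScannedRows(2000)
                .setFilteredRows(150)
                .build();
        System.out.println(current.getFilteredRows()); // 150
    }
}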
@@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.awaitility.Awaitility

import static java.util.concurrent.TimeUnit.SECONDS

suite("test_streaming_insert_job_filtered_rows") {
def tableName = "test_streaming_insert_job_filtered_rows_tbl"
def jobName = "test_streaming_insert_job_filtered_rows_name"

sql """drop table if exists `${tableName}` force"""
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
"""

// c2 INT NOT NULL forces BE to filter every row when CSV provides
// non-parseable strings in that column.
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`c1` int NULL,
`c2` int NOT NULL,
`c3` int NULL
) ENGINE=OLAP
DUPLICATE KEY(`c1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c1`) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""

// insert_max_filter_ratio=1 lets the task succeed even if every row is filtered.
sql """
CREATE JOB ${jobName}
PROPERTIES(
"s3.max_batch_files" = "1",
"session.enable_insert_strict" = "false",
"session.insert_max_filter_ratio" = "1"
)
ON STREAMING DO INSERT INTO ${tableName}
SELECT * FROM S3
(
"uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
"format" = "csv",
"provider" = "${getS3Provider()}",
"column_separator" = ",",
"s3.endpoint" = "${getS3Endpoint()}",
"s3.region" = "${getS3Region()}",
"s3.access_key" = "${getS3AK()}",
"s3.secret_key" = "${getS3SK()}"
);
"""

try {
Awaitility.await().atMost(300, SECONDS)
.pollInterval(1, SECONDS).until(
{
def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobSucceedCount: " + jobSucceedCount)
jobSucceedCount.size() == 1 && '2' <= jobSucceedCount.get(0).get(0)
}
)
} catch (Exception ex) {
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
log.info("show job: " + showjob)
log.info("show task: " + showtask)
throw ex;
}

def jobInfo = sql """
select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
def loadStat = parseJson(jobInfo.get(0).get(2))
log.info("loadStatistic: " + jobInfo.get(0).get(2))

assert loadStat.scannedRows == 20
assert loadStat.fileNumber == 2
assert loadStat.filteredRows == 20

def rowCount = sql """ select count(1) from ${tableName} """
assert rowCount.get(0).get(0) == 0

sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
sql """drop table if exists `${tableName}` force"""
}