Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int streaming_task_timeout_multiplier = 10;

@ConfField(mutable = true, masterOnly = true)
public static int streaming_cdc_light_rpc_timeout_sec = 90;

@ConfField(mutable = true, masterOnly = true)
public static int streaming_cdc_heavy_rpc_timeout_sec = 600;

/**
* the max timeout of get kafka meta.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* In PostgreSQL/MySQL, multi-table writes are performed by tasks that only make calls.
Expand Down Expand Up @@ -134,9 +136,9 @@ private void sendWriteRequest() throws JobException {
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
InternalService.PRequestCdcClientResult result = null;
try {
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance().requestCdcClient(address, request);
result = future.get();
Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
.requestCdcClient(address, request, Config.streaming_cdc_heavy_rpc_timeout_sec);
result = future.get(Config.streaming_cdc_heavy_rpc_timeout_sec, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.error("Failed to send write records request, {}", result.getStatus().getErrorMsgs(0));
Expand All @@ -160,6 +162,11 @@ private void sendWriteRequest() throws JobException {
throw new JobException("Failed to parse write records response: " + response);
}
throw new JobException("Failed to send write records request , error message: " + response);
} catch (TimeoutException te) {
log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={} jobId={} backend={}:{} timeout_sec={}",
taskId, getJobId(), backend.getHost(), backend.getBrpcPort(),
Config.streaming_cdc_heavy_rpc_timeout_sec);
throw new JobException("cdc_client RPC timeout: /api/writeRecords taskId=" + taskId);
} catch (ExecutionException | InterruptedException ex) {
log.error("Send write request failed: ", ex);
throw new JobException(ex);
Expand Down Expand Up @@ -331,20 +338,20 @@ public boolean isTimeout() {
* such as a data quality error, and needs to expose it to the user.
*/
public String getTimeoutReason() {
if (runningBackendId <= 0) {
log.info("No running backend for task {}", runningBackendId);
return "";
}
Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId);
try {
if (runningBackendId <= 0) {
log.info("No running backend for task {}", runningBackendId);
return "";
}
Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId);
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/getFailReason/" + getTaskId())
.build();
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
InternalService.PRequestCdcClientResult result = null;
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance().requestCdcClient(address, request);
result = future.get();
Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
.requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.warn("Failed to get task timeout reason, {}", result.getStatus().getErrorMsgs(0));
Expand All @@ -363,6 +370,11 @@ public String getTimeoutReason() {
} catch (JsonProcessingException e) {
log.warn("Failed to get task timeout reason, response: {}", response);
}
} catch (TimeoutException te) {
log.warn("cdc_client RPC timeout api=/api/getFailReason jobId={} taskId={} backend={}:{} "
+ "timeout_sec={}",
getJobId(), getTaskId(), backend.getHost(), backend.getBrpcPort(),
Config.streaming_cdc_light_rpc_timeout_sec);
} catch (ExecutionException | InterruptedException ex) {
log.warn("Send get task fail reason request failed: ", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
Expand Down Expand Up @@ -64,6 +65,8 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

@Getter
Expand Down Expand Up @@ -220,9 +223,9 @@ public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
InternalService.PRequestCdcClientResult result = null;
try {
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance().requestCdcClient(address, request);
result = future.get();
Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
.requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.warn("Failed to get end offset from backend, {}", result.getStatus().getErrorMsgs(0));
Expand All @@ -246,6 +249,11 @@ public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
log.warn("Failed to parse end offset response: {}", response);
throw new JobException(response);
}
} catch (TimeoutException te) {
log.warn("cdc_client RPC timeout api=/api/fetchEndOffset jobId={} backend={}:{} timeout_sec={}",
getJobId(), backend.getHost(), backend.getBrpcPort(),
Config.streaming_cdc_light_rpc_timeout_sec);
throw new JobException("cdc_client RPC timeout: /api/fetchEndOffset jobId=" + getJobId());
} catch (ExecutionException | InterruptedException ex) {
log.warn("Get end offset error: ", ex);
throw new JobException(ex);
Expand Down Expand Up @@ -306,9 +314,9 @@ private boolean compareOffset(Map<String, String> offsetFirst, Map<String, Strin
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
InternalService.PRequestCdcClientResult result = null;
try {
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance().requestCdcClient(address, request);
result = future.get();
Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
.requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.warn("Failed to compare offset , {}", result.getStatus().getErrorMsgs(0));
Expand All @@ -328,6 +336,11 @@ private boolean compareOffset(Map<String, String> offsetFirst, Map<String, Strin
log.warn("Failed to parse compare offset response: {}", response);
throw new JobException("Failed to parse compare offset response: " + response);
}
} catch (TimeoutException te) {
log.warn("cdc_client RPC timeout api=/api/compareOffset jobId={} backend={}:{} timeout_sec={}",
getJobId(), backend.getHost(), backend.getBrpcPort(),
Config.streaming_cdc_light_rpc_timeout_sec);
throw new JobException("cdc_client RPC timeout: /api/compareOffset jobId=" + getJobId());
} catch (ExecutionException | InterruptedException ex) {
log.warn("Compare offset error: ", ex);
throw new JobException(ex);
Expand Down Expand Up @@ -549,9 +562,9 @@ private List<SnapshotSplit> requestTableSplits(String table) throws JobException
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
InternalService.PRequestCdcClientResult result = null;
try {
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance().requestCdcClient(address, request);
result = future.get();
Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
.requestCdcClient(address, request, Config.streaming_cdc_heavy_rpc_timeout_sec);
result = future.get(Config.streaming_cdc_heavy_rpc_timeout_sec, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.warn("Failed to get split from backend, {}", result.getStatus().getErrorMsgs(0));
Expand All @@ -572,6 +585,11 @@ private List<SnapshotSplit> requestTableSplits(String table) throws JobException
log.warn("Failed to parse split response: {}", response);
throw new JobException("Failed to parse split response: " + response);
}
} catch (TimeoutException te) {
log.warn("cdc_client RPC timeout api=/api/fetchSplits jobId={} backend={}:{} table={} timeout_sec={}",
getJobId(), backend.getHost(), backend.getBrpcPort(), table,
Config.streaming_cdc_heavy_rpc_timeout_sec);
throw new JobException("cdc_client RPC timeout: /api/fetchSplits jobId=" + getJobId() + " table=" + table);
} catch (ExecutionException | InterruptedException ex) {
log.warn("Get splits error: ", ex);
throw new JobException(ex);
Expand Down Expand Up @@ -663,9 +681,9 @@ private void initSourceReader() throws JobException {
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
InternalService.PRequestCdcClientResult result = null;
try {
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance().requestCdcClient(address, request);
result = future.get();
Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
.requestCdcClient(address, request, Config.streaming_cdc_heavy_rpc_timeout_sec);
result = future.get(Config.streaming_cdc_heavy_rpc_timeout_sec, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.warn("Failed to init job {} reader, {}", getJobId(), result.getStatus().getErrorMsgs(0));
Expand Down Expand Up @@ -693,6 +711,11 @@ private void initSourceReader() throws JobException {
log.warn("Failed to init {} source reader, {}", getJobId(), response);
throw new JobException("Failed to init source reader, cause " + e.getMessage());
}
} catch (TimeoutException te) {
log.warn("cdc_client RPC timeout api=/api/initReader jobId={} backend={}:{} timeout_sec={}",
getJobId(), backend.getHost(), backend.getBrpcPort(),
Config.streaming_cdc_heavy_rpc_timeout_sec);
throw new JobException("cdc_client RPC timeout: /api/initReader jobId=" + getJobId());
} catch (ExecutionException | InterruptedException ex) {
log.warn("init source reader: ", ex);
throw new JobException(ex);
Expand All @@ -711,13 +734,17 @@ public void cleanMeta(Long jobId) throws JobException {
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
InternalService.PRequestCdcClientResult result = null;
try {
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance().requestCdcClient(address, request);
result = future.get();
Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
.requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.warn("Failed to close job {} source {}", jobId, result.getStatus().getErrorMsgs(0));
}
} catch (TimeoutException te) {
log.warn("cdc_client RPC timeout api=/api/close jobId={} backend={}:{} timeout_sec={}",
jobId, backend.getHost(), backend.getBrpcPort(),
Config.streaming_cdc_light_rpc_timeout_sec);
} catch (ExecutionException | InterruptedException ex) {
log.warn("Close job error: ", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -203,9 +205,10 @@ private List<Map<String, String>> fetchTaskEndOffset(long taskId, List<Long> sca
String rawResponse = null;
try {
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance().requestCdcClient(address, request);
InternalService.PRequestCdcClientResult result = future.get();
Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
.requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
InternalService.PRequestCdcClientResult result =
future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.warn("Failed to get task {} offset from BE {}: {}", taskId,
Expand All @@ -221,6 +224,11 @@ private List<Map<String, String>> fetchTaskEndOffset(long taskId, List<Long> sca
log.info("Fetched task {} offset from BE {}: {}", taskId, backend.getHost(), data);
return data;
}
} catch (TimeoutException te) {
log.warn("cdc_client RPC timeout api=/api/getTaskOffset jobId={} taskId={} backend={}:{} "
+ "timeout_sec={}",
jobId, taskId, backend.getHost(), backend.getBrpcPort(),
Config.streaming_cdc_light_rpc_timeout_sec);
} catch (Exception ex) {
log.warn("Get task offset error for task {} from BE {}, raw response: {}",
taskId, backend.getHost(), rawResponse, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ public Future<InternalService.PRequestCdcClientResult> requestCdcClient(
return stub.requestCdcClient(request);
}

public Future<InternalService.PRequestCdcClientResult> requestCdcClient(
InternalService.PRequestCdcClientRequest request, int timeoutSec) {
return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS).requestCdcClient(request);
}

public void shutdown() {
ConnectivityState state = channel.getState(false);
LOG.warn("shut down backend service client: {}, channel state: {}", address, state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,4 +608,15 @@ public Future<InternalService.PRequestCdcClientResult> requestCdcClient(TNetwork
}
return null;
}

public Future<InternalService.PRequestCdcClientResult> requestCdcClient(TNetworkAddress address,
InternalService.PRequestCdcClientRequest request, int timeoutSec) {
try {
final BackendServiceClient client = getProxy(address);
return client.requestCdcClient(request, timeoutSec);
} catch (Throwable e) {
LOG.warn("request cdc client failed, address={}:{}", address.getHostname(), address.getPort(), e);
}
return null;
}
}
Loading