Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<grpc.version>1.75.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<netty.version>4.1.124.Final</netty.version>
<log4j2.version>2.18.0</log4j2.version>
<jackson.version>2.15.0</jackson.version>
</properties>
<repositories>
<repository>
Expand Down Expand Up @@ -130,6 +131,27 @@
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.57</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,16 @@ private void delayCal(long delay, long delayThreshold) {
heartbeat.setSlaveBehindMaster((int) delayVal);
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL);
} else {
// master and slave maybe switch
heartbeat.setSlaveBehindMaster(null);
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR);
if (logic == 0) {
long updatedLogic = dbGroup.getLogicTimestamp().updateAndGet(current -> Math.max(current, delay));
LOGGER.warn("delay detection rebased logic_timestamp to {} for dbGroup {}", updatedLogic, dbGroup.getGroupName());
heartbeat.setSlaveBehindMaster(0);
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL);
} else {
// master and slave maybe switch
heartbeat.setSlaveBehindMaster(null);
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public MySQLHeartbeat(PhysicalDbInstance dbInstance) {
this.heartbeatTimeout = dbInstance.getDbGroupConfig().getHeartbeatTimeout();
this.isDelayDetection = dbInstance.getDbGroupConfig().isDelayDetection();
if (isDelayDetection) {
this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(), dbInstance.getDbGroupConfig().getDelayDatabase());
this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(),
dbInstance.getDbGroupConfig().getDelayDatabase(), dbInstance.isReadInstance());
} else {
this.heartbeatSQL = source.getDbGroupConfig().getHeartbeatSQL();
}
Expand Down Expand Up @@ -181,12 +182,12 @@ public void heartbeat() {
}
}

private String getDetectorSql(String dbGroupName, String delayDatabase) {
private String getDetectorSql(String dbGroupName, String delayDatabase, boolean readInstance) {
String[] str = {"dble", dbGroupName, SystemConfig.getInstance().getInstanceName()};
String sourceName = Joiner.on("_").join(str);
String sqlTableName = delayDatabase + ".u_delay ";
String detectorSql;
if (!source.isReadInstance()) {
if (!readInstance) {
String update = "replace into ? (source,real_timestamp,logic_timestamp) values ('?','?',?)";
detectorSql = convert(update, Lists.newArrayList(sqlTableName, sourceName));
} else {
Expand All @@ -199,9 +200,14 @@ private String getDetectorSql(String dbGroupName, String delayDatabase) {
private String convert(String template, List<String> list) {
StringBuilder sb = new StringBuilder(template);
String replace = "?";
int fromIndex = 0;
for (String str : list) {
int index = sb.indexOf(replace);
sb.replace(index, index + 1, str);
int index = sb.indexOf(replace, fromIndex);
if (index < 0) {
throw new IllegalArgumentException("heartbeat sql template placeholder '?' not enough, template=" + template + ", values=" + list);
}
sb.replace(index, index + replace.length(), str);
fromIndex = index + str.length();
}
return sb.toString();
}
Expand Down Expand Up @@ -387,11 +393,17 @@ public long getHeartbeatTimeout() {
}

String getHeartbeatSQL() {
if (isDelayDetection && !source.isReadInstance()) {
return convert(heartbeatSQL, Lists.newArrayList(String.valueOf(LocalDateTime.now()), String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet())));
} else {
return heartbeatSQL;
if (isDelayDetection) {
boolean readInstance = source.isReadInstance();
String detectorSql = getDetectorSql(source.getDbGroupConfig().getName(),
source.getDbGroupConfig().getDelayDatabase(), readInstance);
if (!readInstance) {
return convert(detectorSql, Lists.newArrayList(String.valueOf(LocalDateTime.now()),
String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet())));
}
return detectorSql;
}
return heartbeatSQL;
}

public DbInstanceSyncRecorder getAsyncRecorder() {
Expand Down
Loading