Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/com/actiontech/dble/DbleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ public void startup() throws Exception {
this.config.testConnection();
LOGGER.info("==========================================Test connection finish==================================");

this.config.createDelayDetectTable();
LOGGER.info("==========================================Create delay detect table finish==================================");

// sync global status
this.config.getAndSyncKeyVariables();
LOGGER.info("=====================================Get And Sync KeyVariables finish=============================");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class PhysicalDbGroup {
Expand All @@ -42,11 +43,15 @@ public class PhysicalDbGroup {
public static final int RW_SPLIT_ALL = 2;
// weight
public static final int WEIGHT = 0;
private final List<PhysicalDbInstance> writeInstanceList;

enum USAGE {
NONE, RW, SHARDING;
}

private final String groupName;
private final DbGroupConfig dbGroupConfig;
private volatile PhysicalDbInstance writeDbInstance;
private final List<PhysicalDbInstance> writeInstanceList;
private Map<String, PhysicalDbInstance> allSourceMap = new HashMap<>();

private final int rwSplitMode;
Expand All @@ -55,8 +60,10 @@ public class PhysicalDbGroup {
private final LocalReadLoadBalancer localReadLoadBalancer = new LocalReadLoadBalancer();
private final ReentrantReadWriteLock adjustLock = new ReentrantReadWriteLock();

private boolean shardingUseless = true;
private boolean rwSplitUseless = true;
//delayDetection
private AtomicLong logicTimestamp = new AtomicLong();

private USAGE usedFor = USAGE.NONE;

public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance writeDbInstances, PhysicalDbInstance[] readDbInstances, int rwSplitMode) {
this.groupName = name;
Expand All @@ -66,8 +73,8 @@ public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance wri
writeDbInstances.setDbGroup(this);
this.writeDbInstance = writeDbInstances;
this.writeInstanceList = Collections.singletonList(writeDbInstance);
allSourceMap.put(writeDbInstances.getName(), writeDbInstances);

allSourceMap.put(writeDbInstances.getName(), writeDbInstances);
for (PhysicalDbInstance readDbInstance : readDbInstances) {
readDbInstance.setDbGroup(this);
allSourceMap.put(readDbInstance.getName(), readDbInstance);
Expand All @@ -89,6 +96,54 @@ public PhysicalDbGroup(PhysicalDbGroup org) {
writeInstanceList = Collections.singletonList(writeDbInstance);
}

public void init(String reason) {
for (Map.Entry<String, PhysicalDbInstance> entry : allSourceMap.entrySet()) {
entry.getValue().init(reason);
}
}

// only fresh backend connection pool
public void init(List<String> sourceNames, String reason) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).init(reason, false);
}
}
}

public void stop(String reason) {
stop(reason, false);
}

public void stop(String reason, boolean closeFront) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
dbInstance.stop(reason, closeFront);
}
}

// only fresh backend connection pool
public void stop(List<String> sourceNames, String reason, boolean closeFront) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).stop(reason, closeFront, false);
}
}

if (closeFront) {
Iterator<PooledConnection> iterator = IOProcessor.BACKENDS_OLD.iterator();
while (iterator.hasNext()) {
PooledConnection con = iterator.next();
if (con instanceof BackendConnection) {
BackendConnection backendCon = (BackendConnection) con;
if (backendCon.getPoolDestroyedTime() != 0 && sourceNames.contains(backendCon.getInstance().getConfig().getInstanceName())) {
backendCon.closeWithFront("old active backend conn will be forced closed by closing front conn");
iterator.remove();
}
}
}
}
}

public String getGroupName() {
return groupName;
}
Expand Down Expand Up @@ -125,88 +180,46 @@ PhysicalDbInstance findDbInstance(BackendConnection exitsCon) {
}

boolean isSlave(PhysicalDbInstance ds) {
return !(writeDbInstance == ds);
return writeDbInstance != ds;
}

public int getRwSplitMode() {
return rwSplitMode;
}

public boolean isUseless() {
return shardingUseless && rwSplitUseless;
return usedFor == USAGE.NONE;
}

public boolean usedForSharding() {
return usedFor == USAGE.SHARDING;
}

public boolean isShardingUseless() {
return shardingUseless;
public boolean usedForRW() {
return usedFor == USAGE.RW;
}

public boolean isRwSplitUseless() {
return rwSplitUseless;
public void setUsedForSharding() {
usedFor = USAGE.SHARDING;
}

public void setShardingUseless(boolean shardingUseless) {
this.shardingUseless = shardingUseless;
public void setUsedForRW() {
usedFor = USAGE.RW;
}

public void setRwSplitUseless(boolean rwSplitUseless) {
this.rwSplitUseless = rwSplitUseless;
public USAGE getUsedFor() {
return usedFor;
}

private boolean checkSlaveSynStatus() {
return (dbGroupConfig.getDelayThreshold() != -1) &&
(dbGroupConfig.isShowSlaveSql());
return ((dbGroupConfig.getDelayThreshold() != -1) && dbGroupConfig.isShowSlaveSql()) ||
dbGroupConfig.isDelayDetection();
}

public PhysicalDbInstance getWriteDbInstance() {
return writeDbInstance;
}

public void init(String reason) {
for (Map.Entry<String, PhysicalDbInstance> entry : allSourceMap.entrySet()) {
entry.getValue().init(reason);
}
}

public void init(List<String> sourceNames, String reason) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).init(reason, false);
}
}
}

public void stop(String reason) {
stop(reason, false);
}

public void stop(String reason, boolean closeFront) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
dbInstance.stop(reason, closeFront);
}
}

public void stop(List<String> sourceNames, String reason, boolean closeFront) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).stop(reason, closeFront, false);
}
}

if (closeFront) {
Iterator<PooledConnection> iterator = IOProcessor.BACKENDS_OLD.iterator();
while (iterator.hasNext()) {
PooledConnection con = iterator.next();
if (con instanceof BackendConnection) {
BackendConnection backendCon = (BackendConnection) con;
if (backendCon.getPoolDestroyedTime() != 0 && sourceNames.contains(backendCon.getInstance().getConfig().getInstanceName())) {
backendCon.closeWithFront("old active backend conn will be forced closed by closing front conn");
iterator.remove();
}
}
}
}
}

public Collection<PhysicalDbInstance> getDbInstances(boolean isAll) {
if (!isAll && rwSplitMode == RW_SPLIT_OFF) {
return writeInstanceList;
Expand All @@ -230,18 +243,6 @@ public PhysicalDbInstance[] getReadDbInstances() {
return readSources;
}

/**
* rwsplit user
*
* @param master
* @param writeStatistical
* @return
* @throws IOException
*/
public PhysicalDbInstance rwSelect(Boolean master, Boolean writeStatistical) throws IOException {
return rwSelect(master, writeStatistical, false);
}

/**
* rwsplit user
*
Expand Down Expand Up @@ -546,6 +547,14 @@ public boolean checkInstanceExist(String instanceName) {
return true;
}

public AtomicLong getLogicTimestamp() {
return logicTimestamp;
}

public void setLogicTimestamp(AtomicLong logicTimestamp) {
this.logicTimestamp = logicTimestamp;
}

private void reportHeartbeatError(PhysicalDbInstance ins) throws IOException {
final DbInstanceConfig config = ins.getConfig();
String heartbeatError = "the dbInstance[" + config.getUrl() + "] can't reach. Please check the dbInstance status";
Expand All @@ -565,7 +574,9 @@ public boolean equalsBaseInfo(PhysicalDbGroup pool) {
pool.getDbGroupConfig().getErrorRetryCount() == this.dbGroupConfig.getErrorRetryCount() &&
pool.getDbGroupConfig().getRwSplitMode() == this.dbGroupConfig.getRwSplitMode() &&
pool.getDbGroupConfig().getDelayThreshold() == this.dbGroupConfig.getDelayThreshold() &&
pool.getDbGroupConfig().getDelayPeriodMillis() == this.dbGroupConfig.getDelayPeriodMillis() &&
pool.getDbGroupConfig().getDelayDatabase().equals(this.dbGroupConfig.getDelayDatabase()) &&
pool.getDbGroupConfig().isDisableHA() == this.dbGroupConfig.isDisableHA() &&
pool.getGroupName().equals(this.groupName) && pool.isShardingUseless() == this.isShardingUseless() && pool.isRwSplitUseless() == this.isRwSplitUseless();
pool.getGroupName().equals(this.groupName) && pool.getUsedFor() == this.getUsedFor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
import com.actiontech.dble.net.factory.MySQLConnectionFactory;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.singleton.Scheduler;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.StringUtil;
import com.actiontech.dble.util.TimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;

Expand Down Expand Up @@ -54,7 +51,6 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
private final LongAdder writeCount = new LongAdder();

private final AtomicBoolean isInitial = new AtomicBoolean(false);
private AtomicBoolean initHeartbeat = new AtomicBoolean(false);

// connection pool
private ConnectionPool connectionPool;
Expand Down Expand Up @@ -96,24 +92,30 @@ public void init(String reason, boolean isInitHeartbeat) {
return;
}

if (dbGroup.usedForSharding()) {
checkPoolSize();
}

LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name);
start(reason, isInitHeartbeat);
}

private void checkPoolSize() {
int size = config.getMinCon();
String[] physicalSchemas = dbGroup.getSchemas();
int initSize = physicalSchemas.length;
if (size < initSize) {
LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes), so dble will create at least 1 conn for every schema, " +
LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes/apNodes), so dble will create at least 1 conn for every schema, " +
"minCon size before:{}, now:{}", this.dbGroup.getGroupName() + "." + name, size, initSize);
config.setMinCon(initSize);
}

initSize = Math.max(initSize, config.getMinCon());
size = config.getMaxCon();
if (size < initSize) {
LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize);
LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes/apNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize);
config.setMaxCon(initSize);
}

LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name);
start(reason, isInitHeartbeat);
}

public void createConnectionSkipPool(String schema, ResponseHandler handler) {
Expand Down Expand Up @@ -363,36 +365,33 @@ private void startHeartbeat() {
LOGGER.info("the instance[{}] is disabled or fake node, skip to start heartbeat.", this.dbGroup.getGroupName() + "." + name);
return;
}
heartbeat.start(heartbeatRecoveryTime);
}

heartbeat.start();
if (initHeartbeat.compareAndSet(false, true)) {

heartbeat.setScheduledFuture(Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> {
if (DbleServer.getInstance().getConfig().isFullyConfigured()) {
if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) {
return;
}

heartbeat.heartbeat();
}
}, 0L, config.getPoolConfig().getHeartbeatPeriodMillis(), TimeUnit.MILLISECONDS));
} else {
LOGGER.warn("init dbInstance[{}] heartbeat, but it has been initialized, skip initialization.", heartbeat.getSource().getName());
}
private void stopHeartbeat(String reason) {
heartbeat.stop(reason);
}

public void start(String reason) {
start(reason, true);
}

public void start(String reason, boolean isStartHeartbeat) {
startPool(reason);
if (isStartHeartbeat) {
startHeartbeat();
}
}

private void startPool(String reason) {
if (disabled.get() || fakeNode) {
LOGGER.info("init dbInstance[{}] because {}, but it is disabled or a fakeNode, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason);
return;
}
if ((dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) && !dbGroup.isUseless()) {
LOGGER.info("start connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason);
this.connectionPool.startEvictor();
}
if (isStartHeartbeat) {
startHeartbeat();
}
}

public void stop(String reason, boolean closeFront) {
Expand All @@ -401,17 +400,18 @@ public void stop(String reason, boolean closeFront) {

public void stop(String reason, boolean closeFront, boolean isStopHeartbeat) {
if (isStopHeartbeat) {
final boolean stop = heartbeat.isStop();
heartbeat.stop(reason);
if (!stop) {
initHeartbeat.set(false);
}
stopHeartbeat(reason);
}
stopPool(reason, closeFront);

isInitial.set(false);
}

private void stopPool(String reason, boolean closeFront) {
if (dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) {
LOGGER.info("stop connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason);
connectionPool.stop(reason, closeFront);
}
isInitial.set(false);
}

public void closeAllConnection(String reason) {
Expand Down
Loading
Loading