Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public void configurationPropertyReads() throws Exception {
GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(databaseAccountManagerInternal,
new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig()), new Configs());
ReflectionUtils.setBackgroundRefreshLocationTimeIntervalInMS(globalEndpointManager, 1000);
// Disable jitter so the background refresh fires within the 2-second sleep windows
// used by this test. The default jitter (a random 0-15 s) could push the refresh beyond the sleep.
ReflectionUtils.setBackgroundRefreshJitterMaxInSeconds(globalEndpointManager, 0);
globalEndpointManager.init();

GatewayServiceConfigurationReader configurationReader = new GatewayServiceConfigurationReader(globalEndpointManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void refreshLocationAsyncForWriteForbidden() throws Exception {
}

/**
* Test for background refresh disable for multimaster
* Test for background refresh in multi-master: timer must keep running
*/
@Test(groups = {"unit"}, timeOut = TIMEOUT)
public void backgroundRefreshForMultiMaster() throws Exception {
Expand All @@ -236,8 +236,58 @@ public void backgroundRefreshForMultiMaster() throws Exception {
GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs());
globalEndPointManager.init();

// Background refresh timer must keep running even for multi-master accounts where
// shouldRefreshEndpoints() returns false. This ensures topology changes (e.g.,
// multi-write <-> single-write transitions) are detected.
AtomicBoolean isRefreshInBackground = getRefreshInBackground(globalEndPointManager);
Assert.assertFalse(isRefreshInBackground.get());
Assert.assertTrue(isRefreshInBackground.get());
LifeCycleUtils.closeQuietly(globalEndPointManager);
}

/**
 * Validates that a multi-master account's background refresh timer detects a topology
 * change from multi-write to single-write. Without the fix in refreshLocationPrivateAsync,
 * the timer stops after init and the transition is never detected.
 *
 * <p>The manager is closed in a {@code finally} block so that a failed assertion (or any
 * other exception) does not leave the manager open for the rest of the test run.
 *
 * @throws Exception if mock setup, reflection access, or the sleep is interrupted/fails
 */
@Test(groups = {"unit"}, timeOut = TIMEOUT)
public void backgroundRefreshDetectsTopologyChangeForMultiMaster() throws Exception {
    // Start with a multi-writer account (dbAccountJson4: MW, East US + East Asia)
    ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig());
    connectionPolicy.setEndpointDiscoveryEnabled(true);
    connectionPolicy.setMultipleWriteRegionsEnabled(true);
    DatabaseAccount multiWriterAccount = new DatabaseAccount(dbAccountJson4);
    Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(ArgumentMatchers.any()))
        .thenReturn(Flux.just(multiWriterAccount));
    Mockito.when(databaseAccountManagerInternal.getServiceEndpoint())
        .thenReturn(new URI("https://testaccount.documents.azure.com:443"));

    GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(
        databaseAccountManagerInternal, connectionPolicy, new Configs());
    try {
        // Short interval and no jitter so the refresh fires well inside the sleep below.
        setBackgroundRefreshLocationTimeIntervalInMS(globalEndPointManager, 500);
        setBackgroundRefreshJitterMaxInSeconds(globalEndPointManager, 0);
        globalEndPointManager.init();

        // Verify multi-writer state: 2 write regions available
        LocationCache locationCache = this.getLocationCache(globalEndPointManager);
        Map<String, RegionalRoutingContext> availableWriteEndpoints = this.getAvailableWriteEndpointByLocation(locationCache);
        Assert.assertEquals(availableWriteEndpoints.size(), 2, "Expected 2 write regions for multi-writer account");
        Assert.assertTrue(availableWriteEndpoints.containsKey("East US"));
        Assert.assertTrue(availableWriteEndpoints.containsKey("East Asia"));

        // Transition to single-writer account (dbAccountJson1: SW, East US only for writes)
        DatabaseAccount singleWriterAccount = new DatabaseAccount(dbAccountJson1);
        Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(ArgumentMatchers.any()))
            .thenReturn(Flux.just(singleWriterAccount));

        // Wait for background refresh to detect the topology change (jitter disabled for test)
        Thread.sleep(2000);

        // Verify single-writer state: write endpoints updated to reflect single-writer topology
        locationCache = this.getLocationCache(globalEndPointManager);
        availableWriteEndpoints = this.getAvailableWriteEndpointByLocation(locationCache);
        Assert.assertEquals(availableWriteEndpoints.size(), 1, "Expected 1 write region after transition to single-writer");
        Assert.assertTrue(availableWriteEndpoints.containsKey("East US"));
    } finally {
        // Always release the manager, even when an assertion above fails.
        LifeCycleUtils.closeQuietly(globalEndPointManager);
    }
}

Expand All @@ -254,14 +304,14 @@ public void startRefreshLocationTimerAsync() throws Exception {
Mockito.when(databaseAccountManagerInternal.getServiceEndpoint()).thenReturn(new URI("https://testaccount.documents.azure.com:443"));
GlobalEndpointManager globalEndPointManager = new GlobalEndpointManager(databaseAccountManagerInternal, connectionPolicy, new Configs());
setBackgroundRefreshLocationTimeIntervalInMS(globalEndPointManager, 1000);
setBackgroundRefreshJitterMaxInSeconds(globalEndPointManager, 0);
globalEndPointManager.init();

databaseAccount = new DatabaseAccount(dbAccountJson2);
Mockito.when(databaseAccountManagerInternal.getDatabaseAccountFromEndpoint(ArgumentMatchers.any())).thenReturn(Flux.just(databaseAccount));
Thread.sleep(2000);

LocationCache locationCache = this.getLocationCache(globalEndPointManager);
Assert.assertEquals(locationCache.getReadEndpoints().size(), 1);
Map<String, RegionalRoutingContext> availableReadEndpointByLocation = this.getAvailableReadEndpointByLocation(locationCache);
Assert.assertEquals(availableReadEndpointByLocation.size(), 1);
Assert.assertTrue(availableReadEndpointByLocation.keySet().iterator().next().equalsIgnoreCase("East Asia"));
Expand Down Expand Up @@ -341,6 +391,12 @@ private void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager
backgroundRefreshLocationTimeIntervalInMSField.setInt(globalEndPointManager, millSec);
}

private void setBackgroundRefreshJitterMaxInSeconds(GlobalEndpointManager globalEndPointManager, int seconds) throws Exception {
Comment thread
jeet1995 marked this conversation as resolved.
Field jitterField = GlobalEndpointManager.class.getDeclaredField("backgroundRefreshJitterMaxInSeconds");
jitterField.setAccessible(true);
jitterField.setInt(globalEndPointManager, seconds);
}

private GlobalEndpointManager getGlobalEndPointManager() throws Exception {
ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig());
connectionPolicy.setEndpointDiscoveryEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointMa
set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS");
}

/**
 * Overwrites the {@code backgroundRefreshJitterMaxInSeconds} field on the given manager
 * by delegating to the {@code set} helper.
 *
 * @param globalEndPointManager the manager whose field is overwritten
 * @param seconds the value to store in the field, in seconds
 */
public static void setBackgroundRefreshJitterMaxInSeconds(GlobalEndpointManager globalEndPointManager, int seconds) {
    final String fieldName = "backgroundRefreshJitterMaxInSeconds";
    set(globalEndPointManager, seconds, fieldName);
}

/**
 * Overwrites the {@code diagnosticsProvider} field on the given client by delegating to
 * the {@code set} helper.
 *
 * @param cosmosAsyncClient the client whose field is overwritten
 * @param tracerProvider the provider instance to store in the field
 */
public static void setDiagnosticsProvider(CosmosAsyncClient cosmosAsyncClient, DiagnosticsProvider tracerProvider) {
    final String fieldName = "diagnosticsProvider";
    set(cosmosAsyncClient, tracerProvider, fieldName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class Configs {
private static final Protocol DEFAULT_PROTOCOL = Protocol.TCP;

private static final String UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS = "COSMOS.UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS";
private static final String BACKGROUND_REFRESH_LOCATION_JITTER_MAX_IN_SECONDS = "COSMOS.BACKGROUND_REFRESH_LOCATION_JITTER_MAX_IN_SECONDS";
private static final String GLOBAL_ENDPOINT_MANAGER_INITIALIZATION_TIME_IN_SECONDS = "COSMOS.GLOBAL_ENDPOINT_MANAGER_MAX_INIT_TIME_IN_SECONDS";
private static final String DEFAULT_THINCLIENT_ENDPOINT = "";
private static final String THINCLIENT_ENDPOINT = "COSMOS.THINCLIENT_ENDPOINT";
Expand Down Expand Up @@ -117,6 +118,7 @@ public class Configs {

private static final int DEFAULT_CLIENT_TELEMETRY_SCHEDULING_IN_SECONDS = 10 * 60;
private static final int DEFAULT_UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS = 5 * 60;
private static final int DEFAULT_BACKGROUND_REFRESH_LOCATION_JITTER_MAX_IN_SECONDS = 15;

private static final int DEFAULT_MAX_HTTP_BODY_LENGTH_IN_BYTES = 6 * 1024 * 1024; //6MB
private static final int DEFAULT_MAX_HTTP_INITIAL_LINE_LENGTH = 4096; //4KB
Expand Down Expand Up @@ -567,6 +569,10 @@ public int getUnavailableLocationsExpirationTimeInSeconds() {
return getJVMConfigAsInt(UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS, DEFAULT_UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS);
}

/**
 * Returns the configured upper bound, in seconds, for the jitter applied to the
 * background location refresh. The value is looked up through {@code getJVMConfigAsInt}
 * under {@code COSMOS.BACKGROUND_REFRESH_LOCATION_JITTER_MAX_IN_SECONDS}, with
 * {@code DEFAULT_BACKGROUND_REFRESH_LOCATION_JITTER_MAX_IN_SECONDS} (15) passed as the default.
 *
 * @return the value resolved by {@code getJVMConfigAsInt}
 */
public int getBackgroundRefreshLocationJitterMaxInSeconds() {
    final int jitterMaxInSeconds = getJVMConfigAsInt(
        BACKGROUND_REFRESH_LOCATION_JITTER_MAX_IN_SECONDS,
        DEFAULT_BACKGROUND_REFRESH_LOCATION_JITTER_MAX_IN_SECONDS);
    return jitterMaxInSeconds;
}

/**
 * Returns the maximum HTTP header size, looked up through {@code getJVMConfigAsInt} under
 * {@code MAX_HTTP_HEADER_SIZE_IN_BYTES} with {@code DEFAULT_MAX_HTTP_REQUEST_HEADER_SIZE}
 * passed as the default.
 *
 * @return the value resolved by {@code getJVMConfigAsInt}
 */
public static int getMaxHttpHeaderSize() {
    final int maxHeaderSize = getJVMConfigAsInt(
        MAX_HTTP_HEADER_SIZE_IN_BYTES,
        DEFAULT_MAX_HTTP_REQUEST_HEADER_SIZE);
    return maxHeaderSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
Expand All @@ -37,6 +37,7 @@ public class GlobalEndpointManager implements AutoCloseable {
private static final CosmosDaemonThreadFactory theadFactory = new CosmosDaemonThreadFactory("cosmos-global-endpoint-mgr");

private final int backgroundRefreshLocationTimeIntervalInMS;
private final int backgroundRefreshJitterMaxInSeconds;
private final LocationCache locationCache;
private final URI defaultEndpoint;
private final ConnectionPolicy connectionPolicy;
Expand Down Expand Up @@ -67,6 +68,7 @@ public Throwable getLatestDatabaseRefreshError() {

public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs) {
this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000;
this.backgroundRefreshJitterMaxInSeconds = configs.getBackgroundRefreshLocationJitterMaxInSeconds();
this.maxInitializationTime = Duration.ofSeconds(configs.getGlobalEndpointManagerMaxInitializationTimeInSeconds());

try {
Expand Down Expand Up @@ -302,6 +304,17 @@ private Mono<Void> refreshLocationPrivateAsync(DatabaseAccount databaseAccount)
return Mono.empty();
} else {
logger.debug("shouldRefreshEndpoints: false, nothing to do.");

// Even when no endpoint refresh is needed right now, we must keep the
// background refresh timer running so that future database account
// topology changes are detected — e.g., multi-write <-> single-write
// transitions, failover priority changes, region add/remove.
// This aligns with the .NET SDK behavior where the background loop
// continues unconditionally as long as the client is alive.
if (!this.refreshInBackground.get()) {
Comment thread
jeet1995 marked this conversation as resolved.
this.startRefreshLocationTimerAsync();
}

this.isRefreshing.set(false);
return Mono.empty();
}
Expand All @@ -320,13 +333,20 @@ private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {
return Mono.empty();
}

logger.debug("registering a refresh in [{}] ms", this.backgroundRefreshLocationTimeIntervalInMS);
LocalDateTime now = LocalDateTime.now();

int delayInMillis = initialization ? 0: this.backgroundRefreshLocationTimeIntervalInMS;
// Add jitter to the background refresh interval to prevent many CosmosClient
// instances from refreshing simultaneously and overwhelming the compute gateway.
int jitterInSeconds = (initialization || this.backgroundRefreshJitterMaxInSeconds <= 0)
? 0
: ThreadLocalRandom.current().nextInt(0, this.backgroundRefreshJitterMaxInSeconds + 1);
int delayInMillis = initialization ? 0 : this.backgroundRefreshLocationTimeIntervalInMS + (jitterInSeconds * 1000);

this.refreshInBackground.set(true);

logger.debug("Background refresh scheduled with delay [{}] ms (base [{}] ms + jitter [{}] s)",
delayInMillis, this.backgroundRefreshLocationTimeIntervalInMS, jitterInSeconds);

return Mono.delay(Duration.ofMillis(delayInMillis), CosmosSchedulers.COSMOS_PARALLEL)
.flatMap(
t -> {
Expand Down
Loading