Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
bd25771
HTTP/2 PING health check + connection max lifetime eviction with jitter
jeet1995 Mar 14, 2026
5717ea6
CI pipeline + README updates for HTTP network fault tests
jeet1995 Mar 14, 2026
d039814
Adding HTTP/2 ping and HTTP connection lifecycle capabilities.
jeet1995 Mar 14, 2026
4974ddf
Adding HTTP/2 ping and HTTP connection lifecycle capabilities.
jeet1995 Mar 14, 2026
2ff4f43
Adding HTTP/2 ping and HTTP connection lifecycle capabilities.
jeet1995 Mar 14, 2026
ed2264b
Adding HTTP/2 ping and HTTP connection lifecycle capabilities.
jeet1995 Mar 14, 2026
283b12f
Adding HTTP/2 ping and HTTP connection lifecycle capabilities.
jeet1995 Mar 14, 2026
af4b972
Per-connection jitter, PING health improvements, eviction rate limite…
jeet1995 Mar 23, 2026
d6b5209
Decouple max lifetime from PING health, two-phase graceful eviction
jeet1995 Mar 23, 2026
30b8b11
Address self-review comments on SPEC
jeet1995 Mar 23, 2026
da053eb
Defensive defaults: 30min max lifetime, boolean config guards
jeet1995 Mar 23, 2026
073f229
PING is keepalive only (not eviction), rewrite SPEC
jeet1995 Mar 23, 2026
13a960a
Align code with SPEC: PING is keepalive only, remove dead eviction code
jeet1995 Mar 23, 2026
ea78d6e
FilterableDnsResolverGroup: test fixture for e2e DNS rotation validation
jeet1995 Mar 23, 2026
8e9878c
Extend SPEC to cover HTTP/1.1 connections (ChangeFeed uses H1.1)
jeet1995 Mar 23, 2026
9a6fa24
Max lifetime + PING keepalive for both HTTP/1.1 and HTTP/2
jeet1995 Mar 23, 2026
e7d93aa
SPEC: Add HTTP/1.1 keepalive gap analysis to Future Work
jeet1995 Mar 23, 2026
c2edf1e
SPEC: Fix PING scope to H2-only, fix duplicate numbering
jeet1995 Mar 23, 2026
a9fd95c
Merge remote-tracking branch 'upstream/main' into AzCosmos_HttpConnec…
jeet1995 Mar 31, 2026
f6273cb
Stabilize HTTP/2 connection lifecycle: native PING, subtractive jitte…
jeet1995 Mar 31, 2026
8af167a
Merge remote-tracking branch 'origin/main' into AzCosmos_HttpConnecti…
jeet1995 Apr 3, 2026
64be575
Add dnsRotationAfterMaxLifetimeExpiry E2E test
jeet1995 Apr 3, 2026
4faa473
Add CHANGELOG entries for HTTP connection lifecycle features
jeet1995 Apr 3, 2026
7d3ec81
Add manual HTTP/2 PING handler and counting test
jeet1995 Apr 3, 2026
06d818f
Fix tc prio qdisc priomap in bifurcation tests
jeet1995 Apr 3, 2026
0de812e
Merge branch 'main' of https://github.com/jeet1995/azure-sdk-for-java…
jeet1995 Apr 9, 2026
e1cfdb3
Merge remote-tracking branch 'origin/AzCosmos_HttpConnectionMaxLife' …
jeet1995 Apr 9, 2026
1d2ffb5
Fix PING comments, FQN names, TestNG group, and cleanup helpers
jeet1995 Apr 9, 2026
186fa74
Interceptor pattern for DNS resolver and doOnConnected injection
jeet1995 Apr 9, 2026
b793c2c
Extract NetworkFaultInjector shared utility from lifecycle tests
jeet1995 Apr 9, 2026
c34c2f9
Refactor bifurcation tests to NetworkFaultInjector + fix SPEC for cus…
jeet1995 Apr 9, 2026
67bc107
Fix flaky tc netem timeout tests
jeet1995 Apr 9, 2026
88426ea
Dynamic runtime toggle for eviction + PING, Http2PingHandler cleanup
jeet1995 Apr 9, 2026
0ff7dcb
Integrate DNS blocking into azure-cosmos-benchmark
jeet1995 Apr 10, 2026
95e71f1
Fix DNS blocking config: add applyField cases for dnsBlockingEnabled/…
jeet1995 Apr 10, 2026
6bc1401
fix: address PR #48420 review findings
jeet1995 Apr 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/cosmos/azure-cosmos-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ Licensed under the MIT License.
<version>2.29.0-beta.1</version> <!-- {x-version-update;com.azure:azure-cosmos-encryption;current} -->
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos-test</artifactId>
<version>1.0.0-beta.19</version> <!-- {x-version-update;com.azure:azure-cosmos-test;current} -->
</dependency>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.test.faultinjection.FilterableDnsResolverGroup;

import io.micrometer.core.instrument.MeterRegistry;
import org.apache.commons.lang3.RandomStringUtils;
Expand Down Expand Up @@ -138,8 +139,23 @@ abstract class AsyncBenchmark<T> implements Benchmark {
benchmarkSpecificClientBuilder = benchmarkSpecificClientBuilder.gatewayMode(gatewayConnectionConfig);
}

// DNS blocking: inject FilterableDnsResolverGroup for IP rotation testing
FilterableDnsResolverGroup dnsResolver = null;
if (cfg.isDnsBlockingEnabled()) {
dnsResolver = new FilterableDnsResolverGroup();
com.azure.cosmos.test.implementation.interceptor.CosmosInterceptorHelper
.registerHttpClientInterceptor(benchmarkSpecificClientBuilder, dnsResolver, null);
logger.info("DNS blocking enabled — FilterableDnsResolverGroup injected, cycle={}min",
cfg.getDnsBlockingCycleMinutes());
}

benchmarkWorkloadClient = benchmarkSpecificClientBuilder.buildAsyncClient();

// Start DNS blocking scheduler if enabled
if (dnsResolver != null) {
startDnsBlockingScheduler(dnsResolver, cfg);
}

try {
cosmosAsyncDatabase = benchmarkWorkloadClient.getDatabase(cfg.getDatabaseId());
cosmosAsyncDatabase.read().block();
Expand Down Expand Up @@ -410,4 +426,58 @@ protected Mono sparsityMono(long i) {
return null;
}

private void startDnsBlockingScheduler(FilterableDnsResolverGroup resolver, TenantWorkloadConfig cfg) {
int cycleMinutes = cfg.getDnsBlockingCycleMinutes();
String endpoint = cfg.getServiceEndpoint();
List<String> regions = cfg.getPreferredRegionsList();
String preferredRegion = (regions != null && !regions.isEmpty()) ? regions.get(0) : null;

Thread scheduler = new Thread(() -> {
try {
// Resolve regional endpoint IPs
String host = java.net.URI.create(endpoint).getHost();
if (preferredRegion != null) {
host = host.replace(".documents.azure.com",
"-" + preferredRegion.toLowerCase().replace(" ", "") + ".documents.azure.com");
}
java.net.InetAddress[] allIps = java.net.InetAddress.getAllByName(host);
logger.info("[DNS-BLOCKING] Resolved {}: {} IPs", host, allIps.length);
for (java.net.InetAddress ip : allIps) {
logger.info("[DNS-BLOCKING] {}", ip.getHostAddress());
}

if (allIps.length == 0) {
logger.warn("[DNS-BLOCKING] No IPs resolved — scheduler exiting");
return;
}

int ipIndex = 0;
while (!Thread.currentThread().isInterrupted()) {
// Phase: normal (all IPs)
logger.info("[DNS-BLOCKING] Phase: NORMAL (all IPs available) for {} min", cycleMinutes);
Thread.sleep(cycleMinutes * 60_000L);

// Phase: block one IP
java.net.InetAddress blockIp = allIps[ipIndex % allIps.length];
resolver.blockIp(blockIp);
logger.info("[DNS-BLOCKING] Phase: BLOCKED {} for {} min", blockIp.getHostAddress(), cycleMinutes);
Thread.sleep(cycleMinutes * 60_000L);

// Phase: unblock
resolver.unblockAll();
logger.info("[DNS-BLOCKING] Phase: UNBLOCKED all IPs");
ipIndex++;
}
} catch (InterruptedException e) {
logger.info("[DNS-BLOCKING] Scheduler interrupted — stopping");
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.error("[DNS-BLOCKING] Scheduler failed", e);
}
});
scheduler.setDaemon(true);
scheduler.setName("dns-blocking-scheduler");
scheduler.start();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ public enum Environment {
@JsonProperty("http2MaxConcurrentStreams")
private Integer http2MaxConcurrentStreams;

@JsonProperty("dnsBlockingEnabled")
private Boolean dnsBlockingEnabled;

@JsonProperty("dnsBlockingCycleMinutes")
private Integer dnsBlockingCycleMinutes;

@JsonProperty("preferredRegionsList")
private String preferredRegionsList;

Expand Down Expand Up @@ -358,6 +364,14 @@ public Integer getHttp2MaxConcurrentStreams() {
return http2MaxConcurrentStreams;
}

public boolean isDnsBlockingEnabled() {
return dnsBlockingEnabled != null && dnsBlockingEnabled;
}

public int getDnsBlockingCycleMinutes() {
return dnsBlockingCycleMinutes != null ? dnsBlockingCycleMinutes : 10;
}

public List<String> getPreferredRegionsList() {
if (preferredRegionsList == null || preferredRegionsList.isEmpty()) return null;
List<String> regions = new ArrayList<>();
Expand Down Expand Up @@ -532,6 +546,10 @@ private void applyField(String key, String value, boolean overwrite) {
if (overwrite || http2Enabled == null) http2Enabled = Boolean.parseBoolean(value); break;
case "http2MaxConcurrentStreams":
if (overwrite || http2MaxConcurrentStreams == null) http2MaxConcurrentStreams = Integer.parseInt(value); break;
case "dnsBlockingEnabled":
if (overwrite || dnsBlockingEnabled == null) dnsBlockingEnabled = Boolean.parseBoolean(value); break;
case "dnsBlockingCycleMinutes":
if (overwrite || dnsBlockingCycleMinutes == null) dnsBlockingCycleMinutes = Integer.parseInt(value); break;
// JVM-global properties (minConnectionPoolSizePerEndpoint, isPartitionLevelCircuitBreakerEnabled,
// isPerPartitionAutomaticFailoverRequired) are handled in BenchmarkConfig, not per-tenant.
case "minConnectionPoolSizePerEndpoint":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.test.faultinjection;

import io.netty.channel.EventLoop;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.InetNameResolver;
import io.netty.resolver.InetSocketAddressResolver;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* Test fixture: a DNS resolver that wraps JVM resolution but dynamically filters out
* blocked IPs. Allows e2e tests to simulate DNS changes mid-workload without OS-level
* hacks (no /etc/hosts, no iptables, no external DNS server).
*
* <p>Usage:
* <pre>{@code
* FilterableDnsResolverGroup resolver = new FilterableDnsResolverGroup();
*
* // Wire into reactor-netty HttpClient via .resolver(resolver)
* // ... run workload, connections land on IP1 ...
*
* resolver.blockIp(ip1); // dynamic — no restart, no OS change
* // ... wait for max lifetime → new connection → resolves to IP2 ...
*
* resolver.unblockIp(ip1); // dynamic — IP1 available again
* }</pre>
*/
public class FilterableDnsResolverGroup extends AddressResolverGroup<InetSocketAddress> {

private static final Logger logger = LoggerFactory.getLogger(FilterableDnsResolverGroup.class);

private final Set<InetAddress> blockedIps = ConcurrentHashMap.newKeySet();

/**
* Block an IP — future DNS resolutions will exclude it.
* Takes effect immediately for new connections. Existing connections are unaffected.
*/
public void blockIp(InetAddress ip) {
blockedIps.add(ip);
logger.info("Blocked IP: {} (total blocked: {})", ip.getHostAddress(), blockedIps.size());
}

/**
* Unblock an IP — future DNS resolutions may return it again.
*/
public void unblockIp(InetAddress ip) {
blockedIps.remove(ip);
logger.info("Unblocked IP: {} (total blocked: {})", ip.getHostAddress(), blockedIps.size());
}

/**
* Unblock all IPs.
*/
public void unblockAll() {
blockedIps.clear();
logger.info("Unblocked all IPs");
}

/**
* Returns the current set of blocked IPs (snapshot).
*/
public Set<InetAddress> getBlockedIps() {
return Collections.unmodifiableSet(new HashSet<>(blockedIps));
}

@Override
protected io.netty.resolver.AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
return new InetSocketAddressResolver(executor, new FilterableNameResolver(executor));
}

private class FilterableNameResolver extends InetNameResolver {

FilterableNameResolver(EventExecutor executor) {
super(executor);
}

@Override
protected void doResolve(String inetHost, Promise<InetAddress> promise) {
try {
InetAddress[] allAddrs = InetAddress.getAllByName(inetHost);
List<InetAddress> filtered = Arrays.stream(allAddrs)
.filter(addr -> !blockedIps.contains(addr))
.collect(Collectors.toList());

if (filtered.isEmpty()) {
promise.setFailure(new UnknownHostException(
"All resolved IPs for " + inetHost + " are blocked: "
+ Arrays.toString(allAddrs)));
} else {
if (logger.isDebugEnabled()) {
logger.debug("Resolved {} → {} (blocked {} of {})",
inetHost, filtered.get(0).getHostAddress(),
allAddrs.length - filtered.size(), allAddrs.length);
}
promise.setSuccess(filtered.get(0));
}
} catch (UnknownHostException e) {
promise.setFailure(e);
}
}

@Override
protected void doResolveAll(String inetHost, Promise<List<InetAddress>> promise) {
try {
InetAddress[] allAddrs = InetAddress.getAllByName(inetHost);
List<InetAddress> filtered = Arrays.stream(allAddrs)
.filter(addr -> !blockedIps.contains(addr))
.collect(Collectors.toList());

if (filtered.isEmpty()) {
promise.setFailure(new UnknownHostException(
"All resolved IPs for " + inetHost + " are blocked: "
+ Arrays.toString(allAddrs)));
} else {
if (logger.isDebugEnabled()) {
logger.debug("Resolved all {} → {} IPs (blocked {} of {})",
inetHost, filtered.size(),
allAddrs.length - filtered.size(), allAddrs.length);
}
promise.setSuccess(filtered);
}
} catch (UnknownHostException e) {
promise.setFailure(e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.test.implementation.interceptor;

import com.azure.cosmos.implementation.interceptor.IHttpClientInterceptor;
import io.netty.resolver.AddressResolverGroup;
import reactor.netty.Connection;

import java.util.function.Consumer;

/**
* Test-side HTTP client interceptor for injecting custom DNS resolvers and
* connection handlers at client construction time.
*/
public class CosmosHttpClientInterceptor implements IHttpClientInterceptor {

private final AddressResolverGroup<?> addressResolverGroup;
private final Consumer<Connection> doOnConnectedCallback;

public CosmosHttpClientInterceptor(
AddressResolverGroup<?> addressResolverGroup,
Consumer<Connection> doOnConnectedCallback) {

this.addressResolverGroup = addressResolverGroup;
this.doOnConnectedCallback = doOnConnectedCallback;
}

@Override
public AddressResolverGroup<?> getAddressResolverGroup() {
return this.addressResolverGroup;
}

@Override
public Consumer<Connection> getDoOnConnectedCallback() {
return this.doOnConnectedCallback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
package com.azure.cosmos.test.implementation.interceptor;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import io.netty.resolver.AddressResolverGroup;
import reactor.netty.Connection;

import java.util.function.BiFunction;
import java.util.function.Consumer;

public class CosmosInterceptorHelper {
public static void registerTransportClientInterceptor(
Expand All @@ -21,4 +25,25 @@ public static void registerTransportClientInterceptor(
.getCosmosAsyncClientAccessor()
.registerTransportClientInterceptor(client, transportClientInterceptor);
}

/**
* Registers a custom DNS resolver and/or doOnConnected callback on the builder.
* Must be called before {@code builder.buildAsyncClient()}.
*
* @param builder the CosmosClientBuilder (pre-build)
* @param addressResolverGroup custom DNS resolver, or null for default
* @param doOnConnectedCallback custom connection callback, or null for none
*/
public static void registerHttpClientInterceptor(
CosmosClientBuilder builder,
AddressResolverGroup<?> addressResolverGroup,
Consumer<Connection> doOnConnectedCallback) {

CosmosHttpClientInterceptor interceptor = new CosmosHttpClientInterceptor(
addressResolverGroup, doOnConnectedCallback);
ImplementationBridgeHelpers
.CosmosClientBuilderHelper
.getCosmosClientBuilderAccessor()
.setHttpClientInterceptor(builder, interceptor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ docker run --rm --cap-add=NET_ADMIN --memory 8g \
java -DCOSMOS.THINCLIENT_ENABLED=true \
-DCOSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_MS=1000 \
-DCOSMOS.HTTP2_ENABLED=true \
org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-thinclient-network-delay-testng.xml \
org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml \
-verbose 2
'
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ docker run --rm --cap-add=NET_ADMIN --memory 8g \
-DACCOUNT_KEY=$ACCOUNT_KEY \
-DCOSMOS.THINCLIENT_ENABLED=true \
-DCOSMOS.HTTP2_ENABLED=true \
org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-thinclient-network-delay-testng.xml \
org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml \
-verbose 2
'
```
Expand Down
Loading
Loading