Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -69,14 +71,19 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase
protected static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300;

protected static final String INSERTION1 =
"INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)";
"INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (100, 1, 2, 3)";
protected static final String INSERTION2 =
"INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)";
"INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (101, 4, 5, 6)";
protected static final String INSERTION3 =
"INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)";
"INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (102, 7, 8, 9)";
protected static final String FLUSH_COMMAND = "flush on cluster";
protected static final String COUNT_QUERY = "select count(*) from root.sg.**";
protected static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1";
protected static final String SELECT_ALL_QUERY =
"select speed, temperature, power from root.sg.d1";
protected static final String DELETE_TIMESERIES_SPEED = "DELETE TIMESERIES root.sg.d1.speed";
protected static final String SHOW_TIMESERIES_D1 = "SHOW TIMESERIES root.sg.d1.*";
protected static final String SELECT_SURVIVING_QUERY =
"SELECT temperature, power FROM root.sg.d1";

/**
* Returns IoTConsensusV2 mode: {@link ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link
Expand Down Expand Up @@ -210,6 +217,187 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
}
}

/**
* Test that DELETE TIMESERIES is properly replicated to all DataNode replicas via IoTConsensusV2.
*
* <p>This test reproduces the scenario from the historical deletion replication bug: when a
* timeseries is deleted after data insertion (with some unflushed data), the deletion event must
* be consistently replicated to all replicas. After waiting for replication to complete, stopping
* each DataNode in turn should show the same schema on all surviving nodes.
*
* <p>Scenario:
*
* <ol>
* <li>Insert data into root.sg.d1 with 3 measurements (speed, temperature, power), flush
* <li>Insert more data (unflushed to create WAL-only entries)
* <li>DELETE TIMESERIES root.sg.d1.speed
* <li>Flush again to persist deletion
* <li>Wait for replication to complete on all DataNodes
* <li>Verify that every DataNode independently shows the same timeseries (speed is gone)
* </ol>
*/
public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
Statement statement = makeItCloseQuietly(connection.createStatement())) {

// Step 1: Insert data with 3 measurements and flush
LOGGER.info(
"Step 1: Inserting data with 3 measurements and flushing (mode: {})...",
getIoTConsensusV2Mode());
statement.execute(INSERTION1);
statement.execute(INSERTION2);
statement.execute(FLUSH_COMMAND);

// Step 2: Insert more data without flush (creates WAL-only entries)
LOGGER.info("Step 2: Inserting more data without flush (WAL-only entries)...");
statement.execute(INSERTION3);

// Step 3: Delete one timeseries
LOGGER.info("Step 3: Deleting timeseries root.sg.d1.speed...");
statement.execute(DELETE_TIMESERIES_SPEED);

// Step 4: Flush again to persist the deletion
LOGGER.info("Step 4: Flushing to persist deletion...");
statement.execute(FLUSH_COMMAND);

// Verify on the current connection: speed should be gone, 2 timeseries remain
verifyTimeSeriesAfterDelete(statement, "via initial connection");

// Step 5: Wait for replication to complete on data region leaders
LOGGER.info("Step 5: Waiting for replication to complete on data region leaders...");
Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMap =
getDataRegionMapWithLeader(statement);
Set<Integer> leaderNodeIds = new HashSet<>();
for (Pair<Integer, Set<Integer>> leaderAndReplicas : dataRegionMap.values()) {
if (leaderAndReplicas.getLeft() > 0) {
leaderNodeIds.add(leaderAndReplicas.getLeft());
}
}
for (int leaderNodeId : leaderNodeIds) {
EnvFactory.getEnv()
.dataNodeIdToWrapper(leaderNodeId)
.ifPresent(this::waitForReplicationComplete);
}

// Step 6: Verify schema consistency on each DataNode independently
LOGGER.info("Step 6: Verifying schema consistency on each DataNode independently...");
List<DataNodeWrapper> dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList();
for (DataNodeWrapper wrapper : dataNodeWrappers) {
String nodeDescription = "DataNode " + wrapper.getIp() + ":" + wrapper.getPort();
LOGGER.info("Verifying schema on {}", nodeDescription);
Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() -> {
try (Connection nodeConn =
makeItCloseQuietly(
EnvFactory.getEnv()
.getConnection(
wrapper,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
BaseEnv.TREE_SQL_DIALECT));
Statement nodeStmt = makeItCloseQuietly(nodeConn.createStatement())) {
verifyTimeSeriesAfterDelete(nodeStmt, nodeDescription);
}
});
}

// Step 7: Stop each DataNode one by one and verify remaining nodes still consistent
LOGGER.info(
"Step 7: Stopping each DataNode in turn and verifying remaining nodes show consistent schema...");
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log message line is likely to exceed the project's 100-character Checkstyle limit once indentation is included. Please wrap/split the string (or use multiple LOGGER.info calls) to keep each line within the limit.

Suggested change
"Step 7: Stopping each DataNode in turn and verifying remaining nodes show consistent schema...");
"Step 7: Stopping each DataNode in turn and verifying remaining nodes "
+ "show consistent schema...");

Copilot uses AI. Check for mistakes.
for (DataNodeWrapper stoppedNode : dataNodeWrappers) {
String stoppedDesc = "DataNode " + stoppedNode.getIp() + ":" + stoppedNode.getPort();
LOGGER.info("Stopping {}", stoppedDesc);
stoppedNode.stopForcibly();
Assert.assertFalse(stoppedDesc + " should be stopped", stoppedNode.isAlive());
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After calling stopForcibly(), the test immediately asserts !isAlive(). AbstractNodeWrapper.stopForcibly() only waits up to 10s and ignores the return value, so the process may still be alive and this assertion can be flaky. Prefer awaiting the node to actually stop (e.g., Awaitility.until(() -> !stoppedNode.isAlive())) before proceeding.

Suggested change
Assert.assertFalse(stoppedDesc + " should be stopped", stoppedNode.isAlive());
Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assert.assertFalse(
stoppedDesc + " should be stopped", stoppedNode.isAlive()));

Copilot uses AI. Check for mistakes.

try {
// Verify schema on each surviving node
for (DataNodeWrapper aliveNode : dataNodeWrappers) {
if (aliveNode == stoppedNode) {
continue;
}
String aliveDesc = "DataNode " + aliveNode.getIp() + ":" + aliveNode.getPort();
Awaitility.await()
.pollDelay(1, TimeUnit.SECONDS)
.atMost(90, TimeUnit.SECONDS)
.untilAsserted(
() -> {
try (Connection aliveConn =
makeItCloseQuietly(
EnvFactory.getEnv()
.getConnection(
aliveNode,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
BaseEnv.TREE_SQL_DIALECT));
Statement aliveStmt = makeItCloseQuietly(aliveConn.createStatement())) {
verifyTimeSeriesAfterDelete(
aliveStmt, aliveDesc + " (while " + stoppedDesc + " is down)");
}
});
}
} finally {
// Restart the stopped node before moving to the next iteration
LOGGER.info("Restarting {}", stoppedDesc);
stoppedNode.start();
// Wait for the restarted node to rejoin
Awaitility.await()
.atMost(120, TimeUnit.SECONDS)
.pollInterval(2, TimeUnit.SECONDS)
.until(stoppedNode::isAlive);
}
}

LOGGER.info(
"DELETE TIMESERIES replica consistency test passed for mode: {}",
getIoTConsensusV2Mode());
}
}

/**
* Verify that after deleting root.sg.d1.speed, only temperature and power timeseries remain, and
* that data queries do not return the deleted timeseries.
*/
/**
 * Asserts that the schema and data visible through {@code statement} reflect the deletion of
 * {@code root.sg.d1.speed}: exactly the temperature and power timeseries remain, and a SELECT
 * over the surviving measurements still returns at least one row.
 *
 * @param statement connection-bound statement used to run the verification queries
 * @param context human-readable label identifying which node/connection is being checked
 * @throws Exception if any verification query fails
 */
private void verifyTimeSeriesAfterDelete(Statement statement, String context) throws Exception {
  // Collect the names of the timeseries that still exist under root.sg.d1.
  Set<String> remainingSeries = new HashSet<>();
  try (ResultSet showResult = statement.executeQuery(SHOW_TIMESERIES_D1)) {
    while (showResult.next()) {
      remainingSeries.add(showResult.getString("Timeseries"));
    }
  }
  LOGGER.info("[{}] SHOW TIMESERIES result: {}", context, remainingSeries);

  Assert.assertEquals(
      "[" + context + "] Expected exactly 2 timeseries after delete (temperature, power)",
      2,
      remainingSeries.size());
  Assert.assertFalse(
      "[" + context + "] root.sg.d1.speed should have been deleted",
      remainingSeries.contains("root.sg.d1.speed"));
  Assert.assertTrue(
      "[" + context + "] root.sg.d1.temperature should still exist",
      remainingSeries.contains("root.sg.d1.temperature"));
  Assert.assertTrue(
      "[" + context + "] root.sg.d1.power should still exist",
      remainingSeries.contains("root.sg.d1.power"));

  // Count the rows returned by a SELECT over the surviving measurements.
  int survivorRowCount = 0;
  try (ResultSet survivorResult = statement.executeQuery(SELECT_SURVIVING_QUERY)) {
    while (survivorResult.next()) {
      survivorRowCount++;
    }
  }
  // How much data remains depends on whether unflushed data for the deleted timeseries
  // was also cleaned up; we mainly check the query succeeds and yields some rows.
  Assert.assertTrue(
      "[" + context + "] Expected at least 1 row from SELECT on surviving timeseries",
      survivorRowCount >= 1);
}

// Matches the IoTConsensusV2 "syncLag" metric line in what appears to be a
// Prometheus-style metrics dump; group(1) captures the reported lag value.
// NOTE(review): the consumer of this pattern is outside this view — confirm the
// metrics source format against the code that applies it.
private static final Pattern SYNC_LAG_PATTERN =
    Pattern.compile("iot_consensus_v2\\{[^}]*type=\"syncLag\"[^}]*}\\s+(\\S+)");

Expand Down Expand Up @@ -259,7 +447,7 @@ protected void verifyDataConsistency(Statement statement) throws Exception {
totalCount += parseLongFromString(countResult.getString(i));
}
Assert.assertEquals(
"Expected 6 total data points (3 timestamps x 2 measurements)", 6, totalCount);
"Expected 9 total data points (3 timestamps x 3 measurements)", 9, totalCount);
}

int rowCount = 0;
Expand All @@ -269,15 +457,19 @@ protected void verifyDataConsistency(Statement statement) throws Exception {
long timestamp = parseLongFromString(selectResult.getString(1));
long speed = parseLongFromString(selectResult.getString(2));
long temperature = parseLongFromString(selectResult.getString(3));
long power = parseLongFromString(selectResult.getString(4));
if (timestamp == 100) {
Assert.assertEquals(1, speed);
Assert.assertEquals(2, temperature);
Assert.assertEquals(3, power);
} else if (timestamp == 101) {
Assert.assertEquals(3, speed);
Assert.assertEquals(4, temperature);
Assert.assertEquals(4, speed);
Assert.assertEquals(5, temperature);
Assert.assertEquals(6, power);
} else if (timestamp == 102) {
Assert.assertEquals(5, speed);
Assert.assertEquals(6, temperature);
Assert.assertEquals(7, speed);
Assert.assertEquals(8, temperature);
Assert.assertEquals(9, power);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,10 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
public void test3C3DWriteFlushAndQuery() throws Exception {
super.test3C3DWriteFlushAndQuery();
}

// Concrete JUnit entry point: runs the shared DELETE TIMESERIES replica-consistency
// scenario defined in the abstract base class under this subclass's configuration.
@Override
@Test
public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
  super.testDeleteTimeSeriesReplicaConsistency();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,10 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception {
public void test3C3DWriteFlushAndQuery() throws Exception {
super.test3C3DWriteFlushAndQuery();
}

// Concrete JUnit entry point: runs the shared DELETE TIMESERIES replica-consistency
// scenario defined in the abstract base class under this subclass's configuration.
@Override
@Test
public void testDeleteTimeSeriesReplicaConsistency() throws Exception {
  super.testDeleteTimeSeriesReplicaConsistency();
}
}
Loading