-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add IT for DELETE TIMESERIES replica consistency under IoTConsensusV2 #17332
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -37,6 +37,8 @@ | |||||||||||||||
| import java.sql.ResultSet; | ||||||||||||||||
| import java.sql.Statement; | ||||||||||||||||
| import java.util.Collections; | ||||||||||||||||
| import java.util.HashSet; | ||||||||||||||||
| import java.util.List; | ||||||||||||||||
| import java.util.Map; | ||||||||||||||||
| import java.util.Set; | ||||||||||||||||
| import java.util.concurrent.TimeUnit; | ||||||||||||||||
|
|
@@ -69,14 +71,19 @@ public abstract class IoTDBIoTConsensusV23C3DBasicITBase | |||||||||||||||
| protected static final int CLUSTER_INIT_TIMEOUT_SECONDS = 300; | ||||||||||||||||
|
|
||||||||||||||||
| protected static final String INSERTION1 = | ||||||||||||||||
| "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)"; | ||||||||||||||||
| "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (100, 1, 2, 3)"; | ||||||||||||||||
| protected static final String INSERTION2 = | ||||||||||||||||
| "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)"; | ||||||||||||||||
| "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (101, 4, 5, 6)"; | ||||||||||||||||
| protected static final String INSERTION3 = | ||||||||||||||||
| "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(102, 5, 6)"; | ||||||||||||||||
| "INSERT INTO root.sg.d1(timestamp, speed, temperature, power) VALUES (102, 7, 8, 9)"; | ||||||||||||||||
| protected static final String FLUSH_COMMAND = "flush on cluster"; | ||||||||||||||||
| protected static final String COUNT_QUERY = "select count(*) from root.sg.**"; | ||||||||||||||||
| protected static final String SELECT_ALL_QUERY = "select speed, temperature from root.sg.d1"; | ||||||||||||||||
| protected static final String SELECT_ALL_QUERY = | ||||||||||||||||
| "select speed, temperature, power from root.sg.d1"; | ||||||||||||||||
| protected static final String DELETE_TIMESERIES_SPEED = "DELETE TIMESERIES root.sg.d1.speed"; | ||||||||||||||||
| protected static final String SHOW_TIMESERIES_D1 = "SHOW TIMESERIES root.sg.d1.*"; | ||||||||||||||||
| protected static final String SELECT_SURVIVING_QUERY = | ||||||||||||||||
| "SELECT temperature, power FROM root.sg.d1"; | ||||||||||||||||
|
|
||||||||||||||||
| /** | ||||||||||||||||
| * Returns IoTConsensusV2 mode: {@link ConsensusFactory#IOT_CONSENSUS_V2_BATCH_MODE} or {@link | ||||||||||||||||
|
|
@@ -210,6 +217,187 @@ public void testReplicaConsistencyAfterLeaderStop() throws Exception { | |||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| /** | ||||||||||||||||
| * Test that DELETE TIMESERIES is properly replicated to all DataNode replicas via IoTConsensusV2. | ||||||||||||||||
| * | ||||||||||||||||
| * <p>This test reproduces the scenario from the historical deletion replication bug: when a | ||||||||||||||||
| * timeseries is deleted after data insertion (with some unflushed data), the deletion event must | ||||||||||||||||
| * be consistently replicated to all replicas. After waiting for replication to complete, stopping | ||||||||||||||||
| * each DataNode in turn should show the same schema on all surviving nodes. | ||||||||||||||||
| * | ||||||||||||||||
| * <p>Scenario: | ||||||||||||||||
| * | ||||||||||||||||
| * <ol> | ||||||||||||||||
| * <li>Insert data into root.sg.d1 with 3 measurements (speed, temperature, power), flush | ||||||||||||||||
| * <li>Insert more data (unflushed to create WAL-only entries) | ||||||||||||||||
| * <li>DELETE TIMESERIES root.sg.d1.speed | ||||||||||||||||
| * <li>Flush again to persist deletion | ||||||||||||||||
| * <li>Wait for replication to complete on all DataNodes | ||||||||||||||||
| * <li>Verify that every DataNode independently shows the same timeseries (speed is gone) | ||||||||||||||||
| * </ol> | ||||||||||||||||
| */ | ||||||||||||||||
| public void testDeleteTimeSeriesReplicaConsistency() throws Exception { | ||||||||||||||||
| try (Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); | ||||||||||||||||
| Statement statement = makeItCloseQuietly(connection.createStatement())) { | ||||||||||||||||
|
|
||||||||||||||||
| // Step 1: Insert data with 3 measurements and flush | ||||||||||||||||
| LOGGER.info( | ||||||||||||||||
| "Step 1: Inserting data with 3 measurements and flushing (mode: {})...", | ||||||||||||||||
| getIoTConsensusV2Mode()); | ||||||||||||||||
| statement.execute(INSERTION1); | ||||||||||||||||
| statement.execute(INSERTION2); | ||||||||||||||||
| statement.execute(FLUSH_COMMAND); | ||||||||||||||||
|
|
||||||||||||||||
| // Step 2: Insert more data without flush (creates WAL-only entries) | ||||||||||||||||
| LOGGER.info("Step 2: Inserting more data without flush (WAL-only entries)..."); | ||||||||||||||||
| statement.execute(INSERTION3); | ||||||||||||||||
|
|
||||||||||||||||
| // Step 3: Delete one timeseries | ||||||||||||||||
| LOGGER.info("Step 3: Deleting timeseries root.sg.d1.speed..."); | ||||||||||||||||
| statement.execute(DELETE_TIMESERIES_SPEED); | ||||||||||||||||
|
|
||||||||||||||||
| // Step 4: Flush again to persist the deletion | ||||||||||||||||
| LOGGER.info("Step 4: Flushing to persist deletion..."); | ||||||||||||||||
| statement.execute(FLUSH_COMMAND); | ||||||||||||||||
|
|
||||||||||||||||
| // Verify on the current connection: speed should be gone, 2 timeseries remain | ||||||||||||||||
| verifyTimeSeriesAfterDelete(statement, "via initial connection"); | ||||||||||||||||
|
|
||||||||||||||||
| // Step 5: Wait for replication to complete on data region leaders | ||||||||||||||||
| LOGGER.info("Step 5: Waiting for replication to complete on data region leaders..."); | ||||||||||||||||
| Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMap = | ||||||||||||||||
| getDataRegionMapWithLeader(statement); | ||||||||||||||||
| Set<Integer> leaderNodeIds = new HashSet<>(); | ||||||||||||||||
| for (Pair<Integer, Set<Integer>> leaderAndReplicas : dataRegionMap.values()) { | ||||||||||||||||
| if (leaderAndReplicas.getLeft() > 0) { | ||||||||||||||||
| leaderNodeIds.add(leaderAndReplicas.getLeft()); | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
| for (int leaderNodeId : leaderNodeIds) { | ||||||||||||||||
| EnvFactory.getEnv() | ||||||||||||||||
| .dataNodeIdToWrapper(leaderNodeId) | ||||||||||||||||
| .ifPresent(this::waitForReplicationComplete); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // Step 6: Verify schema consistency on each DataNode independently | ||||||||||||||||
| LOGGER.info("Step 6: Verifying schema consistency on each DataNode independently..."); | ||||||||||||||||
| List<DataNodeWrapper> dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList(); | ||||||||||||||||
| for (DataNodeWrapper wrapper : dataNodeWrappers) { | ||||||||||||||||
| String nodeDescription = "DataNode " + wrapper.getIp() + ":" + wrapper.getPort(); | ||||||||||||||||
| LOGGER.info("Verifying schema on {}", nodeDescription); | ||||||||||||||||
| Awaitility.await() | ||||||||||||||||
| .atMost(60, TimeUnit.SECONDS) | ||||||||||||||||
| .untilAsserted( | ||||||||||||||||
| () -> { | ||||||||||||||||
| try (Connection nodeConn = | ||||||||||||||||
| makeItCloseQuietly( | ||||||||||||||||
| EnvFactory.getEnv() | ||||||||||||||||
| .getConnection( | ||||||||||||||||
| wrapper, | ||||||||||||||||
| SessionConfig.DEFAULT_USER, | ||||||||||||||||
| SessionConfig.DEFAULT_PASSWORD, | ||||||||||||||||
| BaseEnv.TREE_SQL_DIALECT)); | ||||||||||||||||
| Statement nodeStmt = makeItCloseQuietly(nodeConn.createStatement())) { | ||||||||||||||||
| verifyTimeSeriesAfterDelete(nodeStmt, nodeDescription); | ||||||||||||||||
| } | ||||||||||||||||
| }); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // Step 7: Stop each DataNode one by one and verify remaining nodes still consistent | ||||||||||||||||
| LOGGER.info( | ||||||||||||||||
| "Step 7: Stopping each DataNode in turn and verifying remaining nodes show consistent schema..."); | ||||||||||||||||
| for (DataNodeWrapper stoppedNode : dataNodeWrappers) { | ||||||||||||||||
| String stoppedDesc = "DataNode " + stoppedNode.getIp() + ":" + stoppedNode.getPort(); | ||||||||||||||||
| LOGGER.info("Stopping {}", stoppedDesc); | ||||||||||||||||
| stoppedNode.stopForcibly(); | ||||||||||||||||
| Assert.assertFalse(stoppedDesc + " should be stopped", stoppedNode.isAlive()); | ||||||||||||||||
|
||||||||||||||||
| Assert.assertFalse(stoppedDesc + " should be stopped", stoppedNode.isAlive()); | |
| Awaitility.await() | |
| .atMost(60, TimeUnit.SECONDS) | |
| .untilAsserted( | |
| () -> | |
| Assert.assertFalse( | |
| stoppedDesc + " should be stopped", stoppedNode.isAlive())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This log message line is likely to exceed the project's 100-character Checkstyle limit once indentation is included. Please wrap/split the string (or use multiple LOGGER.info calls) to keep each line within the limit.