Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
Expand All @@ -54,7 +55,9 @@
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.SessionTracker;
import org.apache.zookeeper.server.TxnLogEntry;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
Expand Down Expand Up @@ -579,6 +582,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
boolean snapshotNeeded = true;
boolean syncSnapshot = false;
readPacket(qp);
boolean diffSync = qp.getType() == Leader.DIFF;
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();

Expand Down Expand Up @@ -633,6 +637,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
}
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
zk.createSessionTracker();
// DIFF keeps the local tree; clear ephemerals without sessions before applying new transactions.
if (diffSync) {
purgeOrphanedEphemerals();
}

// TODO: Ideally, this should be lastProcessZxid(a.k.a. QuorumPacket::zxid from above), but currently
// LearnerHandler does not guarantee this. So, let's be conservative and keep it unchange for now.
Expand Down Expand Up @@ -869,6 +877,43 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// New server type need to handle in-flight packets
throw new UnsupportedOperationException("Unknown server type");
}

}

void purgeOrphanedEphemerals() {
if (zk == null) {
return;
}
SessionTracker sessionTracker = zk.getSessionTracker();
if (sessionTracker == null) {
return;
}
ZKDatabase zkDatabase = zk.getZKDatabase();
if (zkDatabase == null) {
return;
}

Set<Long> globalSessions = sessionTracker.globalSessions();
Set<Long> localSessions = sessionTracker.localSessions();
Set<Long> sessionsWithEphemerals = new HashSet<>(zkDatabase.getSessions());
if (sessionsWithEphemerals.isEmpty()) {
return;
}

long zxid = zkDatabase.getDataTreeLastProcessedZxid();
for (Long sessionId : sessionsWithEphemerals) {
if (globalSessions.contains(sessionId)
|| localSessions.contains(sessionId)
|| (sessionTracker instanceof UpgradeableSessionTracker
&& ((UpgradeableSessionTracker) sessionTracker).isUpgradingSession(sessionId))) {
continue;
}
LOG.warn(
"Removing ephemeral nodes for unknown session 0x{} after DIFF sync",
Long.toHexString(sessionId));
zkDatabase.killSession(sessionId, zxid);
sessionTracker.removeSession(sessionId);
}
}

protected void revalidate(QuorumPacket qp) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand All @@ -50,6 +52,7 @@
import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.test.TestUtils;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.TxnHeader;
import org.apache.zookeeper.util.ServiceUtils;
Expand Down Expand Up @@ -313,6 +316,39 @@ public void syncTest(@TempDir File tmpDir) throws Exception {
assertEquals(startZxid, sl.zk.getLastProcessedZxid());
}

@Test
public void testPurgeOrphanedEphemerals() throws Exception {
File tmpFile = File.createTempFile("test", ".dir", testData);
tmpFile.delete();
SimpleLearner sl = null;
try {
FileTxnSnapLog ftsl = new FileTxnSnapLog(tmpFile, tmpFile);
sl = new SimpleLearner(ftsl);

long sessionId = 0x1234L;
TxnHeader hdr = new TxnHeader(sessionId, 1, 1L, 1L, ZooDefs.OpCode.create);
CreateTxn txn = new CreateTxn(
"/eph",
new byte[0],
new ArrayList<ACL>(),
true,
sl.zk.getZKDatabase().getNode("/").stat.getCversion());
sl.zk.getZKDatabase().processTxn(hdr, txn, null);

assertNotNull(sl.zk.getZKDatabase().getNode("/eph"), "Ephemeral node should exist before cleanup");

sl.zk.createSessionTracker();
sl.purgeOrphanedEphemerals();

assertNull(sl.zk.getZKDatabase().getNode("/eph"), "Ephemeral node should be removed for unknown session");
} finally {
if (sl != null) {
sl.zk.shutdown();
}
TestUtils.deleteFileRecursively(tmpFile);
}
}

@Test
public void truncFailTest(@TempDir File tmpDir) throws Exception {
final boolean[] exitProcCalled = {false};
Expand Down