Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class FileTxnSnapLog {
final File snapDir;
TxnLog txnLog;
SnapShot snapLog;
private volatile boolean closed;
private final boolean autoCreateDB;
private final boolean trustEmptySnapshot;
public static final int VERSION = 2;
Expand Down Expand Up @@ -173,7 +174,15 @@ public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
System.getProperty(ZOOKEEPER_DB_AUTOCREATE, ZOOKEEPER_DB_AUTOCREATE_DEFAULT));
}

private void checkNotClosed() {
if (closed) {
throw new IllegalStateException(
"FileTxnSnapLog has been closed. This may indicate a stale reference to a stopped server instance.");
}
}

public void setServerStats(ServerStats serverStats) {
checkNotClosed();
txnLog.setServerStats(serverStats);
}

Expand Down Expand Up @@ -226,6 +235,7 @@ public File getSnapDir() {
* @return info of last snapshot
*/
public SnapshotInfo getLastSnapshotInfo() {
checkNotClosed();
return this.snapLog.getLastSnapshotInfo();
}

Expand All @@ -250,6 +260,7 @@ public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
* @throws IOException
*/
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
checkNotClosed();
long snapLoadingStartTime = Time.currentElapsedTime();
long deserializeResult = snapLog.deserialize(dt, sessions);
ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
Expand Down Expand Up @@ -327,6 +338,7 @@ public long fastForwardFromEdits(
DataTree dt,
Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
checkNotClosed();
TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
Expand Down Expand Up @@ -475,6 +487,7 @@ public File save(
DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
boolean syncSnap) throws IOException {
checkNotClosed();
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile);
Expand Down Expand Up @@ -511,9 +524,10 @@ public File save(
* @throws IOException
*/
public boolean truncateLog(long zxid) {
boolean reopen = !closed;
try {
// close the existing txnLog and snapLog
close();
closeResources();

// truncate it
try (FileTxnLog truncLog = new FileTxnLog(dataDir)) {
Expand All @@ -523,8 +537,10 @@ public boolean truncateLog(long zxid) {
// I'd rather just close/reopen this object itself, however that
// would have a big impact outside ZKDatabase as there are other
// objects holding a reference to this object.
txnLog = new FileTxnLog(dataDir);
snapLog = new FileSnap(snapDir);
if (reopen) {
txnLog = new FileTxnLog(dataDir);
snapLog = new FileSnap(snapDir);
}

return truncated;
}
Expand Down Expand Up @@ -589,6 +605,7 @@ public File[] getSnapshotLogs(long zxid) {
* @throws IOException
*/
public boolean append(Request si) throws IOException {
checkNotClosed();
return txnLog.append(si);
}

Expand All @@ -597,6 +614,7 @@ public boolean append(Request si) throws IOException {
* @throws IOException
*/
public void commit() throws IOException {
checkNotClosed();
txnLog.commit();
}

Expand All @@ -605,6 +623,7 @@ public void commit() throws IOException {
* @return elapsed sync time of transaction log commit in milliseconds
*/
public long getTxnLogElapsedSyncTime() {
checkNotClosed();
return txnLog.getTxnLogSyncElapsedTime();
}

Expand All @@ -613,6 +632,7 @@ public long getTxnLogElapsedSyncTime() {
* @throws IOException
*/
public void rollLog() throws IOException {
checkNotClosed();
txnLog.rollLog();
}

Expand All @@ -621,6 +641,14 @@ public void rollLog() throws IOException {
* @throws IOException
*/
public void close() throws IOException {
if (closed) {
return;
}
closed = true;
closeResources();
}

private void closeResources() throws IOException {
TxnLog txnLogToClose = txnLog;
if (txnLogToClose != null) {
txnLogToClose.close();
Expand Down Expand Up @@ -664,10 +692,12 @@ public SnapDirContentCheckException(String msg) {
}

public void setTotalLogSize(long size) {
checkNotClosed();
txnLog.setTotalLogSize(size);
}

public long getTotalLogSize() {
checkNotClosed();
return txnLog.getTotalLogSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,27 @@ public void testEmptySnapshotSerialization() throws IOException {
assertNull(dataTree.getDigestFromLoadedSnapshot());
}

@Test
public void testSaveAfterCloseThrowsIllegalStateException() throws IOException {
FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpDir, tmpDir);
DataTree dataTree = new DataTree();
ConcurrentHashMap<Long, Integer> sessions = new ConcurrentHashMap<>();

snaplog.close();

IllegalStateException error = assertThrows(IllegalStateException.class,
() -> snaplog.save(dataTree, sessions, true));
assertTrue(error.getMessage().contains("FileTxnSnapLog has been closed"));
}

@Test
public void testCloseIsIdempotent() throws IOException {
FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpDir, tmpDir);

snaplog.close();
snaplog.close();
}

@Test
public void testSnapshotSerializationCompatibility() throws IOException {
testSnapshotSerializationCompatibility(true, false);
Expand Down