Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .idea/icon.png
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public interface IWALBuffer extends AutoCloseable {
*
* @throws InterruptedException when interrupted by the flush thread
*/
void waitForFlush() throws InterruptedException;
void waitForRollFile() throws InterruptedException;

/**
* Wait for next flush operation done, if the predicate == true after entering a locked
Expand All @@ -60,14 +60,14 @@ public interface IWALBuffer extends AutoCloseable {
* @param waitPredicate the condition which should be satisfied before waiting.
* @throws InterruptedException when interrupted by the flush thread
*/
public void waitForFlush(Predicate<WALBuffer> waitPredicate) throws InterruptedException;
public void waitForRollFile(Predicate<WALBuffer> waitPredicate) throws InterruptedException;

/**
* Wait for next flush operation done.
*
* @throws InterruptedException when interrupted by the flush thread
*/
boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException;
boolean waitForRollFile(long time, TimeUnit unit) throws InterruptedException;

/** Return true when all wal entries all consumed and flushed. */
boolean isAllWALEntriesConsumed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
private final Lock buffersLock = new ReentrantLock();
// condition to guarantee correctness of switching buffers
private final Condition idleBufferReadyCondition = buffersLock.newCondition();
private final Condition rollLogWriterCondition = buffersLock.newCondition();
// last writer position when fsync is called, help record each entry's position
private long lastFsyncPosition;

Expand Down Expand Up @@ -168,8 +169,15 @@

@Override
protected File rollLogWriter(long searchIndex, WALFileStatus fileStatus) throws IOException {
File file = super.rollLogWriter(searchIndex, fileStatus);

Check warning on line 172 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Distance between variable 'file' declaration and its first usage is 4, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4MWO9DJxyiBuPnoZZo&open=AZ4MWO9DJxyiBuPnoZZo&pullRequest=17628
currentWALFileWriter.setCompressedByteBuffer(compressedByteBuffer);
buffersLock.lock();
try {
// notify WALReader that new file is generated, and it can read new file
rollLogWriterCondition.signalAll();
} finally {
buffersLock.unlock();
}
return file;
}

Expand Down Expand Up @@ -656,7 +664,7 @@
}

@Override
public void waitForFlush() throws InterruptedException {
public void waitForRollFile() throws InterruptedException {
buffersLock.lock();
try {
idleBufferReadyCondition.await();
Expand All @@ -666,22 +674,22 @@
}

@Override
public void waitForFlush(Predicate<WALBuffer> waitPredicate) throws InterruptedException {
public void waitForRollFile(Predicate<WALBuffer> waitPredicate) throws InterruptedException {
buffersLock.lock();
try {
if (waitPredicate.test(this)) {
idleBufferReadyCondition.await();
rollLogWriterCondition.await();

Check failure on line 681 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this call to "await" or move it into a "while" loop.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4MWO9DJxyiBuPnoZZm&open=AZ4MWO9DJxyiBuPnoZZm&pullRequest=17628
}
} finally {
buffersLock.unlock();
}
}

@Override
public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException {
public boolean waitForRollFile(long time, TimeUnit unit) throws InterruptedException {
buffersLock.lock();
try {
return idleBufferReadyCondition.await(time, unit);
return rollLogWriterCondition.await(time, unit);

Check failure on line 692 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this call to "await" or move it into a "while" loop.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4MWO9DJxyiBuPnoZZn&open=AZ4MWO9DJxyiBuPnoZZn&pullRequest=17628
} finally {
buffersLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
// no iot consensus, all insert nodes can be safely deleted
public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MAX_VALUE;
// timeout threshold when waiting for next wal entry
private static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
public static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();

// unique identifier of this WALNode
Expand Down Expand Up @@ -617,7 +617,7 @@
}

@Override
public boolean hasNext() {

Check warning on line 620 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 122 to 64, Complexity from 24 to 14, Nesting Level from 6 to 2, Number of Variables from 12 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4MWPOPJxyiBuPnoZZp&open=AZ4MWPOPJxyiBuPnoZZp&pullRequest=17628
if (itr != null && itr.hasNext()) {
return true;
}
Expand Down Expand Up @@ -792,7 +792,7 @@
while (!hasNext()) {
if (!walFileRolled) {
boolean timeout =
!buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
!buffer.waitForRollFile(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
if (timeout) {
bufferLastSearchIndex = buffer.getCurrentSearchIndex();
logger.info(
Expand All @@ -805,7 +805,7 @@
} else {
// only wait when the search index of the buffer remains the same as the previous check
long finalBufferLastSearchIndex = bufferLastSearchIndex;
buffer.waitForFlush(buf -> buf.getCurrentSearchIndex() == finalBufferLastSearchIndex);
buffer.waitForRollFile(buf -> buf.getCurrentSearchIndex() == finalBufferLastSearchIndex);
}
}
}
Expand All @@ -814,8 +814,8 @@
public void waitForNextReady(long time, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (!hasNext()) {
boolean timeout = !buffer.waitForFlush(time, unit);
if (timeout || !hasNext()) {
boolean timeout = !buffer.waitForRollFile(time, unit);
if (timeout && !hasNext()) {
throw new TimeoutException();
}
}
Expand Down
Loading
Loading