Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,10 @@ public static int incrementAndGetInt(long ptr) {
return UNSAFE.getAndAddInt(null, ptr, 1) + 1;
}

public static int getAndIncrementInt(long ptr) {
return UNSAFE.getAndAddInt(null, ptr, 1);
}

/**
* Atomically increments value stored in an integer pointed by {@code ptr}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import static org.apache.ignite.internal.util.GridUnsafe.getInt;
import static org.apache.ignite.internal.util.GridUnsafe.getIntVolatile;
import static org.apache.ignite.internal.util.GridUnsafe.getLong;
import static org.apache.ignite.internal.util.GridUnsafe.incrementAndGetInt;
import static org.apache.ignite.internal.util.GridUnsafe.putInt;
import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
import static org.apache.ignite.internal.util.GridUnsafe.putLong;
import static org.apache.ignite.internal.util.GridUnsafe.putLongVolatile;

import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.util.GridUnsafe;

/**
* Helper class for working with the page header that is stored in memory for {@link PersistentPageMemory}.
Expand Down Expand Up @@ -198,10 +198,10 @@ public static boolean isAcquired(long absPtr) {
* Acquires a page.
*
* @param absPtr Absolute pointer.
* @return Number of acquires for the page.
* @return Previous number of acquires for the page.
*/
public static int acquirePage(long absPtr) {
return incrementAndGetInt(absPtr + PAGE_PIN_CNT_OFFSET);
return GridUnsafe.getAndIncrementInt(absPtr + PAGE_PIN_CNT_OFFSET);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@
import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
import static org.apache.ignite.internal.util.GridUnsafe.decrementAndGetInt;
import static org.apache.ignite.internal.util.GridUnsafe.getInt;
import static org.apache.ignite.internal.util.GridUnsafe.getLong;
import static org.apache.ignite.internal.util.GridUnsafe.incrementAndGetInt;
import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
import static org.apache.ignite.internal.util.GridUnsafe.zeroMemory;
import static org.apache.ignite.internal.util.IgniteUtils.hash;
Expand All @@ -77,6 +73,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -1029,28 +1026,6 @@ public long acquiredPages() {
return total;
}

/**
* Returns {@code true} if the page is contained in the loaded pages table, {@code false} otherwise.
*
* @param fullPageId Full page ID to check.
*/
public boolean hasLoadedPage(FullPageId fullPageId) {
int grpId = fullPageId.groupId();
long pageId = fullPageId.effectivePageId();

Segment seg = segment(grpId, pageId);

seg.readLock().lock();

try {
long res = seg.loadedPages.get(grpId, pageId, partGeneration(seg, fullPageId), INVALID_REL_PTR, INVALID_REL_PTR);

return res != INVALID_REL_PTR;
} finally {
seg.readLock().unlock();
}
}

/** {@inheritDoc} */
@Override
public long readLockForce(int grpId, long pageId, long page) {
Expand Down Expand Up @@ -1107,10 +1082,12 @@ private long writeLockPage(long absPtr, FullPageId fullId, boolean checkTag) {
private long postWriteLockPage(long absPtr, FullPageId fullId) {
timestamp(absPtr, coarseCurrentTimeMillis());

Segment seg = segment(fullId.groupId(), fullId.pageId());

DirtyFullPageId dirtyFullId = dirtyFullPageId(absPtr);

// Create a buffer copy if the page is scheduled for a checkpoint.
if (isInCheckpoint(dirtyFullId) && tempBufferPointer(absPtr) == INVALID_REL_PTR) {
if (isInCheckpoint(seg, dirtyFullId) && tempBufferPointer(absPtr) == INVALID_REL_PTR) {
long tmpRelPtr;

while (true) {
Expand All @@ -1129,7 +1106,7 @@ private long postWriteLockPage(long absPtr, FullPageId fullId) {
}

// Pin the page until checkpoint is not finished.
PageHeader.acquirePage(absPtr);
seg.acquirePage(absPtr);

long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);

Expand Down Expand Up @@ -1228,19 +1205,6 @@ boolean isPageReadLocked(long absPtr) {
return rwLock.isReadLocked(absPtr + PAGE_LOCK_OFFSET);
}

/**
* Returns the number of active pages across all segments. Used for test purposes only.
*/
public int activePagesCount() {
int total = 0;

for (Segment seg : segments) {
total += seg.acquiredPages();
}

return total;
}

/**
* This method must be called in synchronized context.
*
Expand Down Expand Up @@ -1345,17 +1309,11 @@ public class Segment extends ReentrantReadWriteLock {
/** Serial version uid. */
private static final long serialVersionUID = 0L;

/** Pointer to acquired pages integer counter. */
private static final int ACQUIRED_PAGES_SIZEOF = 4;

/** Padding to read from word beginning. */
private static final int ACQUIRED_PAGES_PADDING = 4;

/** Page ID to relative pointer map. */
private final LoadedPagesMap loadedPages;

/** Pointer to acquired pages integer counter. */
private final long acquiredPagesPtr;
/** Acquired pages counter. */
private final LongAdder acquiredPages = new LongAdder();

/** Page pool. */
private final PagePool pool;
Expand Down Expand Up @@ -1409,29 +1367,23 @@ private Segment(int idx, DirectMemoryRegion region) {

int pages = (int) (totalMemory / sysPageSize);

acquiredPagesPtr = region.address();

putIntVolatile(null, acquiredPagesPtr, 0);

int ldPagesMapOffInRegion = ACQUIRED_PAGES_SIZEOF + ACQUIRED_PAGES_PADDING;

long ldPagesAddr = region.address() + ldPagesMapOffInRegion;
long ldPagesAddr = region.address();

memPerTbl = RobinHoodBackwardShiftHashMap.requiredMemory(pages);

loadedPages = new RobinHoodBackwardShiftHashMap(ldPagesAddr, memPerTbl);

pages = (int) ((totalMemory - memPerTbl - ldPagesMapOffInRegion) / sysPageSize);
pages = (int) ((totalMemory - memPerTbl) / sysPageSize);

memPerRepl = pageReplacementPolicyFactory.requiredMemory(pages);

DirectMemoryRegion poolRegion = region.slice(memPerTbl + memPerRepl + ldPagesMapOffInRegion);
DirectMemoryRegion poolRegion = region.slice(memPerTbl + memPerRepl);

pool = new PagePool(idx, poolRegion, sysPageSize, rwLock);

pageReplacementPolicy = pageReplacementPolicyFactory.create(
this,
region.address() + memPerTbl + ldPagesMapOffInRegion,
region.address() + memPerTbl,
pool.pages()
);

Expand Down Expand Up @@ -1478,23 +1430,27 @@ private long replacementSize() {
return memPerRepl;
}

private void acquirePage(long absPtr) {
PageHeader.acquirePage(absPtr);
protected void acquirePage(long absPtr) {
int oldPinCount = PageHeader.acquirePage(absPtr);

incrementAndGetInt(acquiredPagesPtr);
if (oldPinCount == 0) {
acquiredPages.increment();
}
}

private void releasePage(long absPtr) {
PageHeader.releasePage(absPtr);
protected void releasePage(long absPtr) {
int newPinCount = PageHeader.releasePage(absPtr);

decrementAndGetInt(acquiredPagesPtr);
if (newPinCount == 0) {
acquiredPages.decrement();
}
}

/**
* Returns total number of acquired pages.
*/
private int acquiredPages() {
return getInt(acquiredPagesPtr);
return acquiredPages.intValue();
}

/**
Expand Down Expand Up @@ -1616,7 +1572,7 @@ public long refreshOutdatedPage(int grpId, long pageId, boolean rmv) {
tempBufferPointer(absPtr, INVALID_REL_PTR);

// We pinned the page when allocated the temp buffer, release it now.
PageHeader.releasePage(absPtr);
releasePage(absPtr);

releaseCheckpointBufferPage(tmpBufPtr);
}
Expand Down Expand Up @@ -1854,6 +1810,10 @@ private void releaseCheckpointBufferPage(long tmpBufPtr) {
private boolean isInCheckpoint(DirtyFullPageId pageId) {
Segment seg = segment(pageId.groupId(), pageId.pageId());

return isInCheckpoint(seg, pageId);
}

private static boolean isInCheckpoint(Segment seg, DirtyFullPageId pageId) {
CheckpointPages pages0 = seg.checkpointPages;

return pages0 != null && pages0.contains(pageId);
Expand All @@ -1877,6 +1837,7 @@ private boolean removeOnCheckpoint(DirtyFullPageId fullPageId) {
/**
* Makes a full copy of the dirty page for checkpointing, then marks the page as not dirty.
*
* @param seg Segment.
* @param absPtr Absolute page pointer.
* @param fullId Full page ID.
* @param buf Buffer for copy page content for future write via {@link PageStoreWriter}.
Expand All @@ -1889,6 +1850,7 @@ private boolean removeOnCheckpoint(DirtyFullPageId fullPageId) {
* <b>write lock</b> on page.
*/
private void copyPageForCheckpoint(
Segment seg,
long absPtr,
DirtyFullPageId fullId,
ByteBuffer buf,
Expand All @@ -1899,19 +1861,19 @@ private void copyPageForCheckpoint(
boolean useTryWriteLockOnPage
) throws IgniteInternalCheckedException {
assert absPtr != 0 : fullId.pageId();
assert isAcquired(absPtr) || !isInCheckpoint(fullId) : fullId.pageId();
assert isAcquired(absPtr) || !isInCheckpoint(seg, fullId) : fullId.pageId();

if (useTryWriteLockOnPage) {
if (!rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS)) {
// We release the page only once here because this page will be copied sometime later and
// will be released properly then.
if (!pageSingleAcquire) {
PageHeader.releasePage(absPtr);
seg.releasePage(absPtr);
}

buf.clear();

if (isInCheckpoint(fullId)) {
if (isInCheckpoint(seg, fullId)) {
pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
}

Expand All @@ -1927,7 +1889,7 @@ private void copyPageForCheckpoint(
rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);

if (!pageSingleAcquire) {
PageHeader.releasePage(absPtr);
seg.releasePage(absPtr);
}

return;
Expand Down Expand Up @@ -1958,7 +1920,7 @@ private void copyPageForCheckpoint(
// Need release again because we pin page when resolve abs pointer,
// and page did not have tmp buffer page.
if (!pageSingleAcquire) {
PageHeader.releasePage(absPtr);
seg.releasePage(absPtr);
}
} else {
copyInBuffer(absPtr, buf);
Expand All @@ -1985,7 +1947,7 @@ private void copyPageForCheckpoint(

// We pinned the page either when allocated the temp buffer, or when resolved abs pointer.
// Must release the page only after write unlock.
PageHeader.releasePage(absPtr);
seg.releasePage(absPtr);
}
}

Expand Down Expand Up @@ -2024,7 +1986,7 @@ public void checkpointWritePage(
seg.readLock().lock();

try {
if (!isInCheckpoint(fullId)) {
if (!isInCheckpoint(seg, fullId)) {
return;
}

Expand All @@ -2046,7 +2008,7 @@ public void checkpointWritePage(

// Pin the page until page will not be copied. This helpful to prevent page replacement of this page.
if (tempBufferPointer(absPtr) == INVALID_REL_PTR) {
PageHeader.acquirePage(absPtr);
seg.acquirePage(absPtr);
} else {
pageSingleAcquire = true;
}
Expand Down Expand Up @@ -2085,6 +2047,7 @@ public void checkpointWritePage(
}

copyPageForCheckpoint(
seg,
absPtr,
fullId,
buf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void testReadWriteFullPageId() {

@Test
void testAcquireRelease() {
assertEquals(1, acquirePage(pageHeaderAddr));
assertEquals(0, acquirePage(pageHeaderAddr));
assertEquals(1, pinCount(pageHeaderAddr));
assertTrue(isAcquired(pageHeaderAddr));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void start() {
regionConfiguration(dataRegionConfigView, sizeBytes, pageSize),
metricSource,
ioRegistry,
calculateSegmentSizes(sizeBytes, Runtime.getRuntime().availableProcessors()),
calculateSegmentSizes(sizeBytes, Runtime.getRuntime().availableProcessors() * 4),
calculateCheckpointBufferSize(sizeBytes),
filePageStoreManager,
this::flushDirtyPageOnReplacement,
Expand Down