Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.metrics;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

import java.util.Arrays;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;

/**
* Hamcrest matchers for testing metrics.
*/
public final class MetricMatchers {
private MetricMatchers() {
// No-op.
}

/**
* Creates a matcher that matches a {@link DistributionMetric} with the expected total number of measurements
* across all histogram buckets.
*
* @param expectedMeasuresCount Expected total number of measurements across all buckets.
* @return Matcher for distribution metric measures count.
*/
public static Matcher<DistributionMetric> hasMeasurementsCount(long expectedMeasuresCount) {
return new FeatureMatcher<>(
is(expectedMeasuresCount),
"a DistributionMetric with measures count",
"measures count") {
@Override
protected Long featureValueOf(DistributionMetric metric) {
return Arrays.stream(metric.value()).sum();
}
};
}

/**
* Creates a matcher that matches a {@link LongMetric} whose value satisfies the given matcher.
*
* @param valueMatcher Matcher for the metric value.
* @return Matcher for long metric value.
*/
public static Matcher<LongMetric> hasValue(Matcher<Long> valueMatcher) {
return new FeatureMatcher<>(
valueMatcher,
"a LongMetric with value",
"value") {
@Override
protected Long featureValueOf(LongMetric metric) {
return metric.value();
}
};
}

/**
* Creates a matcher that matches a {@link MetricSet} containing a metric with the given name
* that satisfies the given matcher.
*
* @param name Name of the metric to look up in the metric set.
* @param metricMatcher Matcher for the metric.
* @param <M> Type of the metric.
* @return Matcher for metric set containing the specified metric.
*/
public static <M extends Metric> Matcher<MetricSet> hasMetric(String name, Matcher<M> metricMatcher) {
return new FeatureMatcher<>(
allOf(notNullValue(), metricMatcher),
"a MetricSet with metric named \"" + name + "\"",
"metric named \"" + name + "\"") {
@Override
protected M featureValueOf(MetricSet metricSet) {
return metricSet.get(name);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.metrics.MetricMatchers.hasMeasurementsCount;
import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER;
import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
Expand All @@ -42,7 +43,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand All @@ -53,7 +53,6 @@
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.metrics.DistributionMetric;
import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
Expand Down Expand Up @@ -438,17 +437,26 @@ void testCheckpointReadLockMetrics() {

try {
// Verify metrics start at zero
assertDistributionMetricRecordsCount(metrics.readLockAcquisitionTime(), 0L);
assertThat(
metrics.readLockAcquisitionTime(),
hasMeasurementsCount(0L)
);

// Acquire and immediately release the lock
timeoutLock.checkpointReadLock();
timeoutLock.checkpointReadUnlock();

// Verify acquisition was recorded
assertDistributionMetricRecordsCount(metrics.readLockAcquisitionTime(), 1L);
assertThat(
metrics.readLockAcquisitionTime(),
hasMeasurementsCount(1L)
);

// Verify hold time distribution was recorded
assertDistributionMetricRecordsCount(metrics.readLockHoldTime(), 1L);
assertThat(
metrics.readLockHoldTime(),
hasMeasurementsCount(1L)
);

readWriteLock.writeLock();
runAsync(() -> {
Expand All @@ -462,20 +470,4 @@ void testCheckpointReadLockMetrics() {
timeoutLock.stop();
}
}

/**
* Verifies that the specified distribution metric has recorded the expected total number of measurements.
*
* <p>
* Rather than checking individual histogram buckets, this method aggregates all recorded measurements across every bucket
* and confirms that the expected interaction was captured in at least one of them.
*/
private static void assertDistributionMetricRecordsCount(DistributionMetric metric, long expectedMeasuresCount) {
long totalMeasuresCount = Arrays.stream(metric.value()).sum();
assertThat(
"Unexpected total measures count in distribution metric " + metric.name(),
totalMeasuresCount,
is(expectedMeasuresCount)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.ignite.internal.pagememory.persistence.replacement;

import static org.apache.ignite.internal.metrics.MetricMatchers.hasMetric;
import static org.apache.ignite.internal.metrics.MetricMatchers.hasValue;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource.DIRTY_PAGES;
import static org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource.LOADED_PAGES;
Expand All @@ -36,7 +38,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -60,7 +61,6 @@
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
import org.apache.ignite.internal.lang.RunnableX;
import org.apache.ignite.internal.metrics.LongMetric;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.TestDataRegion;
Expand Down Expand Up @@ -374,12 +374,7 @@ void verifyPageMemoryMetrics() throws Throwable {
}

private void assertMetricValue(String metricName, Matcher<Long> valueMatcher) {
LongMetric metric = metricSet.get(metricName);
assertThat(metric, is(notNullValue()));
assertThat(
metric.value(),
valueMatcher
);
assertThat(metricSet, hasMetric(metricName, hasValue(valueMatcher)));
}

private void createAndFillTestSimpleValuePage(long pageId) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.internal.metrics.MetricMatchers.hasMeasurementsCount;
import static org.apache.ignite.internal.metrics.MetricMatchers.hasMetric;
import static org.apache.ignite.internal.metrics.MetricMatchers.hasValue;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.VERSION_1;
import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createPageByteBuffer;
Expand All @@ -29,20 +32,16 @@
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Arrays;
import org.apache.ignite.internal.fileio.FileIo;
import org.apache.ignite.internal.fileio.FileIoFactory;
import org.apache.ignite.internal.fileio.MeteredFileIoFactory;
import org.apache.ignite.internal.fileio.RandomAccessFileIo;
import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
import org.apache.ignite.internal.metrics.DistributionMetric;
import org.apache.ignite.internal.metrics.LongMetric;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
import org.apache.ignite.internal.pagememory.persistence.PageMemoryIoMetrics;
Expand Down Expand Up @@ -145,7 +144,7 @@ void testIoMetricsRecordedDuringActualFileOperations() throws Exception {

// Verify write metrics were recorded - 1 write of header + 1 write of page
assertMetricValue(metricSet, PageMemoryIoMetrics.TOTAL_BYTES_WRITTEN, PAGE_SIZE * 2);
assertDistributionMetricRecordsCount(metricSet, PageMemoryIoMetrics.WRITES_TIME, 2L);
assertDistributionMetricFromSet(metricSet, PageMemoryIoMetrics.WRITES_TIME, 2L);

// Perform read operation
long pageOff = filePageStoreIo.pageOffset(pageId);
Expand All @@ -154,7 +153,7 @@ void testIoMetricsRecordedDuringActualFileOperations() throws Exception {

// Verify read metrics were recorded
assertMetricValue(metricSet, PageMemoryIoMetrics.TOTAL_BYTES_READ, PAGE_SIZE);
assertDistributionMetricRecordsCount(metricSet, PageMemoryIoMetrics.READS_TIME, 1L);
assertDistributionMetricFromSet(metricSet, PageMemoryIoMetrics.READS_TIME, 1L);
}
}

Expand All @@ -176,28 +175,16 @@ private static FilePageStoreIo createFilePageStoreIo(Path filePath, PageMemoryIo
}

private static void assertMetricValue(MetricSet metrics, String metricName, long value) {
LongMetric metric = metrics.get(metricName);

assertNotNull(metric, "Metric not found: " + metricName);
assertEquals(value, metric.value(), metricName);
assertThat(metrics, hasMetric(
metricName,
hasValue(is(value))
));
}

/**
* Verifies that the specified distribution metric has recorded the expected total number of measurements.
*
* <p>
* Rather than checking individual histogram buckets, this method aggregates all recorded measurements across every bucket
* and confirms that the expected interaction was captured in at least one of them.
*/
private static void assertDistributionMetricRecordsCount(MetricSet metrics, String metricName, long expectedMeasuresCount) {
DistributionMetric metric = metrics.get(metricName);
assertNotNull(metric, metricName);

long totalMeasuresCount = Arrays.stream(metric.value()).sum();
assertThat(
"Unexpected total measures count in distribution metric " + metric.name(),
totalMeasuresCount,
is(expectedMeasuresCount)
);
private static void assertDistributionMetricFromSet(MetricSet metrics, String metricName, long expectedMeasuresCount) {
assertThat(metrics, hasMetric(
metricName,
hasMeasurementsCount(expectedMeasuresCount)
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileView;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineExtensionConfiguration;
import org.apache.ignite.internal.storage.pagememory.mv.RunConsistentlyMetrics;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -99,7 +100,7 @@ public class PersistentPageMemoryStorageEngine extends AbstractPageMemoryStorage

private CollectionMetricSource checkpointMetricSource;

private PersistentPageMemoryStorageMetricSource storageMetricSource;
private CollectionMetricSource storageMetricSource;

private final StorageConfiguration storageConfig;

Expand Down Expand Up @@ -130,6 +131,8 @@ public class PersistentPageMemoryStorageEngine extends AbstractPageMemoryStorage
/** For unspecified tasks, i.e. throttling log. */
private final ExecutorService commonExecutorService;

private RunConsistentlyMetrics runConsistentlyMetrics;

/**
* Constructor.
*
Expand Down Expand Up @@ -253,10 +256,12 @@ public void start() throws StorageException {

destructionExecutor = executor;

storageMetricSource = new PersistentPageMemoryStorageMetricSource("storage." + ENGINE_NAME);
storageMetricSource = new CollectionMetricSource("storage." + ENGINE_NAME, "storage", null);

PersistentPageMemoryStorageMetrics.initMetrics(storageMetricSource, filePageStoreManager);

runConsistentlyMetrics = new RunConsistentlyMetrics(storageMetricSource);

metricManager.registerSource(checkpointMetricSource);
metricManager.registerSource(storageMetricSource);
metricManager.registerSource(ioMetricSource);
Expand Down Expand Up @@ -332,7 +337,8 @@ public MvTableStorage createMvTable(
this,
dataRegion,
destructionExecutor,
failureManager
failureManager,
runConsistentlyMetrics
);

dataRegion.addTableStorage(tableStorage);
Expand Down
Loading