Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions modules/core/src/main/java/org/apache/ignite/dump/DumpReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,38 +131,40 @@ public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) {

for (Map.Entry<Integer, List<String>> e : grpsCfgs.grpToNodes.entrySet()) {
int grp = e.getKey();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should rename grp -> grpId - both here and in the log messages?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grp must be used for group name in logs.

String grpName = grpsCfgs.grpIdToName.get(grp);

for (String node : e.getValue()) {
for (int part : dump.partitions(node, grp)) {
if (grps != null && !grps.get(grp).add(part)) {
log.info("Skip copy partition [node=" + node + ", grp=" + grp + ", part=" + part + ']');
log.info("Skip copy partition [node=" + node + ", grpName=" + grpName +
", grp=" + grp + ", part=" + part + ']');

continue;
}

Runnable consumePart = () -> {
if (skip.get()) {
if (log.isDebugEnabled()) {
log.debug("Skip partition due to previous error [node=" + node + ", grp=" + grp +
", part=" + part + ']');
log.debug("Skip partition due to previous error [node=" + node +
", grpName=" + grpName + ", grp=" + grp + ", part=" + part + ']');
}

return;
}

try (DumpedPartitionIterator iter = dump.iterator(node, grp, part, grpsCfgs.cacheIds)) {
if (log.isDebugEnabled()) {
log.debug("Consuming partition [node=" + node + ", grp=" + grp +
", part=" + part + ']');
log.debug("Consuming partition [node=" + node + ", grpName=" + grpName +
", grp=" + grp + ", part=" + part + ']');
}

cnsmr.onPartition(grp, part, iter);
}
catch (Exception ex) {
skip.set(cfg.failFast());

log.error("Error consuming partition [node=" + node + ", grp=" + grp +
", part=" + part + ']', ex);
log.error("Error consuming partition [node=" + node + ", grpName=" + grpName +
", grp=" + grp + ", part=" + part + ']', ex);

throw new IgniteException(ex);
}
Expand Down Expand Up @@ -367,9 +369,19 @@ private static GridKernalContext standaloneKernalContext(SnapshotFileTree sft, I
private GroupsConfigs groupsConfigs(Dump dump) {
Map<Integer, List<String>> grpsToNodes = new HashMap<>();
List<StoredCacheData> ccfgs = new ArrayList<>();
Map<Integer, String> grpIdToName = new HashMap<>();

Set<Integer> grpIds = cfg.groupNames() != null
? Arrays.stream(cfg.groupNames()).map(CU::cacheId).collect(Collectors.toSet())
? Arrays.stream(cfg.groupNames())
.map(grpName -> {
int grpId = CU.cacheId(grpName);

if (!grpIdToName.containsKey(grpId))
grpIdToName.put(grpId, grpName);

return grpId;
})
.collect(Collectors.toSet())
: null;

Set<Integer> cacheIds = cfg.cacheNames() != null
Expand All @@ -394,11 +406,14 @@ private GroupsConfigs groupsConfigs(Dump dump) {
}

grpsToNodes.get(grp).add(meta.folderName());

if (!grpIdToName.containsKey(grp) && !grpCaches.isEmpty())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!grpCaches.isEmpty() - this is always true here.

There's a check above in the loop:

                if (F.isEmpty(grpCaches))
                    continue;

grpIdToName.put(grp, grpCaches.get(0).configuration().getGroupName());
}
}

// Optimize - skip whole cache if only one in group!
return new GroupsConfigs(grpsToNodes, ccfgs, cacheIds);
return new GroupsConfigs(grpsToNodes, ccfgs, cacheIds, grpIdToName);
}

/** */
Expand All @@ -412,11 +427,16 @@ private static class GroupsConfigs {
/** Cache ids. */
public final Set<Integer> cacheIds;

/** Mapping from group id to group name. */
public final Map<Integer, String> grpIdToName;

/** */
public GroupsConfigs(Map<Integer, List<String>> grpToNodes, Collection<StoredCacheData> cacheCfgs, Set<Integer> cacheIds) {
public GroupsConfigs(Map<Integer, List<String>> grpToNodes, Collection<StoredCacheData> cacheCfgs, Set<Integer> cacheIds,
Map<Integer, String> grpIdToName) {
this.grpToNodes = grpToNodes;
this.cacheCfgs = cacheCfgs;
this.cacheIds = cacheIds;
this.grpIdToName = grpIdToName;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_TRANSFER_RATE_DMS_KEY;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.CACHE_0;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.DMP_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.GRP;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.KEYS_CNT;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.USER_FACTORY;
import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.dump;
Expand Down Expand Up @@ -1219,6 +1220,190 @@ public void testReadEncrypted() throws Exception {
IntStream.range(0, KEYS_CNT).forEach(i -> assertEquals((Integer)i, dumpEntries.get(i)));
}

/** */
@Test
public void testDumpReaderDebugLogsGroupName() throws Exception {
String id = "test";
setLoggerDebugLevel();
ListeningTestLogger testLog = new ListeningTestLogger(log);

LogListener errLsnr = LogListener.matches("Error consuming partition")
.andMatches("grpName=" + GRP)
.build();
LogListener cnsmLsnr = LogListener.matches("Consuming partition")
.andMatches("grpName=" + GRP)
.build();
testLog.registerListener(errLsnr);
testLog.registerListener(cnsmLsnr);

try {
IgniteEx ign0 = startGrid(getConfiguration(id)
.setConsistentId(id)
.setGridLogger(testLog));

ign0.cluster().state(ClusterState.ACTIVE);

IgniteCache<Integer, Integer> cache = ign0.createCache(new CacheConfiguration<Integer, Integer>()
.setName(CACHE_0)
.setGroupName(GRP)
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction().setPartitions(3))
);

IntStream.range(0, KEYS_CNT).forEach(i -> cache.put(i, i));

ign0.snapshot().createDump(DMP_NAME, null).get(getTestTimeout());

TestDumpConsumer cnsmr = new TestDumpConsumer() {
@Override public void onPartition(int grp, int part, Iterator<DumpEntry> data) {
throw new RuntimeException("trigger error log");
}
};

try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest replacing it with:

        assertThrows(null, () -> new DumpReader(
            new DumpReaderConfiguration(
                DMP_NAME,
                null,
                ign0.configuration(),
                cnsmr,
                DFLT_THREAD_CNT,
                DFLT_TIMEOUT,
                true,
                true,
                false,
                new String[]{GRP},
                null,
                false,
                null
            ),
            testLog
        ).run(), RuntimeException.class, "trigger error log");

new DumpReader(
new DumpReaderConfiguration(
DMP_NAME,
null,
ign0.configuration(),
cnsmr,
DFLT_THREAD_CNT,
DFLT_TIMEOUT,
true,
true,
false,
new String[]{GRP},
null,
false,
null
),
testLog
).run();
fail("Expected IgniteException");
}
catch (IgniteException ignored) {
// No-op.
}

assertTrue("Log with group name not found", errLsnr.check());
assertTrue("Consuming with group name not found", cnsmLsnr.check());
}
finally {
stopAllGrids();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stopAllGrids() is already called after the test completes in IgniteCacheDumpSelf2Test#afterTest, so it can be removed here.

}
}

/** */
@Test
public void testDumpReaderSkipCopiesLogsGroupName() throws Exception {
int parts = 4;
String dumpName0 = "dump0";
String dumpName1 = "dump1";

File snapshotPath0 = Files.createTempDirectory("snapshots0").toFile();
File snapshotPath1 = Files.createTempDirectory("snapshots1").toFile();
File combinedDumpDir = Files.createTempDirectory("combined_dump").toFile();

ListeningTestLogger testLog = new ListeningTestLogger(log);

LogListener skipLsnr = LogListener.matches("Skip copy partition")
.times(parts)
.andMatches("grpName=" + GRP)
.build();
testLog.registerListener(skipLsnr);

try {
IgniteEx node0 = startGrid(getConfiguration("node0")
.setConsistentId("node0")
.setSnapshotPath(snapshotPath0.getAbsolutePath())
.setGridLogger(testLog));

IgniteEx node1 = startGrid(getConfiguration("node1")
.setConsistentId("node1")
.setSnapshotPath(snapshotPath1.getAbsolutePath())
.setGridLogger(testLog));

node0.cluster().state(ClusterState.ACTIVE);
node1.cluster().state(ClusterState.ACTIVE);

CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>()
.setName(DEFAULT_CACHE_NAME)
.setGroupName(GRP)
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction().setPartitions(parts));

IgniteCache<Integer, Integer> cache = node0.createCache(ccfg);

IntStream.range(0, KEYS_CNT).forEach(i -> cache.put(i, i));

node0.snapshot().createDump(dumpName0, null).get(getTestTimeout());

Thread.sleep(100);

node1.snapshot().createDump(dumpName1, null).get(getTestTimeout());

File dumpDir0 = new File(snapshotPath0, dumpName0);
File dumpDir1 = new File(snapshotPath1, dumpName1);

U.copy(dumpDir0, combinedDumpDir, true);
U.copy(dumpDir1, combinedDumpDir, true);

DumpConsumer dummyConsumer = new DumpConsumer() {
@Override public void start() {
// No-op.
}

@Override public void onMappings(Iterator<TypeMapping> mappings) {
// No-op.
}

@Override public void onTypes(Iterator<BinaryType> types) {
// No-op.
}

@Override public void onCacheConfigs(Iterator<StoredCacheData> caches) {
// No-op.
}

@Override public void onPartition(int grpId, int partId, Iterator<DumpEntry> data) {
while (data.hasNext())
data.next();
}

@Override public void stop() {
// No-op.
}
};

new DumpReader(
new DumpReaderConfiguration(
null,
combinedDumpDir.getAbsolutePath(),
null,
dummyConsumer,
DFLT_THREAD_CNT,
DFLT_TIMEOUT,
false,
true,
true,
new String[]{GRP},
null,
true,
null
),
testLog
).run();

assertTrue(skipLsnr.check());
}
finally {
stopAllGrids();
U.delete(snapshotPath0);
U.delete(snapshotPath1);
U.delete(combinedDumpDir);
}
}

/** */
@Test
public void testDumpRateLimiter() throws Exception {
Expand Down