-
Notifications
You must be signed in to change notification settings - Fork 1.9k
IGNITE-27716: Add group name to DumpReader logs #12840
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -131,38 +131,40 @@ public DumpReader(DumpReaderConfiguration cfg, IgniteLogger log) { | |
|
|
||
| for (Map.Entry<Integer, List<String>> e : grpsCfgs.grpToNodes.entrySet()) { | ||
| int grp = e.getKey(); | ||
| String grpName = grpsCfgs.grpIdToName.get(grp); | ||
|
|
||
| for (String node : e.getValue()) { | ||
| for (int part : dump.partitions(node, grp)) { | ||
| if (grps != null && !grps.get(grp).add(part)) { | ||
| log.info("Skip copy partition [node=" + node + ", grp=" + grp + ", part=" + part + ']'); | ||
| log.info("Skip copy partition [node=" + node + ", grpName=" + grpName + | ||
| ", grp=" + grp + ", part=" + part + ']'); | ||
|
|
||
| continue; | ||
| } | ||
|
|
||
| Runnable consumePart = () -> { | ||
| if (skip.get()) { | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("Skip partition due to previous error [node=" + node + ", grp=" + grp + | ||
| ", part=" + part + ']'); | ||
| log.debug("Skip partition due to previous error [node=" + node + | ||
| ", grpName=" + grpName + ", grp=" + grp + ", part=" + part + ']'); | ||
| } | ||
|
|
||
| return; | ||
| } | ||
|
|
||
| try (DumpedPartitionIterator iter = dump.iterator(node, grp, part, grpsCfgs.cacheIds)) { | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("Consuming partition [node=" + node + ", grp=" + grp + | ||
| ", part=" + part + ']'); | ||
| log.debug("Consuming partition [node=" + node + ", grpName=" + grpName + | ||
| ", grp=" + grp + ", part=" + part + ']'); | ||
| } | ||
|
|
||
| cnsmr.onPartition(grp, part, iter); | ||
| } | ||
| catch (Exception ex) { | ||
| skip.set(cfg.failFast()); | ||
|
|
||
| log.error("Error consuming partition [node=" + node + ", grp=" + grp + | ||
| ", part=" + part + ']', ex); | ||
| log.error("Error consuming partition [node=" + node + ", grpName=" + grpName + | ||
| ", grp=" + grp + ", part=" + part + ']', ex); | ||
|
|
||
| throw new IgniteException(ex); | ||
| } | ||
|
|
@@ -367,9 +369,19 @@ private static GridKernalContext standaloneKernalContext(SnapshotFileTree sft, I | |
| private GroupsConfigs groupsConfigs(Dump dump) { | ||
| Map<Integer, List<String>> grpsToNodes = new HashMap<>(); | ||
| List<StoredCacheData> ccfgs = new ArrayList<>(); | ||
| Map<Integer, String> grpIdToName = new HashMap<>(); | ||
|
|
||
| Set<Integer> grpIds = cfg.groupNames() != null | ||
| ? Arrays.stream(cfg.groupNames()).map(CU::cacheId).collect(Collectors.toSet()) | ||
| ? Arrays.stream(cfg.groupNames()) | ||
| .map(grpName -> { | ||
| int grpId = CU.cacheId(grpName); | ||
|
|
||
| if (!grpIdToName.containsKey(grpId)) | ||
| grpIdToName.put(grpId, grpName); | ||
|
|
||
| return grpId; | ||
| }) | ||
| .collect(Collectors.toSet()) | ||
| : null; | ||
|
|
||
| Set<Integer> cacheIds = cfg.cacheNames() != null | ||
|
|
@@ -394,11 +406,14 @@ private GroupsConfigs groupsConfigs(Dump dump) { | |
| } | ||
|
|
||
| grpsToNodes.get(grp).add(meta.folderName()); | ||
|
|
||
| if (!grpIdToName.containsKey(grp) && !grpCaches.isEmpty()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There's a check above in the loop: |
||
| grpIdToName.put(grp, grpCaches.get(0).configuration().getGroupName()); | ||
| } | ||
| } | ||
|
|
||
| // Optimize - skip whole cache if only one in group! | ||
| return new GroupsConfigs(grpsToNodes, ccfgs, cacheIds); | ||
| return new GroupsConfigs(grpsToNodes, ccfgs, cacheIds, grpIdToName); | ||
| } | ||
|
|
||
| /** */ | ||
|
|
@@ -412,11 +427,16 @@ private static class GroupsConfigs { | |
| /** Cache ids. */ | ||
| public final Set<Integer> cacheIds; | ||
|
|
||
| /** Mapping from group id to group name. */ | ||
| public final Map<Integer, String> grpIdToName; | ||
|
|
||
| /** */ | ||
| public GroupsConfigs(Map<Integer, List<String>> grpToNodes, Collection<StoredCacheData> cacheCfgs, Set<Integer> cacheIds) { | ||
| public GroupsConfigs(Map<Integer, List<String>> grpToNodes, Collection<StoredCacheData> cacheCfgs, Set<Integer> cacheIds, | ||
| Map<Integer, String> grpIdToName) { | ||
| this.grpToNodes = grpToNodes; | ||
| this.cacheCfgs = cacheCfgs; | ||
| this.cacheIds = cacheIds; | ||
| this.grpIdToName = grpIdToName; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -121,6 +121,7 @@ | |
| import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_TRANSFER_RATE_DMS_KEY; | ||
| import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.CACHE_0; | ||
| import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.DMP_NAME; | ||
| import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.GRP; | ||
| import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.KEYS_CNT; | ||
| import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.USER_FACTORY; | ||
| import static org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.AbstractCacheDumpTest.dump; | ||
|
|
@@ -1219,6 +1220,190 @@ public void testReadEncrypted() throws Exception { | |
| IntStream.range(0, KEYS_CNT).forEach(i -> assertEquals((Integer)i, dumpEntries.get(i))); | ||
| } | ||
|
|
||
| /** */ | ||
| @Test | ||
| public void testDumpReaderDebugLogsGroupName() throws Exception { | ||
| String id = "test"; | ||
| setLoggerDebugLevel(); | ||
| ListeningTestLogger testLog = new ListeningTestLogger(log); | ||
|
|
||
| LogListener errLsnr = LogListener.matches("Error consuming partition") | ||
| .andMatches("grpName=" + GRP) | ||
| .build(); | ||
| LogListener cnsmLsnr = LogListener.matches("Consuming partition") | ||
| .andMatches("grpName=" + GRP) | ||
| .build(); | ||
| testLog.registerListener(errLsnr); | ||
| testLog.registerListener(cnsmLsnr); | ||
|
|
||
| try { | ||
| IgniteEx ign0 = startGrid(getConfiguration(id) | ||
| .setConsistentId(id) | ||
| .setGridLogger(testLog)); | ||
|
|
||
| ign0.cluster().state(ClusterState.ACTIVE); | ||
|
|
||
| IgniteCache<Integer, Integer> cache = ign0.createCache(new CacheConfiguration<Integer, Integer>() | ||
| .setName(CACHE_0) | ||
| .setGroupName(GRP) | ||
| .setBackups(1) | ||
| .setAffinity(new RendezvousAffinityFunction().setPartitions(3)) | ||
| ); | ||
|
|
||
| IntStream.range(0, KEYS_CNT).forEach(i -> cache.put(i, i)); | ||
|
|
||
| ign0.snapshot().createDump(DMP_NAME, null).get(getTestTimeout()); | ||
|
|
||
| TestDumpConsumer cnsmr = new TestDumpConsumer() { | ||
| @Override public void onPartition(int grp, int part, Iterator<DumpEntry> data) { | ||
| throw new RuntimeException("trigger error log"); | ||
| } | ||
| }; | ||
|
|
||
| try { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest replacing it with: |
||
| new DumpReader( | ||
| new DumpReaderConfiguration( | ||
| DMP_NAME, | ||
| null, | ||
| ign0.configuration(), | ||
| cnsmr, | ||
| DFLT_THREAD_CNT, | ||
| DFLT_TIMEOUT, | ||
| true, | ||
| true, | ||
| false, | ||
| new String[]{GRP}, | ||
| null, | ||
| false, | ||
| null | ||
| ), | ||
| testLog | ||
| ).run(); | ||
| fail("Expected IgniteException"); | ||
| } | ||
| catch (IgniteException ignored) { | ||
| // No-op. | ||
| } | ||
|
|
||
| assertTrue("Log with group name not found", errLsnr.check()); | ||
| assertTrue("Consuming with group name not found", cnsmLsnr.check()); | ||
| } | ||
| finally { | ||
| stopAllGrids(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
| } | ||
|
|
||
| /** */ | ||
| @Test | ||
| public void testDumpReaderSkipCopiesLogsGroupName() throws Exception { | ||
| int parts = 4; | ||
| String dumpName0 = "dump0"; | ||
| String dumpName1 = "dump1"; | ||
|
|
||
| File snapshotPath0 = Files.createTempDirectory("snapshots0").toFile(); | ||
| File snapshotPath1 = Files.createTempDirectory("snapshots1").toFile(); | ||
| File combinedDumpDir = Files.createTempDirectory("combined_dump").toFile(); | ||
|
|
||
| ListeningTestLogger testLog = new ListeningTestLogger(log); | ||
|
|
||
| LogListener skipLsnr = LogListener.matches("Skip copy partition") | ||
| .times(parts) | ||
| .andMatches("grpName=" + GRP) | ||
| .build(); | ||
| testLog.registerListener(skipLsnr); | ||
|
|
||
| try { | ||
| IgniteEx node0 = startGrid(getConfiguration("node0") | ||
| .setConsistentId("node0") | ||
| .setSnapshotPath(snapshotPath0.getAbsolutePath()) | ||
| .setGridLogger(testLog)); | ||
|
|
||
| IgniteEx node1 = startGrid(getConfiguration("node1") | ||
| .setConsistentId("node1") | ||
| .setSnapshotPath(snapshotPath1.getAbsolutePath()) | ||
| .setGridLogger(testLog)); | ||
|
|
||
| node0.cluster().state(ClusterState.ACTIVE); | ||
| node1.cluster().state(ClusterState.ACTIVE); | ||
|
|
||
| CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>() | ||
| .setName(DEFAULT_CACHE_NAME) | ||
| .setGroupName(GRP) | ||
| .setBackups(1) | ||
| .setAffinity(new RendezvousAffinityFunction().setPartitions(parts)); | ||
|
|
||
| IgniteCache<Integer, Integer> cache = node0.createCache(ccfg); | ||
|
|
||
| IntStream.range(0, KEYS_CNT).forEach(i -> cache.put(i, i)); | ||
|
|
||
| node0.snapshot().createDump(dumpName0, null).get(getTestTimeout()); | ||
|
|
||
| Thread.sleep(100); | ||
|
|
||
| node1.snapshot().createDump(dumpName1, null).get(getTestTimeout()); | ||
|
|
||
| File dumpDir0 = new File(snapshotPath0, dumpName0); | ||
| File dumpDir1 = new File(snapshotPath1, dumpName1); | ||
|
|
||
| U.copy(dumpDir0, combinedDumpDir, true); | ||
| U.copy(dumpDir1, combinedDumpDir, true); | ||
|
|
||
| DumpConsumer dummyConsumer = new DumpConsumer() { | ||
| @Override public void start() { | ||
| // No-op. | ||
| } | ||
|
|
||
| @Override public void onMappings(Iterator<TypeMapping> mappings) { | ||
| // No-op. | ||
| } | ||
|
|
||
| @Override public void onTypes(Iterator<BinaryType> types) { | ||
| // No-op. | ||
| } | ||
|
|
||
| @Override public void onCacheConfigs(Iterator<StoredCacheData> caches) { | ||
| // No-op. | ||
| } | ||
|
|
||
| @Override public void onPartition(int grpId, int partId, Iterator<DumpEntry> data) { | ||
| while (data.hasNext()) | ||
| data.next(); | ||
| } | ||
|
|
||
| @Override public void stop() { | ||
| // No-op. | ||
| } | ||
| }; | ||
|
|
||
| new DumpReader( | ||
| new DumpReaderConfiguration( | ||
| null, | ||
| combinedDumpDir.getAbsolutePath(), | ||
| null, | ||
| dummyConsumer, | ||
| DFLT_THREAD_CNT, | ||
| DFLT_TIMEOUT, | ||
| false, | ||
| true, | ||
| true, | ||
| new String[]{GRP}, | ||
| null, | ||
| true, | ||
| null | ||
| ), | ||
| testLog | ||
| ).run(); | ||
|
|
||
| assertTrue(skipLsnr.check()); | ||
| } | ||
| finally { | ||
| stopAllGrids(); | ||
| U.delete(snapshotPath0); | ||
| U.delete(snapshotPath1); | ||
| U.delete(combinedDumpDir); | ||
| } | ||
| } | ||
|
|
||
| /** */ | ||
| @Test | ||
| public void testDumpRateLimiter() throws Exception { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should rename grp -> grpId - both here and in the log messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
grp must be used for group name in logs.