Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
import org.apache.ignite.internal.management.cache.IdleVerifyDumpResult;
import org.apache.ignite.internal.management.cache.IdleVerifyResult;
import org.apache.ignite.internal.management.cache.PartitionKey;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest;
import org.apache.ignite.internal.managers.communication.CompressedMessage;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
Expand Down Expand Up @@ -153,6 +156,7 @@
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationRequest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationResponse;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyHandlerResponse;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotPartitionsVerifyResult;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreOperationResponse;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStartRequest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotStartDiscoveryMessage;
Expand All @@ -168,6 +172,8 @@
import org.apache.ignite.internal.processors.cache.transactions.TxLock;
import org.apache.ignite.internal.processors.cache.transactions.TxLocksRequest;
import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
import org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
Expand Down Expand Up @@ -436,6 +442,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
withNoSchema(ExchangeFailureMessage.class);
withNoSchema(CacheStatisticsClearMessage.class);
withNoSchema(ClientCacheChangeDummyDiscoveryMessage.class);
// TODO: revise using resolved class loader, https://issues.apache.org/jira/browse/IGNITE-28637
withNoSchemaResolvedClassLoader(DynamicCacheChangeBatch.class);

// [10000 - 10200]: Transaction and lock related messages. Most of them originally comes from Communication.
Expand Down Expand Up @@ -531,6 +538,8 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
withNoSchema(GridDhtPartitionsFullMessage.class);
withNoSchema(GridDhtPartitionsSingleMessage.class);
withNoSchema(GridDhtPartitionsSingleRequest.class);
withNoSchema(PartitionKey.class);
withNoSchema(PartitionHashRecord.class);

// [10900-11100]: Query, schema and SQL related messages.
msgIdx = 10900;
Expand Down Expand Up @@ -647,6 +656,10 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
withNoSchema(WalStateAckMessage.class);
withNoSchema(CacheConfigurationEnrichment.class);
withNoSchemaResolvedClassLoader(DynamicCacheChangeRequest.class);
withNoSchema(TransactionsHashRecord.class);
withNoSchema(SnapshotPartitionsVerifyResult.class);
withNoSchema(IdleVerifyResult.class);
withNoSchema(IdleVerifyDumpResult.class);

assert msgIdx <= MAX_MESSAGE_ID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@
*/
package org.apache.ignite.internal.management.cache;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.dto.IgniteDataTransferObject;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;

/**
* Encapsulates result of {@link VerifyBackupPartitionsDumpTask}.
*/
public class IdleVerifyDumpResult extends IgniteDataTransferObject {
public class IdleVerifyDumpResult implements Message, Serializable {
/** */
private static final long serialVersionUID = 0L;

Expand All @@ -43,9 +45,10 @@ public IdleVerifyDumpResult(LinkedHashMap<PartitionKey, List<PartitionHashRecord
}

/**
* Default constructor for Externalizable.
* Default constructor for a {@link MessageFactory}.
*/
public IdleVerifyDumpResult() {
// No-op.
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.ignite.internal.management.cache;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -31,21 +32,25 @@
import java.util.stream.Collectors;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.dto.IgniteDataTransferObject;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
import org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.util.IgniteUtils.nl;

/**
* Encapsulates result of {@link VerifyBackupPartitionsTask}.
*/
public class IdleVerifyResult extends IgniteDataTransferObject {
public class IdleVerifyResult implements Message, Serializable {
/** */
private static final long serialVersionUID = 0L;

Expand Down Expand Up @@ -77,17 +82,18 @@ public class IdleVerifyResult extends IgniteDataTransferObject {
/** Partial committed transactions. */
@Order(5)
@GridToStringInclude
@Nullable Map<ClusterNode, Collection<GridCacheVersion>> partiallyCommittedTxs;
@Nullable Map<TcpDiscoveryNode, Collection<GridCacheVersion>> partiallyCommittedTxs;

/** Exceptions. */
@Order(6)
@GridToStringInclude
Map<ClusterNode, Exception> exceptions;
Map<TcpDiscoveryNode, ErrorMessage> exceptions;

/**
* Default constructor for Externalizable.
* Default constructor for a {@link MessageFactory}.
*/
public IdleVerifyResult() {
// No-op.
}

/**
Expand All @@ -107,9 +113,22 @@ private IdleVerifyResult(
this.movingPartitions = movingPartitions;
this.lostPartitions = lostPartitions;
this.txHashConflicts = txHashConflicts;
this.partiallyCommittedTxs = partiallyCommittedTxs;

this.exceptions = exceptions;
if (!F.isEmpty(partiallyCommittedTxs)) {
// May consist of ZookeeperClusterNode
if (F.first(partiallyCommittedTxs.keySet()) instanceof TcpDiscoveryNode)
this.partiallyCommittedTxs = TcpDiscoveryNode.downcast(partiallyCommittedTxs);
else {
this.partiallyCommittedTxs = U.newHashMap(partiallyCommittedTxs.size());
partiallyCommittedTxs.forEach((n, cl) -> this.partiallyCommittedTxs.put(TcpDiscoveryNode.of(n), cl));
}
}

if (!F.isEmpty(exceptions)) {
this.exceptions = U.newHashMap(exceptions.size());
// May consist of ZookeeperClusterNode
exceptions.forEach((n, e) -> this.exceptions.put(TcpDiscoveryNode.of(n), new ErrorMessage(e)));
}
}

/**
Expand Down Expand Up @@ -152,7 +171,14 @@ public boolean hasConflicts() {
* @return Exceptions on nodes.
*/
public Map<ClusterNode, Exception> exceptions() {
return exceptions;
if (F.isEmpty(exceptions))
return Collections.emptyMap();

Map<ClusterNode, Exception> res = TcpDiscoveryNode.upcast(U.newHashMap(exceptions.size()));

exceptions.forEach((n, errMsg) -> res.put(n, (Exception)ErrorMessage.error(errMsg)));

return res;
}

/**
Expand Down Expand Up @@ -183,7 +209,7 @@ public void print(Consumer<String> printer, boolean printExceptionMessages) {

printer.accept("The check procedure failed on " + size + " node" + (size == 1 ? "" : "s") + ".\n");

if (!F.isEmpty(F.view(exceptions.values(), e -> e instanceof NoMatchingCachesException)))
if (!F.isEmpty(F.view(exceptions.values(), errMsg -> ErrorMessage.error(errMsg) instanceof NoMatchingCachesException)))
printer.accept("\nThere are no caches matching given filter options.\n");

printer.accept("\nThe check procedure failed on nodes:\n");
Expand Down Expand Up @@ -234,8 +260,8 @@ private void printConflicts(Consumer<String> printer) {

printer.accept("The check procedure has failed, conflict partitions has been found: [" +
"counterConflicts=" + cntrConflictsSize + ", hashConflicts=" + hashConflictsSize
+ (txHashConflicts == null ? "" : ", txHashConflicts=" + txHashConflicts.size())
+ (partiallyCommittedTxs == null ? "" : ", partiallyCommittedSize=" + partiallyCommittedTxs.size())
+ (F.isEmpty(txHashConflicts) ? "" : ", txHashConflicts=" + txHashConflicts.size())
+ (F.isEmpty(partiallyCommittedTxs) ? "" : ", partiallyCommittedSize=" + partiallyCommittedTxs.size())
+ "]" + nl());

Set<PartitionKey> allConflicts = new HashSet<>();
Expand Down Expand Up @@ -276,7 +302,7 @@ private void printConflicts(Consumer<String> printer) {
if (!F.isEmpty(partiallyCommittedTxs)) {
printer.accept("Partially committed transactions:" + nl());

for (Map.Entry<ClusterNode, Collection<GridCacheVersion>> entry : partiallyCommittedTxs.entrySet()) {
for (Map.Entry<TcpDiscoveryNode, Collection<GridCacheVersion>> entry : partiallyCommittedTxs.entrySet()) {
printer.accept("Node: " + entry.getKey() + nl());

printer.accept("Transactions: " + entry.getValue() + nl());
Expand Down Expand Up @@ -318,7 +344,7 @@ private void printConflicts(Consumer<String> printer) {

return Objects.equals(cntrConflicts, v.cntrConflicts) && Objects.equals(hashConflicts, v.hashConflicts) &&
Objects.equals(movingPartitions, v.movingPartitions) && Objects.equals(lostPartitions, v.lostPartitions) &&
Objects.equals(exceptions, v.exceptions) && Objects.equals(txHashConflicts, v.txHashConflicts) &&
Objects.equals(exceptions(), v.exceptions()) && Objects.equals(txHashConflicts, v.txHashConflicts) &&
Objects.equals(partiallyCommittedTxs, v.partiallyCommittedTxs);
}

Expand All @@ -329,7 +355,7 @@ private void printConflicts(Consumer<String> printer) {
hashConflicts,
movingPartitions,
lostPartitions,
exceptions,
exceptions(),
txHashConflicts,
partiallyCommittedTxs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,34 @@
package org.apache.ignite.internal.management.cache;

import java.io.Serializable;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;

/**
* Partition key - pair of cache group ID and partition ID.
*/
public class PartitionKey implements Serializable {
public class PartitionKey implements Message, Serializable {
/** */
private static final long serialVersionUID = 0L;

/** Group id. */
private final int grpId;
@Order(0)
int grpId;

/** Group name. Optional field, used only for output. */
private final String grpName;
@Order(1)
String grpName;

/** Partition id. */
private final int partId;
@Order(2)
int partId;

/** Empty constructor for a {@link MessageFactory}. */
public PartitionKey() {
// No-op.
}

/**
* @param grpId Group id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.managers.communication;

import java.io.Serializable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
Expand All @@ -30,7 +31,10 @@
* Message used to transfer {@link Throwable} objects.
*/
@SuppressWarnings({"NullableProblems", "unused"})
public class ErrorMessage implements MarshallableMessage {
public class ErrorMessage implements MarshallableMessage, Serializable {
/** */
private static final long serialVersionUID = 0L;

/** Error bytes. */
@Order(0)
@GridToStringExclude
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ private void processFailedMessage(UUID nodeId,
throws IgniteCheckedException {
assert msg != null;

// TODO: Do not use raw numbers, https://issues.apache.org/jira/browse/IGNITE-28636
switch (msg.directType()) {
case 10022: {
GridDhtLockRequest req = (GridDhtLockRequest)msg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,28 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;

import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
import org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;

/** */
public class IncrementalSnapshotVerifyResult implements MarshallableMessage {
public class IncrementalSnapshotVerifyResult implements Message {
/** Transaction hashes collection. */
private Collection<TransactionsHashRecord> txHashRes;

/** */
@Order(0)
byte[] txHashResBytes;
Collection<TransactionsHashRecord> txHashRes;

/**
* Partition hashes collection. Value is a hash of data entries {@link DataEntry} from WAL segments included
* into the incremental snapshot.
*/
private Collection<PartitionHashRecord> partHashRes;

/** */
@Order(1)
byte[] partHashResBytes;
Collection<PartitionHashRecord> partHashRes;

/** Partially committed transactions' collection. */
@Order(2)
Expand All @@ -73,7 +64,9 @@ public IncrementalSnapshotVerifyResult() {
this.txHashRes = txHashRes;
this.partHashRes = partHashRes;
this.partiallyCommittedTxs = partiallyCommittedTxs;
this.exceptions = exceptions == null ? null : F.viewReadOnly(exceptions, ErrorMessage::new);

if (!F.isEmpty(exceptions))
this.exceptions = F.viewReadOnly(exceptions, ErrorMessage::new);
}

/** */
Expand All @@ -90,20 +83,4 @@ public Collection<TransactionsHashRecord> txHashRes() {
public Collection<GridCacheVersion> partiallyCommittedTxs() {
return partiallyCommittedTxs;
}

/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
txHashResBytes = U.marshal(marsh, txHashRes);
partHashResBytes = U.marshal(marsh, partHashRes);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
if (txHashResBytes != null)
txHashRes = U.unmarshal(marsh, txHashResBytes, clsLdr);

if (partHashResBytes != null)
partHashRes = U.unmarshal(marsh, partHashResBytes, clsLdr);
}

}
Loading
Loading