Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.utils.TimeUUID;

public final class BatchRemoveVerbHandler implements IVerbHandler<TimeUUID>
{
public static final BatchRemoveVerbHandler instance = new BatchRemoveVerbHandler();

public void doVerb(Message<TimeUUID> message)
public void doVerb(MessageDelivery messaging, Message<TimeUUID> message)
{
BatchlogManager.remove(message.payload);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@

import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessageDelivery;

public final class BatchStoreVerbHandler implements IVerbHandler<Batch>
{
public static final BatchStoreVerbHandler instance = new BatchStoreVerbHandler();

public void doVerb(Message<Batch> message)
public void doVerb(MessageDelivery messaging, Message<Batch> message)
{
BatchlogManager.store(message.payload);
MessagingService.instance().send(message.emptyResponse(), message.from());
messaging.send(message.emptyResponse(), message.from());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.ClusterMetadata;
Expand All @@ -49,12 +49,12 @@ public abstract class AbstractMutationVerbHandler<T extends IMutation> implement
private static final Logger logger = LoggerFactory.getLogger(AbstractMutationVerbHandler.class);
private static final String logMessageTemplate = "Received mutation from {} for token {} outside valid range for keyspace {}";

public void doVerb(Message<T> message) throws IOException
public void doVerb(MessageDelivery messaging, Message<T> message) throws IOException
{
processMessage(message, message.respondTo());
processMessage(messaging, message, message.respondTo());
}

protected void processMessage(Message<T> message, InetAddressAndPort respondTo)
protected void processMessage(MessageDelivery messaging, Message<T> message, InetAddressAndPort respondTo)
{
if (message.epoch().isAfter(Epoch.FIRST))
{
Expand All @@ -65,17 +65,17 @@ protected void processMessage(Message<T> message, InetAddressAndPort respondTo)

try
{
applyMutation(message, respondTo);
applyMutation(messaging, message, respondTo);
}
catch (RetryOnDifferentSystemException e)
{
logger.debug("Responding with retry on different system");
MessagingService.instance().respondWithFailure(RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM, message);
messaging.respondWithFailure(RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM, message);
Tracing.trace("Payload application resulted in RetryOnDifferentSysten");
}
}

abstract void applyMutation(Message<T> message, InetAddressAndPort respondToAddress);
abstract void applyMutation(MessageDelivery messaging, Message<T> message, InetAddressAndPort respondToAddress);

private ClusterMetadata checkTokenOwnership(ClusterMetadata metadata, Message<T> message, InetAddressAndPort respondTo)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.transport.Dispatcher;

Expand All @@ -33,7 +33,7 @@ public class CounterMutationVerbHandler extends AbstractMutationVerbHandler<Coun

private static final Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);

protected void applyMutation(final Message<CounterMutation> message, InetAddressAndPort respondToAddress)
protected void applyMutation(MessageDelivery messaging, final Message<CounterMutation> message, InetAddressAndPort respondToAddress)
{
final CounterMutation cm = message.payload;
logger.trace("Applying forwarded {}", cm);
Expand All @@ -48,7 +48,7 @@ protected void applyMutation(final Message<CounterMutation> message, InetAddress
// it's own in that case.
StorageProxy.applyCounterMutationOnLeader(cm,
localDataCenter,
() -> MessagingService.instance().send(message.emptyResponse(), respondToAddress),
() -> messaging.send(message.emptyResponse(), respondToAddress),
Dispatcher.RequestTime.forImmediateExecution());
}
}
19 changes: 10 additions & 9 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.ForwardingInfo;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.tracing.Tracing;
Expand All @@ -35,21 +36,21 @@ public class MutationVerbHandler extends AbstractMutationVerbHandler<Mutation>
{
public static final MutationVerbHandler instance = new MutationVerbHandler();

private void respond(Message<?> respondTo, InetAddressAndPort respondToAddress, Map<ParamType, Object> params)
private void respond(MessageDelivery messaging, Message<?> respondTo, InetAddressAndPort respondToAddress, Map<ParamType, Object> params)
{
Tracing.trace("Enqueuing response to {}", respondToAddress);
Message<?> response = respondTo.emptyResponse();
if (!params.isEmpty())
response = response.withParams(params);
MessagingService.instance().send(response, respondToAddress);
messaging.send(response, respondToAddress);
}

private void failed()
{
Tracing.trace("Payload application resulted in WriteTimeout, not replying");
}

public void doVerb(Message<Mutation> message)
public void doVerb(MessageDelivery messaging, Message<Mutation> message)
{
if (approxTime.now() > message.expiresAtNanos())
{
Expand All @@ -64,26 +65,26 @@ public void doVerb(Message<Mutation> message)

ForwardingInfo forwardTo = message.forwardTo();
if (forwardTo != null)
forwardToLocalNodes(message, forwardTo);
forwardToLocalNodes(messaging, message, forwardTo);

InetAddressAndPort respondToAddress = message.respondTo();
try
{
processMessage(message, respondToAddress);
processMessage(messaging, message, respondToAddress);
}
catch (WriteTimeoutException wto)
{
failed();
}
}

protected void applyMutation(Message<Mutation> message, InetAddressAndPort respondToAddress)
protected void applyMutation(MessageDelivery messaging, Message<Mutation> message, InetAddressAndPort respondToAddress)
{
Map<ParamType, Object> params = MessageParams.capture();
message.payload.applyFuture().addCallback(o -> respond(message, respondToAddress, params), wto -> failed());
message.payload.applyFuture().addCallback(o -> respond(messaging, message, respondToAddress, params), wto -> failed());
}

private static void forwardToLocalNodes(Message<Mutation> originalMessage, ForwardingInfo forwardTo)
private static void forwardToLocalNodes(MessageDelivery messaging, Message<Mutation> originalMessage, ForwardingInfo forwardTo)
{
Message.Builder<Mutation> builder =
Message.builder(originalMessage)
Expand All @@ -96,7 +97,7 @@ private static void forwardToLocalNodes(Message<Mutation> originalMessage, Forwa
forwardTo.forEach((id, target) ->
{
Tracing.trace("Enqueuing forwarded write to {}", target);
MessagingService.instance().send(message, target);
messaging.send(message, target);
});
}
}
9 changes: 5 additions & 4 deletions src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.StorageService;
Expand Down Expand Up @@ -65,7 +66,7 @@ public ReadResponse doRead(ReadCommand command, boolean trackRepairedData)
return response;
}

public void doVerb(Message<ReadCommand> message)
public void doVerb(MessageDelivery messaging, Message<ReadCommand> message)
{
if (message.epoch().isAfter(Epoch.EMPTY))
{
Expand Down Expand Up @@ -100,13 +101,13 @@ public void doVerb(Message<ReadCommand> message)
Message<ReadResponse> reply = message.responseWith(response);
reply = MessageParams.addToMessage(reply);

MessagingService.instance().send(reply, message.from());
messaging.send(reply, message.from());
return;
}
catch (RetryOnDifferentSystemException e)
{
logger.debug("Responding with retry on different system");
MessagingService.instance().respondWithFailure(RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM, message);
messaging.respondWithFailure(RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM, message);
Tracing.trace("Payload application resulted in RetryOnDifferentSysten");
return;
}
Expand All @@ -126,7 +127,7 @@ public void doVerb(Message<ReadCommand> message)
Tracing.trace("Enqueuing response to {}", message.from());
Message<ReadResponse> reply = message.responseWith(response);
reply = MessageParams.addToMessage(reply);
MessagingService.instance().send(reply, message.from());
messaging.send(reply, message.from());
}
else
{
Expand Down
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessageDelivery;

public class ReadRepairVerbHandler extends AbstractMutationVerbHandler<Mutation>
{
Expand All @@ -30,9 +30,9 @@ public void applyMutation(Mutation mutation)
mutation.apply();
}

void applyMutation(Message<Mutation> message, InetAddressAndPort respondToAddress)
void applyMutation(MessageDelivery messaging, Message<Mutation> message, InetAddressAndPort respondToAddress)
{
applyMutation(message.payload);
MessagingService.instance().send(message.emptyResponse(), respondToAddress);
messaging.send(message.emptyResponse(), respondToAddress);
}
}
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/db/TruncateVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.tracing.Tracing;

public class TruncateVerbHandler implements IVerbHandler<TruncateRequest>
Expand All @@ -31,7 +31,7 @@ public class TruncateVerbHandler implements IVerbHandler<TruncateRequest>

private static final Logger logger = LoggerFactory.getLogger(TruncateVerbHandler.class);

public void doVerb(Message<TruncateRequest> message)
public void doVerb(MessageDelivery messaging, Message<TruncateRequest> message)
{
TruncateRequest truncation = message.payload;
Tracing.trace("Applying truncation of {}.{}", truncation.keyspace, truncation.table);
Expand All @@ -43,6 +43,6 @@ public void doVerb(Message<TruncateRequest> message)
TruncateResponse response = new TruncateResponse(truncation.keyspace, truncation.table, true);
if (logger.isTraceEnabled())
logger.trace("{} applied. Enqueuing response to {}@{} ", truncation, message.id(), message.from());
MessagingService.instance().send(message.responseWith(response), message.from());
messaging.send(message.responseWith(response), message.from());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.schema.Schema;

public class CompressionDictionaryUpdateVerbHandler implements IVerbHandler<CompressionDictionaryUpdateMessage>
Expand All @@ -34,7 +35,7 @@ public class CompressionDictionaryUpdateVerbHandler implements IVerbHandler<Comp
private CompressionDictionaryUpdateVerbHandler() {}

@Override
public void doVerb(Message<CompressionDictionaryUpdateMessage> message)
public void doVerb(MessageDelivery messaging, Message<CompressionDictionaryUpdateMessage> message)
{
CompressionDictionaryUpdateMessage payload = message.payload;

Expand Down
5 changes: 2 additions & 3 deletions src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ClientState;
Expand Down Expand Up @@ -77,9 +76,9 @@ public long serializedSize(VirtualMutation t, int version)
}
};

public static final IVerbHandler<VirtualMutation> handler = message -> {
public static final IVerbHandler<VirtualMutation> handler = (messaging, message) -> {
message.payload.apply();
MessagingService.instance().respond(NoPayload.noPayload, message);
messaging.respond(NoPayload.noPayload, message);
};

private final String keyspaceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@

import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;

public class GossipDigestAck2VerbHandler extends GossipVerbHandler<GossipDigestAck2>
{
public static final GossipDigestAck2VerbHandler instance = new GossipDigestAck2VerbHandler();

private static final Logger logger = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);

public void doVerb(Message<GossipDigestAck2> message)
public void doVerb(MessageDelivery messaging, Message<GossipDigestAck2> message)
{
if (logger.isTraceEnabled())
{
Expand All @@ -48,6 +49,6 @@ public void doVerb(Message<GossipDigestAck2> message)
Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
Gossiper.instance.applyStateLocally(remoteEpStateMap);

super.doVerb(message);
super.doVerb(messaging, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessageDelivery;

import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_ACK2;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
Expand All @@ -37,7 +37,7 @@ public class GossipDigestAckVerbHandler extends GossipVerbHandler<GossipDigestAc

private static final Logger logger = LoggerFactory.getLogger(GossipDigestAckVerbHandler.class);

public void doVerb(Message<GossipDigestAck> message)
public void doVerb(MessageDelivery messaging, Message<GossipDigestAck> message)
{
InetAddressAndPort from = message.from();
logger.trace("Received a GossipDigestAckMessage from {}", from);
Expand Down Expand Up @@ -87,8 +87,8 @@ public void doVerb(Message<GossipDigestAck> message)

Message<GossipDigestAck2> gDigestAck2Message = Message.out(GOSSIP_DIGEST_ACK2, new GossipDigestAck2(deltaEpStateMap));
logger.trace("Sending a GossipDigestAck2Message to {}", from);
MessagingService.instance().send(gDigestAck2Message, from);
messaging.send(gDigestAck2Message, from);

super.doVerb(message);
super.doVerb(messaging, message);
}
}
Loading