Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ public enum CassandraRelevantProperties
MONITORING_MAX_OPERATIONS("cassandra.monitoring_max_operations", "50"),
/** Defines the interval for reporting any operations that have timed out. */
MONITORING_REPORT_INTERVAL_MS("cassandra.monitoring_report_interval_ms", "5000"),
/** The most recent version that's supported by all nodes in the cluster **/
MT_CLUSTER_SAFE_VERSION("cassandra.mutation_tracking.cluster_safe_version"),
MV_ALLOW_FILTERING_NONKEY_COLUMNS_UNSAFE("cassandra.mv.allow_filtering_nonkey_columns_unsafe"),
MV_ENABLE_COORDINATOR_BATCHLOG("cassandra.mv_enable_coordinator_batchlog"),
/** mx4jaddress */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ else if (localComparisonEpoch.isAfter(readCommand.serializedAtEpoch()))

private ClusterMetadata checkTokenOwnership(ClusterMetadata metadata, Message<T> message)
{
boolean acceptsTransient = message.verb() == Verb.TRACKED_SUMMARY_REQ;
boolean acceptsTransient = message.verb() == Verb.MT_SUMMARY_REQ;
ReadCommand command = getCommand(message.payload);

if (command.metadata().isVirtual())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ public void serialize(EmbeddableSinglePartitionReadCommand command, DataOutputPl
ReadCommand.serializer.serialize((ReadCommand) command, out, version);
break;
case TRACKED_DATA:
DataRequest.serializer.serialize((DataRequest) command, out, version);
DataRequest.embedded.serialize((DataRequest) command, out, version);
break;
case TRACKED_SUMMARY:
SummaryRequest.serializer.serialize((SummaryRequest) command, out, version);
SummaryRequest.embedded.serialize((SummaryRequest) command, out, version);
break;
default:
throw new IllegalStateException("Unhandled kind: " + command.kind());
Expand All @@ -87,9 +87,9 @@ public EmbeddableSinglePartitionReadCommand deserialize(DataInputPlus in, int ve
case UNTRACKED:
return (SinglePartitionReadCommand)ReadCommand.serializer.deserialize(in, version);
case TRACKED_DATA:
return DataRequest.serializer.deserialize(in, version);
return DataRequest.embedded.deserialize(in, version);
case TRACKED_SUMMARY:
return SummaryRequest.serializer.deserialize(in, version);
return SummaryRequest.embedded.deserialize(in, version);
default:
throw new IllegalStateException("Unhandled kind: " + kind);
}
Expand All @@ -109,9 +109,9 @@ public long serializedSize(EmbeddableSinglePartitionReadCommand command, int ver
case UNTRACKED:
return size + ReadCommand.serializer.serializedSize((ReadCommand) command, version);
case TRACKED_DATA:
return size + DataRequest.serializer.serializedSize((DataRequest) command, version);
return size + DataRequest.embedded.serializedSize((DataRequest) command, version);
case TRACKED_SUMMARY:
return size + SummaryRequest.serializer.serializedSize((SummaryRequest) command, version);
return size + SummaryRequest.embedded.serializedSize((SummaryRequest) command, version);
default:
throw new IllegalStateException("Unhandled kind: " + command.kind());
}
Expand Down
12 changes: 6 additions & 6 deletions src/java/org/apache/cassandra/db/IReadResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public void serialize(IReadResponse response, DataOutputPlus out, int version) t
ReadResponse.serializer.serialize((ReadResponse) response, out, version);
break;
case TRACKED_DATA:
TrackedDataResponse.serializer.serialize((TrackedDataResponse) response, out, version);
TrackedDataResponse.embedded.serialize((TrackedDataResponse) response, out, version);
break;
case TRACKED_SUMMARY:
TrackedSummaryResponse.serializer.serialize((TrackedSummaryResponse) response, out, version);
TrackedSummaryResponse.embedded.serialize((TrackedSummaryResponse) response, out, version);
break;
default:
throw new IllegalStateException("Unhandled kind: " + response.kind());
Expand All @@ -70,9 +70,9 @@ public IReadResponse deserialize(DataInputPlus in, int version) throws IOExcepti
case UNTRACKED:
return ReadResponse.serializer.deserialize(in, version);
case TRACKED_DATA:
return TrackedDataResponse.serializer.deserialize(in, version);
return TrackedDataResponse.embedded.deserialize(in, version);
case TRACKED_SUMMARY:
return TrackedSummaryResponse.serializer.deserialize(in, version);
return TrackedSummaryResponse.embedded.deserialize(in, version);
default:
throw new IllegalStateException("Unhandled kind: " + kind);
}
Expand All @@ -93,9 +93,9 @@ public long serializedSize(IReadResponse response, int version)
case UNTRACKED:
return size + ReadResponse.serializer.serializedSize((ReadResponse) response, version);
case TRACKED_DATA:
return size + TrackedDataResponse.serializer.serializedSize((TrackedDataResponse) response, version);
return size + TrackedDataResponse.embedded.serializedSize((TrackedDataResponse) response, version);
case TRACKED_SUMMARY:
return size + TrackedSummaryResponse.serializer.serializedSize((TrackedSummaryResponse) response, version);
return size + TrackedSummaryResponse.embedded.serializedSize((TrackedSummaryResponse) response, version);
default:
throw new IllegalStateException("Unhandled kind: " + response.kind());
}
Expand Down
65 changes: 38 additions & 27 deletions src/java/org/apache/cassandra/db/Mutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@

public class Mutation implements IMutation, Supplier<Mutation>, Commitable
{
public static final MutationSerializer serializer = new MutationSerializer();
public static final int ALLOW_POTENTIAL_TRANSACTION_CONFLICTS = 0x01;

public static final int HAS_MUTATION_ID = 0x02;

private final MutationId id;
// todo this is redundant
Expand Down Expand Up @@ -627,17 +626,20 @@ static void serializeInternal(PartitionUpdate.PartitionUpdateSerializer serializ
DataOutputPlus out,
int version) throws IOException
{
Map<TableId, PartitionUpdate> modifications = mutation.modifications;
boolean hasMutationId = version >= VERSION_61 && !mutation.id.isNone();

if (version >= VERSION_60)
{
int flags = 0;
flags |= potentialTxnConflictsFlag(mutation.potentialTxnConflicts);
if (hasMutationId) flags |= HAS_MUTATION_ID;
out.write(flags);
}

if (version >= MessagingService.VERSION_61)
MutationId.serializer.serialize(mutation.id, out, version);
if (hasMutationId)
MutationId.serializer.serialize(mutation.id, out);

Map<TableId, PartitionUpdate> modifications = mutation.modifications;

/* serialize the modifications in the mutation */
int size = modifications.size();
Expand All @@ -659,15 +661,15 @@ public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper
teeIn = new TeeDataInputPlus(in, dob, CACHEABLE_MUTATION_SIZE_LIMIT);

PotentialTxnConflicts potentialTxnConflicts = PotentialTxnConflicts.DISALLOW;
boolean hasMutationId = false;
if (version >= VERSION_60)
{
int flags = teeIn.readByte();
int flags = teeIn.readUnsignedByte();
potentialTxnConflicts = potentialTxnConflicts(flags);
hasMutationId = version >= VERSION_61 && (flags & HAS_MUTATION_ID) == HAS_MUTATION_ID;
}

MutationId id = version >= MessagingService.VERSION_61
? MutationId.serializer.deserialize(teeIn, version)
: MutationId.none();
MutationId id = hasMutationId ? MutationId.serializer.deserialize(teeIn) : MutationId.none();

int size = teeIn.readUnsignedVInt32();
assert size > 0;
Expand Down Expand Up @@ -705,32 +707,37 @@ public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper
*/
public Pair<DecoratedKey, TableMetadata> deserializeKeyAndTableMetadata(DataInputBuffer in, int version, DeserializationHelper.Flag flag) throws IOException
{
if (version >= VERSION_60)
in.skipBytes(1); // potentialTxnConflicts

if (version >= VERSION_61)
MutationId.serializer.skip(in, version);

skipHeader(in, version);
int size = in.readUnsignedVInt32();
assert size > 0;

return PartitionUpdate.serializer.deserializeMetadataAndKey(in, version, flag);
}

public TableId deserializeTableId(DataInputBuffer in, int version, DeserializationHelper.Flag flag) throws IOException
/**
* Return first (out of potentially multiple) table ids in this mutation.
*/
public TableId deserializeFirstTableId(DataInputBuffer in, int version, DeserializationHelper.Flag flag) throws IOException
{
if (version >= VERSION_60)
in.skipBytes(1); // flags

if (version >= VERSION_61)
MutationId.serializer.skip(in, version);

skipHeader(in, version);
int size = in.readUnsignedVInt32();
assert size > 0;

return PartitionUpdate.serializer.deserializeTableId(in, version, flag);
}

private void skipHeader(DataInputBuffer in, int version) throws IOException
{
boolean hasMutationId = false;
if (version >= VERSION_60)
{
int flags = in.readUnsignedByte();
if (version >= VERSION_61)
hasMutationId = (flags & HAS_MUTATION_ID) == HAS_MUTATION_ID;
}

if (hasMutationId)
MutationId.serializer.skip(in);
}

public Mutation deserialize(DataInputPlus in, int version) throws IOException
{
return deserialize(in, version, DeserializationHelper.Flag.FROM_REMOTE);
Expand All @@ -742,6 +749,8 @@ public long serializedSize(Mutation mutation, int version)
}
}

public static final MutationSerializer serializer = new MutationSerializer();

/**
* There are two implementations of this class. One that keeps the serialized representation on-heap for later
* reuse and one that doesn't. Keeping all sized mutations around may lead to "bad" GC pressure (G1 GC) due to humongous objects.
Expand Down Expand Up @@ -799,9 +808,11 @@ long serializedSize(PartitionUpdate.PartitionUpdateSerializer serializer, Mutati
if (size == 0L)
{
if (version >= VERSION_60)
size += TypeSizes.sizeof((byte)ALLOW_POTENTIAL_TRANSACTION_CONFLICTS); // flags
if (version >= MessagingService.VERSION_61)
size += MutationId.serializer.serializedSize(mutation.id, version);
size += TypeSizes.BYTE_SIZE; // flags

if (version >= MessagingService.VERSION_61 && !mutation.id.isNone())
size += MutationId.serializer.serializedSize(mutation.id);

size += TypeSizes.sizeofUnsignedVInt(mutation.modifications.size());
for (PartitionUpdate partitionUpdate : mutation.modifications.values())
size += serializer.serializedSize(partitionUpdate, version);
Expand Down
5 changes: 3 additions & 2 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Range;
Expand Down Expand Up @@ -2027,7 +2028,7 @@ public static ByteBuffer rangeToBytes(Range<Token> range)
// pass 0 as the version to trigger that legacy code.
// In the future, it might be worth switching to a stable text format for the ranges to 1) save that and 2)
// be more user friendly (the serialization format we currently use is pretty custom).
Range.tokenSerializer.serialize(range, out, 0);
AbstractBounds.tokenSerializer.serialize(range, out, 0);
return out.buffer();
}
catch (IOException e)
Expand All @@ -2042,7 +2043,7 @@ private static Range<Token> byteBufferToRange(ByteBuffer rawRange, IPartitioner
try (DataInputPlus.DataInputStreamPlus in = new DataInputBuffer(ByteBufferUtil.getArray(rawRange)))
{
// See rangeToBytes above for why version is 0.
return (Range<Token>) Range.tokenSerializer.deserialize(in, partitioner, 0);
return (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, partitioner, 0);
}
catch (IOException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.CollectionSerializers;
import org.apache.cassandra.utils.StringSerializer;

public class FunctionExecutionException extends RequestExecutionException
{
Expand Down Expand Up @@ -63,7 +64,7 @@ protected void serializeSpecificFields(DataOutputPlus out, int version) throws I
if (functionName.keyspace != null)
out.writeUTF(functionName.keyspace);
out.writeUTF(functionName.name);
CollectionSerializers.serializeList(argTypes, out, version, CollectionSerializers.STRING_SERIALIZER);
CollectionSerializers.serializeList(argTypes, out, version, StringSerializer.instance);
out.writeUTF(detail);
}

Expand All @@ -74,7 +75,7 @@ protected long serializedSizeSpecificFields(int version)
if (functionName.keyspace != null)
size += TypeSizes.sizeof(functionName.keyspace);
size += TypeSizes.sizeof(functionName.name);
size += CollectionSerializers.serializedListSize(argTypes, version, CollectionSerializers.STRING_SERIALIZER);
size += CollectionSerializers.serializedListSize(argTypes, version, StringSerializer.instance);
size += TypeSizes.sizeof(detail);
return size;
}
Expand All @@ -83,7 +84,7 @@ static FunctionExecutionException deserializeFields(String message, DataInputPlu
{
String keyspace = in.readBoolean() ? in.readUTF() : null;
String name = in.readUTF();
List<String> argTypes = CollectionSerializers.deserializeList(in, version, CollectionSerializers.STRING_SERIALIZER);
List<String> argTypes = CollectionSerializers.deserializeList(in, version, StringSerializer.instance);
String detail = in.readUTF();
return new FunctionExecutionException(new FunctionName(keyspace, name), argTypes, detail);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.io;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

import org.apache.cassandra.io.util.DataInputBuffer;
Expand All @@ -42,17 +40,6 @@ default ByteBuffer serialize(In t, Version version) throws IOException
}
}

default ByteBuffer serializeUnchecked(In t, Version version)
{
try
{
return serialize(t, version);
}
catch (IOException e)
{
throw new UncheckedIOException(e);
}
}
Out deserialize(DataInputPlus in, Version version) throws IOException;

default void skip(DataInputPlus in, Version version) throws IOException
Expand All @@ -68,17 +55,6 @@ default Out deserialize(ByteBuffer buffer, Version version) throws IOException
}
}

default Out deserializeUnchecked(ByteBuffer buffer, Version version)
{
try
{
return deserialize(buffer, version);
}
catch (IOException e)
{
throw new UncheckedIOException(e);
}
}
long serializedSize(In t, Version version);

static <In, Out, Version> AsymmetricVersionedSerializer<In, Out, Version> from(AsymmetricUnversionedSerializer<In, Out> delegate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.io;

import java.io.IOException;

import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;

// TODO (expected): so, this is *not* ok if a major upgrade happens?
// depending on the messaging version, we should be using the safest most recent supported Version
// corresponding to the negotiated messaging version.
public class EmbeddedAsymmetricVersionedSerializer<In, Out, Version> implements IVersionedAsymmetricSerializer<In, Out>, AsymmetricUnversionedSerializer<In, Out>
{
private final Version version;
Expand Down Expand Up @@ -64,11 +66,6 @@ public Out deserialize(DataInputPlus in) throws IOException
return delegate.deserialize(in, version);
}

public Version deserializeVersion(DataInputPlus in) throws IOException
{
return versionSerializer.deserialize(in);
}

@Override
public long serializedSize(In t, int msgVersion)
{
Expand All @@ -81,4 +78,32 @@ public long serializedSize(In t)
return versionSerializer.serializedSize(version)
+ delegate.serializedSize(t, version);
}

public static <A, B> IVersionedAsymmetricSerializer<A, B> accordEmbedded(AsymmetricVersionedSerializer<A, B, org.apache.cassandra.service.accord.serializers.Version> delegate)
{
return new EmbeddedAsymmetricVersionedSerializer<>(
org.apache.cassandra.service.accord.serializers.Version.CLUSTER_SAFE_VERSION,
org.apache.cassandra.service.accord.serializers.Version.Serializer.instance,
delegate
);
}

public static <A, B> IVersionedAsymmetricSerializer<A, B> accordEmbedded(AsymmetricUnversionedSerializer<A, B> delegate)
{
return accordEmbedded(AsymmetricVersionedSerializer.from(delegate));
}

public static <A, B> IVersionedAsymmetricSerializer<A, B> mtEmbedded(AsymmetricVersionedSerializer<A, B, org.apache.cassandra.replication.Version> delegate)
{
return new EmbeddedAsymmetricVersionedSerializer<>(
org.apache.cassandra.replication.Version.CLUSTER_SAFE_VERSION,
org.apache.cassandra.replication.Version.serializer,
delegate
);
}

public static <A, B> IVersionedAsymmetricSerializer<A, B> mtEmbedded(AsymmetricUnversionedSerializer<A, B> delegate)
{
return mtEmbedded(AsymmetricVersionedSerializer.from(delegate));
}
}
Loading