Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 107 additions & 60 deletions src/java/org/apache/cassandra/tcm/membership/Directory.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@
import org.apache.cassandra.tcm.MetadataValue;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDSerializer;
import org.apache.cassandra.utils.btree.BTreeBiMap;
import org.apache.cassandra.utils.btree.BTreeMap;
import org.apache.cassandra.utils.btree.BTreeMultimap;

import static org.apache.cassandra.db.TypeSizes.sizeof;
import static org.apache.cassandra.tcm.membership.NodeVersion.CURRENT;
import static org.apache.cassandra.tcm.membership.NodeVersion.CURRENT_METADATA_VERSION;

public class Directory implements MetadataValue<Directory>
{
Expand Down Expand Up @@ -106,6 +106,41 @@ private Directory(int nextId,
BTreeMap<NodeId, NodeAddresses> addresses,
BTreeMultimap<String, InetAddressAndPort> endpointsByDC,
BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC)
{
this(nextId, lastModified, peers, removedNodes, locations, states, versions, hostIds, addresses, endpointsByDC, racksByDC, clusterVersions(states, versions));
}

private Directory(int nextId,
Epoch lastModified,
BTreeBiMap<NodeId, InetAddressAndPort> peers,
BTreeSet<RemovedNode> removedNodes,
BTreeMap<NodeId, Location> locations,
BTreeMap<NodeId, NodeState> states,
BTreeMap<NodeId, NodeVersion> versions,
BTreeBiMap<NodeId, UUID> hostIds,
BTreeMap<NodeId, NodeAddresses> addresses,
BTreeMultimap<String, InetAddressAndPort> endpointsByDC,
BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC,
ClusterVersions clusterVersions)
{
this(nextId, lastModified, peers, removedNodes, locations, states, versions, hostIds, addresses, endpointsByDC, racksByDC,
clusterVersions.clusterMinVersion, clusterVersions.clusterMaxVersion, clusterVersions.commonSerializationVersion);
}

private Directory(int nextId,
Epoch lastModified,
BTreeBiMap<NodeId, InetAddressAndPort> peers,
BTreeSet<RemovedNode> removedNodes,
BTreeMap<NodeId, Location> locations,
BTreeMap<NodeId, NodeState> states,
BTreeMap<NodeId, NodeVersion> versions,
BTreeBiMap<NodeId, UUID> hostIds,
BTreeMap<NodeId, NodeAddresses> addresses,
BTreeMultimap<String, InetAddressAndPort> endpointsByDC,
BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC,
NodeVersion clusterMinVersion,
NodeVersion clusterMaxVersion,
Version commonSerializationVersion)
{
this.nextId = nextId;
this.lastModified = lastModified;
Expand All @@ -118,10 +153,9 @@ private Directory(int nextId,
this.addresses = addresses;
this.endpointsByDC = endpointsByDC;
this.racksByDC = racksByDC;
Pair<NodeVersion, NodeVersion> minMaxVer = minMaxVersions(states, versions);
clusterMinVersion = minMaxVer.left;
clusterMaxVersion = minMaxVer.right;
commonSerializationVersion = minCommonSerializationVersion(states, versions);
this.clusterMinVersion = clusterMinVersion;
this.clusterMaxVersion = clusterMaxVersion;
this.commonSerializationVersion = commonSerializationVersion;
}

@Override
Expand Down Expand Up @@ -161,7 +195,7 @@ public Epoch lastModified()
@Override
public Directory withLastModified(Epoch epoch)
{
return new Directory(nextId, epoch, peers, removedNodes, locations, states, versions, hostIds, addresses, endpointsByDC, racksByDC);
return new Directory(nextId, epoch, peers, removedNodes, locations, states, versions, hostIds, addresses, endpointsByDC, racksByDC, clusterMinVersion, clusterMaxVersion, commonSerializationVersion);
}

public Directory withNonUpgradedNode(NodeAddresses addresses,
Expand Down Expand Up @@ -250,9 +284,18 @@ public Directory withNodeAddresses(NodeId id, NodeAddresses nodeAddresses)
BTreeMap<String, Multimap<String, InetAddressAndPort>> updatedEndpointsByRack = racksByDC.withForce(location(id).datacenter, rackEP);

return new Directory(nextId, lastModified,
peers.withForce(id,nodeAddresses.broadcastAddress), removedNodes, locations, states, versions, hostIds, addresses.withForce(id, nodeAddresses),
peers.withForce(id, nodeAddresses.broadcastAddress),
removedNodes,
locations,
states,
versions,
hostIds,
addresses.withForce(id, nodeAddresses),
updatedEndpointsByDC,
updatedEndpointsByRack);
updatedEndpointsByRack,
clusterMinVersion,
clusterMaxVersion,
commonSerializationVersion);
}

public Directory withRackAndDC(NodeId id)
Expand All @@ -266,7 +309,10 @@ public Directory withRackAndDC(NodeId id)

return new Directory(nextId, lastModified, peers, removedNodes, locations, states, versions, hostIds, addresses,
endpointsByDC.with(location.datacenter, endpoint),
racksByDC.withForce(location.datacenter, rackEP));
racksByDC.withForce(location.datacenter, rackEP),
clusterMinVersion,
clusterMaxVersion,
commonSerializationVersion);
}

public Directory withoutRackAndDC(NodeId id)
Expand All @@ -286,7 +332,10 @@ public Directory withoutRackAndDC(NodeId id)
newRacksByDC = racksByDC.withForce(location.datacenter, rackEP);
return new Directory(nextId, lastModified, peers, removedNodes, locations, states, versions, hostIds, addresses,
endpointsByDC.without(location.datacenter, endpoint),
newRacksByDC);
newRacksByDC,
clusterMinVersion,
clusterMaxVersion,
commonSerializationVersion);
}

public Directory withUpdatedRackAndDc(NodeId id, Location location)
Expand All @@ -306,23 +355,7 @@ private Directory withLocation(NodeId id, Location location)
return this;

return new Directory(nextId, lastModified, peers, removedNodes, locations.withForce(id, location), states, versions, hostIds,
addresses, endpointsByDC, racksByDC);
}

public Directory removed(Epoch removedIn, NodeId id, InetAddressAndPort addr)
{
Invariants.require(!peers.containsKey(id));
return new Directory(nextId,
lastModified,
peers,
removedNodes.with(new RemovedNode(removedIn, id, addr)),
locations,
states,
versions,
hostIds,
addresses,
endpointsByDC,
racksByDC);
addresses, endpointsByDC, racksByDC, clusterMinVersion, clusterMaxVersion, commonSerializationVersion);
}

public Directory without(Epoch removedIn, NodeId id)
Expand Down Expand Up @@ -641,14 +674,23 @@ public Directory deserialize(DataInputPlus in, Version version) throws IOExcepti
if (version.isAtLeast(Version.V1))
nextId = in.readInt();
int count = in.readInt();
Directory newDir = new Directory();

BTreeBiMap<NodeId, InetAddressAndPort> peers = BTreeBiMap.empty();
BTreeMap<NodeId, Location> locations = BTreeMap.empty();
BTreeMap<NodeId, NodeState> states = BTreeMap.empty();
BTreeMap<NodeId, NodeVersion> versions = BTreeMap.empty();
BTreeBiMap<NodeId, UUID> hostIds = BTreeBiMap.empty();
BTreeMap<NodeId, NodeAddresses> addresses = BTreeMap.empty();
for (int i = 0; i < count; i++)
{
Node n = Node.serializer.deserialize(in, version);
// todo: bulk operations
newDir = newDir.with(n.addresses, n.id, n.hostId, n.location, n.version)
.withNodeState(n.id, n.state);
NodeId id = n.id;
peers = peers.withForce(id, n.addresses.broadcastAddress);
locations = locations.withForce(id, n.location);
states = states.withForce(id, n.state);
versions = versions.withForce(id, n.version);
hostIds = hostIds.withForce(id, n.hostId);
addresses = addresses.withForce(id, n.addresses);
}

int dcCount = in.readInt();
Expand Down Expand Up @@ -677,7 +719,7 @@ public Directory deserialize(DataInputPlus in, Version version) throws IOExcepti
if (version.isBefore(Version.V1))
{
NodeId maxId = null;
for (NodeId id : newDir.peers.keySet())
for (NodeId id : peers.keySet())
{
if (maxId == null || id.compareTo(maxId) > 0)
maxId = id;
Expand All @@ -688,7 +730,7 @@ public Directory deserialize(DataInputPlus in, Version version) throws IOExcepti
else
nextId = maxId.id() + 1;
}

BTreeSet<RemovedNode> removed = BTreeSet.empty(RemovedNode::compareTo);
if (version.isAtLeast(Version.V7))
{
int removedNodes = in.readInt();
Expand All @@ -697,19 +739,20 @@ public Directory deserialize(DataInputPlus in, Version version) throws IOExcepti
long epoch = in.readLong();
NodeId nodeId = NodeId.serializer.deserialize(in, version);
InetAddressAndPort addr = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version);
newDir.removed(Epoch.create(epoch), nodeId, addr);
Invariants.require(!peers.containsKey(nodeId));
removed = removed.with(new RemovedNode(Epoch.create(epoch), nodeId, addr));
}
}

return new Directory(nextId,
lastModified,
newDir.peers,
newDir.removedNodes,
newDir.locations,
newDir.states,
newDir.versions,
newDir.hostIds,
newDir.addresses,
peers,
removed,
locations,
states,
versions,
hostIds,
addresses,
dcEndpoints,
racksByDC);
}
Expand Down Expand Up @@ -769,10 +812,11 @@ public boolean equals(Object o)
equivalentTo(directory);
}

private static Pair<NodeVersion, NodeVersion> minMaxVersions(BTreeMap<NodeId, NodeState> states, BTreeMap<NodeId, NodeVersion> versions)
private static ClusterVersions clusterVersions(BTreeMap<NodeId, NodeState> states, BTreeMap<NodeId, NodeVersion> versions)
{
NodeVersion minVersion = null;
NodeVersion maxVersion = null;
int commonVersion = Integer.MAX_VALUE;
for (Map.Entry<NodeId, NodeState> entry : states.entrySet())
{
if (entry.getValue() != NodeState.LEFT)
Expand All @@ -782,26 +826,15 @@ private static Pair<NodeVersion, NodeVersion> minMaxVersions(BTreeMap<NodeId, No
minVersion = ver;
if (maxVersion == null || ver.compareTo(maxVersion) > 0)
maxVersion = ver;
}
}
if (minVersion == null)
return Pair.create(CURRENT, CURRENT);
return Pair.create(minVersion, maxVersion);
}

public static Version minCommonSerializationVersion(BTreeMap<NodeId, NodeState> states, BTreeMap<NodeId, NodeVersion> versions)
{
int commonVersion = Integer.MAX_VALUE;
for (Map.Entry<NodeId, NodeState> entry : states.entrySet())
{
if (entry.getValue() != NodeState.LEFT)
{
NodeVersion ver = versions.get(entry.getKey());
if (ver.serializationVersion > Version.OLD.asInt() && ver.serializationVersion < commonVersion)
commonVersion = ver.serializationVersion;
}
}
return commonVersion == Integer.MAX_VALUE ? NodeVersion.CURRENT_METADATA_VERSION : Version.fromInt(commonVersion);
if (minVersion == null)
return new ClusterVersions(CURRENT, CURRENT, CURRENT_METADATA_VERSION);

return new ClusterVersions(minVersion, maxVersion,
commonVersion == Integer.MAX_VALUE ? NodeVersion.CURRENT_METADATA_VERSION : Version.fromInt(commonVersion));
}

@Override
Expand All @@ -825,7 +858,8 @@ public boolean equivalentTo(Directory directory)
Objects.equals(endpointsByDC, directory.endpointsByDC) &&
Objects.equals(racksByDC, directory.racksByDC) &&
Objects.equals(versions, directory.versions) &&
Objects.equals(addresses, directory.addresses);
Objects.equals(addresses, directory.addresses) &&
Objects.equals(removedNodes, directory.removedNodes);
}

private static final Logger logger = LoggerFactory.getLogger(Directory.class);
Expand Down Expand Up @@ -891,7 +925,6 @@ public static <K, V> void dumpDiff(Logger logger, Map<K, V> l, Map<K, V> r)

}


public static class RemovedNode implements Comparable<RemovedNode>
{
public final Epoch removedIn;
Expand Down Expand Up @@ -923,4 +956,18 @@ public int compareTo(RemovedNode o)
return id.compareTo(o.id);
}
}

private static class ClusterVersions
{
private final NodeVersion clusterMinVersion;
private final NodeVersion clusterMaxVersion;
private final Version commonSerializationVersion;
public ClusterVersions(NodeVersion clusterMinVersion, NodeVersion clusterMaxVersion, Version commonSerializationVersion)
{

this.clusterMinVersion = clusterMinVersion;
this.clusterMaxVersion = clusterMaxVersion;
this.commonSerializationVersion = commonSerializationVersion;
}
}
}
Loading