1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.2.0
-----
* CassandraInstance support for multiple tokens (CASSANALYTICS-100)
* Expose SidecarCdc builders and interfaces (CASSANALYTICS-94)
* Fix bulk reader node availability comparator ordering (CASSANALYTICS-99)
* Remove unneeded buffer flips (CASSANALYTICS-95)
Original file line number Diff line number Diff line change
@@ -147,7 +147,7 @@ public String toString()
return '{' +
"\"node\": \"" + instance.nodeName() + "\"," +
"\"dc\": \"" + instance.dataCenter() + "\"," +
"\"token\": \"" + instance.token() + "\"," +
"\"tokens\": \"" + instance.tokens() + "\"," +
"\"log\": \"" + segment.name + "\"," +
"\"idx\": \"" + segment.idx + "\"," +
"\"size\": \"" + segment.size + '"' +
Original file line number Diff line number Diff line change
@@ -50,18 +50,18 @@
public class SidecarCommitLogProviderTests
{
private static final List<CassandraInstance> INSTANCES = Arrays.asList(
new CassandraInstance("0", "local1", "DC1"),
new CassandraInstance("100", "local2", "DC1"),
new CassandraInstance("200", "local3", "DC1"),
new CassandraInstance("300", "local4", "DC1"),
new CassandraInstance("400", "local5", "DC1"),
new CassandraInstance("500", "local6", "DC1"),
new CassandraInstance("1", "local7", "DC2"),
new CassandraInstance("101", "local8", "DC2"),
new CassandraInstance("201", "local9", "DC2"),
new CassandraInstance("301", "local10", "DC2"),
new CassandraInstance("401", "local11", "DC2"),
new CassandraInstance("501", "local12", "DC2")
new CassandraInstance(Set.of("0"), "local1", "DC1"),
new CassandraInstance(Set.of("100"), "local2", "DC1"),
new CassandraInstance(Set.of("200"), "local3", "DC1"),
new CassandraInstance(Set.of("300"), "local4", "DC1"),
new CassandraInstance(Set.of("400"), "local5", "DC1"),
new CassandraInstance(Set.of("500"), "local6", "DC1"),
new CassandraInstance(Set.of("1"), "local7", "DC2"),
new CassandraInstance(Set.of("101"), "local8", "DC2"),
new CassandraInstance(Set.of("201"), "local9", "DC2"),
new CassandraInstance(Set.of("301"), "local10", "DC2"),
new CassandraInstance(Set.of("401"), "local11", "DC2"),
new CassandraInstance(Set.of("501"), "local12", "DC2")
);

@Test
Original file line number Diff line number Diff line change
@@ -295,7 +295,7 @@ public List<CdcState> loadState(String jobId, int partitionId, @Nullable TokenRa
long numSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime);
assertThat(endState.epoch >= Math.max(0, numSeconds - 4)).isTrue(); // epochs should be around ~ 1 per second
assertThat(endState.replicaCount.isEmpty()).isTrue();
Marker endMarker = endState.markers.startMarker(new CassandraInstance("0", "local-instance", "DC1"));
Marker endMarker = endState.markers.startMarker(new CassandraInstance(Set.of("0"), "local-instance", "DC1"));
assertThat(logProvider(directory).logs().map(CommitLog::segmentId).collect(Collectors.toSet()).contains(endMarker.segmentId)).isTrue();
}
finally
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.Set;

import org.apache.cassandra.cdc.api.CommitLog;
import org.apache.cassandra.spark.data.FileSystemSource;
@@ -41,7 +42,7 @@ public LocalCommitLog(Path path)
this.name = path.getFileName().toString();
this.path = path;
this.length = IOUtils.size(path);
this.instance = new CassandraInstance("0", "local-instance", "DC1");
this.instance = new CassandraInstance(Set.of("0"), "local-instance", "DC1");
}

public String name()
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Set;

import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
@@ -42,7 +43,7 @@ public class CommitLogMarkerTests
@Test
public void testEmpty()
{
CassandraInstance inst = new CassandraInstance("0", "local1-i1", "DC1");
CassandraInstance inst = new CassandraInstance(Set.of("0"), "local1-i1", "DC1");
Marker marker = CommitLogMarkers.EMPTY.startMarker(inst);
assertThat(marker.segmentId).isEqualTo(0);
assertThat(marker.position).isEqualTo(0);
@@ -54,9 +55,9 @@ public void testEmpty()
@Test
public void testPerInstance()
{
CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1");
CassandraInstance inst2 = new CassandraInstance("1", "local2-i1", "DC1");
CassandraInstance inst3 = new CassandraInstance("2", "local3-i1", "DC1");
CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1");
CassandraInstance inst2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1");
CassandraInstance inst3 = new CassandraInstance(Set.of("2"), "local3-i1", "DC1");

CommitLogMarkers markers = CommitLogMarkers.of(
ImmutableMap.of(
@@ -77,9 +78,9 @@
@Test
public void testPerRange()
{
CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1");
CassandraInstance inst2 = new CassandraInstance("1", "local2-i1", "DC1");
CassandraInstance inst3 = new CassandraInstance("2", "local3-i1", "DC1");
CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1");
CassandraInstance inst2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1");
CassandraInstance inst3 = new CassandraInstance(Set.of("2"), "local3-i1", "DC1");

// build per range commit log markers
PerRangeCommitLogMarkers.PerRangeBuilder builder = CommitLogMarkers.perRangeBuilder();
@@ -126,7 +127,7 @@ public void testPerRange()
@Test
public void testIsBefore()
{
CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1");
CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1");

assertThat(inst1.zeroMarker().isBefore(inst1.zeroMarker())).isFalse();

@@ -146,18 +147,18 @@
public void testIsBeforeException()
{
assertThatThrownBy(() -> {
CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1");
CassandraInstance inst2 = new CassandraInstance("1", "local2-i1", "DC1");
CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1");
CassandraInstance inst2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1");
inst1.zeroMarker().isBefore(inst2.zeroMarker());
}).isInstanceOf(IllegalArgumentException.class);
}

@Test
public void testPerInstanceJdkSerialization()
{
CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1");
CassandraInstance inst2 = new CassandraInstance("1", "local2-i1", "DC1");
CassandraInstance inst3 = new CassandraInstance("2", "local3-i1", "DC1");
CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1");
CassandraInstance inst2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1");
CassandraInstance inst3 = new CassandraInstance(Set.of("2"), "local3-i1", "DC1");

CommitLogMarkers markers = CommitLogMarkers.of(
ImmutableMap.of(
@@ -179,9 +180,9 @@
@Test
public void testPerRangeJdkSerialization()
{
CassandraInstance inst1 = new CassandraInstance("0", "local1-i1", "DC1");
CassandraInstance inst2 = new CassandraInstance("1", "local2-i1", "DC1");
CassandraInstance inst3 = new CassandraInstance("2", "local3-i1", "DC1");
CassandraInstance inst1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1");
CassandraInstance inst2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1");
CassandraInstance inst3 = new CassandraInstance(Set.of("2"), "local3-i1", "DC1");

PerRangeCommitLogMarkers.PerRangeBuilder builder = CommitLogMarkers.perRangeBuilder();
builder.add(TokenRange.closed(BigInteger.ZERO, BigInteger.valueOf(5000)), inst1.markerAt(500, 10000));
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
package org.apache.cassandra.cdc.model;

import java.math.BigInteger;
import java.util.Set;

import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
@@ -39,9 +40,9 @@

public class CdcKryoSerializationTests
{
public static final CassandraInstance INST_1 = new CassandraInstance("0", "local1-i1", "DC1");
public static final CassandraInstance INST_2 = new CassandraInstance("1", "local2-i1", "DC1");
public static final CassandraInstance INST_3 = new CassandraInstance("2", "local3-i1", "DC1");
public static final CassandraInstance INST_1 = new CassandraInstance(Set.of("0"), "local1-i1", "DC1");
public static final CassandraInstance INST_2 = new CassandraInstance(Set.of("1"), "local2-i1", "DC1");
public static final CassandraInstance INST_3 = new CassandraInstance(Set.of("2"), "local3-i1", "DC1");

public static final PartitionUpdateWrapper.Digest DIGEST_1 = new PartitionUpdateWrapper.Digest("ks1",
"tb1",
Original file line number Diff line number Diff line change
@@ -19,13 +19,15 @@

package org.apache.cassandra.spark.data.model;

import java.util.Set;

/**
* Token owner owns a token
* Token owner owns a set of tokens
*/
public interface TokenOwner
{
/**
* @return the token it owns
* @return the tokens it owns
*/
String token();
Set<String> tokens();
}
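Call sites that previously consumed the single token() value now have to decide how to use the set. One option, similar to how the ring sorting later in this change keys on the minimum token, is to pick the smallest token as a deterministic representative. A minimal sketch (the helper class and method names are illustrative, not part of this change):

import java.math.BigInteger;
import java.util.Set;

// Hypothetical helper for migrating single-token call sites to the new tokens() contract.
public final class TokenOwnerMigration
{
    private TokenOwnerMigration()
    {
    }

    // Picks the numerically smallest token as a deterministic representative of the owner.
    public static BigInteger representativeToken(TokenOwner owner)
    {
        Set<String> tokens = owner.tokens();
        return tokens.stream()
                     .map(BigInteger::new)
                     .min(BigInteger::compareTo)
                     .orElseThrow(() -> new IllegalStateException("owner has no tokens"));
    }
}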
Original file line number Diff line number Diff line change
@@ -20,7 +20,9 @@
package org.apache.cassandra.spark.data.partitioner;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
@@ -34,20 +36,20 @@ public class CassandraInstance implements TokenOwner, Serializable
public static final CassandraInstance.Serializer SERIALIZER = new CassandraInstance.Serializer();

private static final long serialVersionUID = 6767636627576239773L;
private final String token;
private final Set<String> tokens;
private final String node;
private final String dataCenter;

public CassandraInstance(String token, String node, String dataCenter)
public CassandraInstance(Set<String> tokens, String node, String dataCenter)
{
this.token = token;
this.tokens = tokens;
Contributor comment:


Let's validate non-null or empty here, i.e.

Suggested change
this.tokens = tokens;
this.tokens = Objects.requireNonNull(tokens, "tokens cannot be null");

this.node = node;
this.dataCenter = dataCenter;
}
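Building on the suggestion above, the constructor could guard against both a null and an empty token set. A minimal sketch (Guava's Preconditions is an assumption here, not something this diff adds to the class):

// Sketch: reject null and empty token sets at construction time.
public CassandraInstance(Set<String> tokens, String node, String dataCenter)
{
    Objects.requireNonNull(tokens, "tokens cannot be null");
    Preconditions.checkArgument(!tokens.isEmpty(), "tokens cannot be empty");
    this.tokens = tokens;
    this.node = node;
    this.dataCenter = dataCenter;
}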

public String token()
public Set<String> tokens()
{
return token;
return tokens;
}

public String nodeName()
@@ -83,35 +85,45 @@ public boolean equals(Object other)
}

CassandraInstance that = (CassandraInstance) other;
return Objects.equals(this.token, that.token)
return Objects.equals(this.tokens, that.tokens)
&& Objects.equals(this.node, that.node)
&& Objects.equals(this.dataCenter, that.dataCenter);
}

@Override
public int hashCode()
{
return Objects.hash(token, node, dataCenter);
return Objects.hash(tokens, node, dataCenter);
}

@Override
public String toString()
{
return String.format("{\"token\"=\"%s\", \"node\"=\"%s\", \"dc\"=\"%s\"}", token, node, dataCenter);
return String.format("{\"tokens\"=\"%s\", \"node\"=\"%s\", \"dc\"=\"%s\"}", tokens, node, dataCenter);
}

public static class Serializer extends com.esotericsoftware.kryo.Serializer<CassandraInstance>
{
@Override
public CassandraInstance read(Kryo kryo, Input in, Class type)
{
return new CassandraInstance(in.readString(), in.readString(), in.readString());
Set<String> tokens = new HashSet<>();
int numTokens = in.readInt();
for (int i = 0; i < numTokens; i++)
{
tokens.add(in.readString());
}
return new CassandraInstance(tokens, in.readString(), in.readString());
}

@Override
public void write(Kryo kryo, Output out, CassandraInstance instance)
{
out.writeString(instance.token());
out.writeInt(instance.tokens().size());
for (String token : instance.tokens())
{
out.writeString(token);
}
out.writeString(instance.nodeName());
out.writeString(instance.dataCenter());
}
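The serializer now writes the token count followed by each token, then the node name and data center. A round-trip sketch of the new wire format (the test scaffolding, explicit registration, and buffer size are assumptions, not part of this change):

import java.util.Set;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Sketch: serialize and deserialize a multi-token instance with the new Serializer.
public class CassandraInstanceKryoRoundTrip
{
    public static void main(String[] args)
    {
        Kryo kryo = new Kryo();
        kryo.register(CassandraInstance.class, CassandraInstance.SERIALIZER);

        CassandraInstance original = new CassandraInstance(Set.of("0", "100"), "local1", "DC1");

        Output output = new Output(1024);          // fixed buffer, large enough for this sketch
        kryo.writeObject(output, original);

        Input input = new Input(output.toBytes());
        CassandraInstance copy = kryo.readObject(input, CassandraInstance.class);

        assert original.equals(copy);              // tokens, node and data center all round-trip
    }
}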
Original file line number Diff line number Diff line change
@@ -29,8 +29,10 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import java.util.Objects;
@@ -115,7 +117,9 @@ public CassandraRing(Partitioner partitioner,
this.keyspace = keyspace;
this.replicationFactor = replicationFactor;
this.instances = instances.stream()
.sorted(Comparator.comparing(instance -> new BigInteger(instance.token())))
.sorted(Comparator.comparing(i -> i.tokens().stream()
.map(BigInteger::new).collect(Collectors.toList())
.stream().min(BigInteger::compareTo).get()))
.collect(Collectors.toCollection(ArrayList::new));
this.init();
}
@@ -209,8 +213,8 @@ private Collection<String> dataCenters()
public Collection<BigInteger> tokens()
{
return instances.stream()
.map(CassandraInstance::token)
.map(BigInteger::new)
.map(i -> i.tokens().stream().map(BigInteger::new).collect(Collectors.toList()))
.flatMap(List::stream)
.sorted()
.collect(Collectors.toList());
}
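The same accumulation can be written without the intermediate per-instance lists. A behaviorally equivalent sketch, offered as a suggestion rather than part of this change:

// Sketch: flatten each instance's token set directly instead of collecting per-instance lists.
public Collection<BigInteger> tokens()
{
    return instances.stream()
                    .flatMap(instance -> instance.tokens().stream())
                    .map(BigInteger::new)
                    .sorted()
                    .collect(Collectors.toList());
}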
@@ -221,8 +225,8 @@ public Collection<BigInteger> tokens(String dataCenter)
"Datacenter tokens doesn't make sense for SimpleStrategy");
return instances.stream()
.filter(instance -> instance.dataCenter().matches(dataCenter))
.map(CassandraInstance::token)
.map(BigInteger::new)
.map(i -> i.tokens().stream().map(BigInteger::new).collect(Collectors.toList()))
.flatMap(List::stream)
.collect(Collectors.toList());
}

@@ -276,7 +280,13 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
this.instances = new ArrayList<>(numInstances);
for (int instance = 0; instance < numInstances; instance++)
{
this.instances.add(new CassandraInstance(in.readUTF(), in.readUTF(), in.readUTF()));
Set<String> tokens = new HashSet<>();
int numTokens = in.readShort();
for (int i = 0; i < numTokens; i++)
{
tokens.add(in.readUTF());
}
this.instances.add(new CassandraInstance(tokens, in.readUTF(), in.readUTF()));
}
this.init();
}
@@ -299,7 +309,11 @@ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFou
out.writeShort(this.instances.size());
for (CassandraInstance instance : this.instances)
{
out.writeUTF(instance.token());
out.writeShort(instance.tokens().size());
for (String token : instance.tokens())
{
out.writeUTF(token);
}
out.writeUTF(instance.nodeName());
out.writeUTF(instance.dataCenter());
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
@@ -156,8 +157,11 @@ public static List<Range<BigInteger>> split(Range<BigInteger> range, int nrSplit
{
Instance instance = instances.get(index);
int disjointReplica = ((instances.size() + index) - replicationFactor) % instances.size();
BigInteger rangeStart = new BigInteger(instances.get(disjointReplica).token());
BigInteger rangeEnd = new BigInteger(instance.token());
Set<String> tokensStart = instances.get(disjointReplica).tokens();
Set<String> tokensEnd = instance.tokens();

BigInteger rangeStart = tokensStart.stream().map(BigInteger::new).min(BigInteger::compareTo).get();
BigInteger rangeEnd = tokensEnd.stream().map(BigInteger::new).max(BigInteger::compareTo).get();

// If start token is greater than or equal to end token we are looking at a wrap around range, split it
if (rangeStart.compareTo(rangeEnd) >= 0)
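To illustrate the new bound selection with made-up values: if the disjoint replica owns tokens {100, 500} and the current instance owns {300, 700}, rangeStart resolves to 100 and rangeEnd to 700. A standalone sketch:

import java.math.BigInteger;
import java.util.Set;

// Standalone illustration of the min/max token selection above; the token values are hypothetical.
public class TokenRangeBoundsExample
{
    public static void main(String[] args)
    {
        Set<String> tokensStart = Set.of("100", "500"); // disjoint replica's tokens
        Set<String> tokensEnd = Set.of("300", "700");   // current instance's tokens

        BigInteger rangeStart = tokensStart.stream().map(BigInteger::new).min(BigInteger::compareTo).get();
        BigInteger rangeEnd = tokensEnd.stream().map(BigInteger::new).max(BigInteger::compareTo).get();

        System.out.println(rangeStart + " -> " + rangeEnd); // prints 100 -> 700
    }
}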