Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions src/it/java/io/weaviate/integration/ClusterITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@

import io.weaviate.ConcurrentTest;
import io.weaviate.client6.v1.api.WeaviateClient;
import io.weaviate.client6.v1.api.cluster.Node;
import io.weaviate.client6.v1.api.cluster.NodeVerbosity;
import io.weaviate.client6.v1.api.cluster.ShardingState;
import io.weaviate.client6.v1.api.cluster.replication.Replication;
import io.weaviate.client6.v1.api.cluster.replication.ReplicationState;
import io.weaviate.client6.v1.api.cluster.replication.ReplicationType;
import io.weaviate.containers.Weaviate;

public class ClusterITest extends ConcurrentTest {
Expand Down Expand Up @@ -50,4 +55,80 @@ public void test_listNodes() throws IOException {
// Assert
Assertions.assertThat(allNodes).as("total no. nodes").hasSize(3);
}

@Test
public void test_replicateLifecycle() throws IOException {
// Arrange

// We must create the collection first before any shards exist on the nodes.
var nsThings = ns("Things");
client.collections.create(nsThings);

var nodes = client.cluster.listNodes(opt -> opt.verbosity(NodeVerbosity.VERBOSE));
Assertions.assertThat(nodes)
.as("cluster at least 2 nodes").hasSizeGreaterThanOrEqualTo(2);

Node source = null;
Node target = null;
for (var node : nodes) {
if (source == null && !node.shards().isEmpty()) {
source = node;
} else if (target == null) {
target = node;
}
}

var wantShard = source.shards().get(0).name();
var srcNode = source.name();
var tgtNode = target.name();

// Act: start replication
var replication = client.cluster.replicate(
nsThings,
wantShard,
srcNode,
tgtNode,
ReplicationType.MOVE);

var got = client.cluster.replication.get(replication.uuid());
Assertions.assertThat(got).get()
.as("expected replication status")
.returns(nsThings, Replication::collection)
.returns(wantShard, Replication::shard)
.returns(srcNode, Replication::sourceNode)
.returns(tgtNode, Replication::targetNode)
.returns(ReplicationType.MOVE, Replication::type)
.returns(null, Replication::history)
.extracting(Replication::status).isNotNull();

var withHistory = client.cluster.replication.get(
replication.uuid(),
repl -> repl.includeHistory(true));
Assertions.assertThat(withHistory).get()
.as("includes history")
.extracting(Replication::history).isNotNull();

// Act: query replications
var filtered = client.cluster.replication.list(
repl -> repl
.collection(nsThings)
.shard(wantShard)
.targetNode(tgtNode));

Assertions.assertThat(filtered)
.as("existing replications for %s-%s -> %s", nsThings, wantShard, tgtNode)
.hasSize(1);

// Act: cancel
client.cluster.replication.cancel(replication.uuid());

eventually(() -> client.cluster.replication.get(replication.uuid())
.orElseThrow()
.status().state() == ReplicationState.CANCELED, 1000, 25, "replication must be canceled");

// Act: delete replication
client.cluster.replication.delete(replication.uuid());

eventually(() -> client.cluster.replication.list().isEmpty(), 1000, 15, "replication must be deleted");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,27 @@

import com.google.gson.annotations.SerializedName;

public enum NodeVerbosity {
import io.weaviate.client6.v1.internal.json.JsonEnum;

public enum NodeVerbosity implements JsonEnum<NodeVerbosity> {
@SerializedName("minimal")
MINIMAL,
MINIMAL("minimal"),
@SerializedName("verbose")
VERBOSE;
VERBOSE("verbose");

private final String jsonValue;

private NodeVerbosity(String jsonValue) {
this.jsonValue = jsonValue;
}

@Override
public String jsonValue() {
return jsonValue;
}

@Override
public String toString() {
return jsonValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,25 @@
import java.util.function.Function;

import io.weaviate.client6.v1.api.WeaviateApiException;
import io.weaviate.client6.v1.api.cluster.replication.CreateReplicationRequest;
import io.weaviate.client6.v1.api.cluster.replication.Replication;
import io.weaviate.client6.v1.api.cluster.replication.ReplicationType;
import io.weaviate.client6.v1.api.cluster.replication.WeaviateReplicationClient;
import io.weaviate.client6.v1.internal.ObjectBuilder;
import io.weaviate.client6.v1.internal.rest.RestTransport;

public class WeaviateClusterClient {
private final RestTransport restTransport;

/**
* Client for {@code /replication/replicate} endpoints for managing
* replications.
*/
public final WeaviateReplicationClient replication;

public WeaviateClusterClient(RestTransport restTransport) {
this.restTransport = restTransport;
this.replication = new WeaviateReplicationClient(restTransport);
}

/**
Expand Down Expand Up @@ -75,4 +86,29 @@ public List<Node> listNodes(Function<ListNodesRequest.Builder, ObjectBuilder<Lis
throws IOException {
return this.restTransport.performRequest(ListNodesRequest.of(fn), ListNodesRequest._ENDPOINT);
}

/**
* Start a replication operation for a collection's shard.
*
* @param collection Collection name.
* @param shard Shard name.
* @param sourceNode Node on which the shard currently resides.
* @param targetNode Node onto which the files will be replicated.
* @throws WeaviateApiException in case the server returned with an
* error status code.
* @throws IOException in case the request was not sent successfully
* due to a malformed request, a networking error
* or the server being unavailable.
*/
public Replication replicate(
String collection,
String shard,
String sourceNode,
String targetNode,
ReplicationType type)
throws IOException {
return this.restTransport.performRequest(
new CreateReplicationRequest(collection, shard, sourceNode, targetNode, type),
CreateReplicationRequest._ENDPOINT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import io.weaviate.client6.v1.api.cluster.replication.CreateReplicationRequest;
import io.weaviate.client6.v1.api.cluster.replication.Replication;
import io.weaviate.client6.v1.api.cluster.replication.ReplicationType;
import io.weaviate.client6.v1.api.cluster.replication.WeaviateReplicationClientAsync;
import io.weaviate.client6.v1.internal.ObjectBuilder;
import io.weaviate.client6.v1.internal.rest.RestTransport;

public class WeaviateClusterClientAsync {
private final RestTransport restTransport;

public final WeaviateReplicationClientAsync replication;

public WeaviateClusterClientAsync(RestTransport restTransport) {
this.restTransport = restTransport;
this.replication = new WeaviateReplicationClientAsync(restTransport);
}

/**
Expand Down Expand Up @@ -54,4 +61,23 @@ public CompletableFuture<List<Node>> listNodes(Function<ListNodesRequest.Builder
throws IOException {
return this.restTransport.performRequestAsync(ListNodesRequest.of(fn), ListNodesRequest._ENDPOINT);
}

/**
* Start a replication operation for a collection's shard.
*
* @param collection Collection name.
* @param shard Shard name.
* @param sourceNode Node on which the shard currently resides.
* @param targetNode Node onto which the files will be replicated.
*/
public CompletableFuture<Replication> replicate(
String collection,
String shard,
String sourceNode,
String targetNode,
ReplicationType type) {
return this.restTransport.performRequestAsync(
new CreateReplicationRequest(collection, shard, sourceNode, targetNode, type),
CreateReplicationRequest._ENDPOINT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.weaviate.client6.v1.api.cluster.replication;

import java.util.Collections;
import java.util.UUID;

import io.weaviate.client6.v1.internal.rest.Endpoint;
import io.weaviate.client6.v1.internal.rest.SimpleEndpoint;

public record CancelReplicationRequest(UUID uuid) {

static final Endpoint<CancelReplicationRequest, Void> _ENDPOINT = SimpleEndpoint.sideEffect(
request -> "POST",
request -> "/replication/replicate/" + request.uuid() + "/cancel",
__ -> Collections.emptyMap());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.weaviate.client6.v1.api.cluster.replication;

import java.util.Collections;

import com.google.gson.annotations.SerializedName;

import io.weaviate.client6.v1.internal.json.JSON;
import io.weaviate.client6.v1.internal.rest.Endpoint;
import io.weaviate.client6.v1.internal.rest.SimpleEndpoint;

public record CreateReplicationRequest(
@SerializedName("collection") String collection,
@SerializedName("shard") String shard,
@SerializedName("sourceNode") String sourceNode,
@SerializedName("targetNode") String targetNode,
@SerializedName("type") ReplicationType type) {

public static final Endpoint<CreateReplicationRequest, Replication> _ENDPOINT = new SimpleEndpoint<>(
request -> "POST",
request -> "/replication/replicate",
request -> Collections.emptyMap(),
request -> JSON.serialize(request),
(__, response) -> JSON.deserialize(response, Replication.class));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.weaviate.client6.v1.api.cluster.replication;

import java.util.Collections;

import io.weaviate.client6.v1.internal.rest.Endpoint;
import io.weaviate.client6.v1.internal.rest.SimpleEndpoint;

public record DeleteAllReplicationsRequest() {

static final Endpoint<Void, Void> _ENDPOINT = SimpleEndpoint.sideEffect(
request -> "DELETE",
request -> "/replication/replicate",
__ -> Collections.emptyMap());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.weaviate.client6.v1.api.cluster.replication;

import java.util.Collections;
import java.util.UUID;

import io.weaviate.client6.v1.internal.rest.Endpoint;
import io.weaviate.client6.v1.internal.rest.SimpleEndpoint;

public record DeleteReplicationRequest(UUID uuid) {

static final Endpoint<DeleteReplicationRequest, Void> _ENDPOINT = SimpleEndpoint.sideEffect(
request -> "DELETE",
request -> "/replication/replicate/" + request.uuid(),
__ -> Collections.emptyMap());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.weaviate.client6.v1.api.cluster.replication;

import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;

import io.weaviate.client6.v1.internal.ObjectBuilder;
import io.weaviate.client6.v1.internal.rest.Endpoint;
import io.weaviate.client6.v1.internal.rest.OptionalEndpoint;

public record GetReplicationRequest(UUID uuid, boolean includeHistory) {

static final Endpoint<GetReplicationRequest, Optional<Replication>> _ENDPOINT = OptionalEndpoint.noBodyOptional(
request -> "GET",
request -> "/replication/replicate/" + request.uuid(),
request -> Collections.singletonMap("includeHistory", request.includeHistory()),
Replication.class);

public static GetReplicationRequest of(UUID uuid) {
return of(uuid, ObjectBuilder.identity());
}

public static GetReplicationRequest of(UUID uuid, Function<Builder, ObjectBuilder<GetReplicationRequest>> fn) {
return fn.apply(new Builder(uuid)).build();
}

public GetReplicationRequest(Builder builder) {
this(builder.uuid, builder.includeHistory);
}

public static class Builder implements ObjectBuilder<GetReplicationRequest> {
private final UUID uuid;
private boolean includeHistory = false;

public Builder(UUID uuid) {
this.uuid = uuid;
}

/**
* Include history of statuses for this replication.
*
* @see Replication#history
*/
public Builder includeHistory(boolean includeHistory) {
this.includeHistory = includeHistory;
return this;
}

@Override
public GetReplicationRequest build() {
return new GetReplicationRequest(this);
}
}

}
Loading