Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/it/java/io/weaviate/containers/Weaviate.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ public Builder withApiKeys(String... apiKeys) {
return this;
}

public Builder withGrpcMaxMessageSize(int bytes) {
environment.put("GRPC_MAX_MESSAGE_SIZE", String.valueOf(bytes));
return this;
}

public Builder enableTelemetry(boolean enable) {
environment.put("DISABLE_TELEMETRY", Boolean.toString(!enable));
return this;
Expand Down
29 changes: 29 additions & 0 deletions src/it/java/io/weaviate/integration/SearchITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TestRule;

Expand Down Expand Up @@ -667,4 +668,32 @@ public void testGenerative_bm25_groupBy() throws IOException {
.extracting(TaskOutput::text, InstanceOfAssertFactories.STRING)
.isNotBlank();
}

/**
* Ensure the client respects server's configuration for max gRPC size:
* we create a server with 1-byte message size and try to send a large payload
* there. If the channel is configured correctly, it will refuse to send it.
*/
@Test
@Ignore("Exception thrown by gRPC transport causes a deadlock")
public void test_maxGrpcMessageSize() throws Exception {
var w = Weaviate.custom().withGrpcMaxMessageSize(1).build();
var nsHugeVectors = ns("HugeVectors");

try (final var _client = w.getClient()) {
var huge = _client.collections.create(nsHugeVectors, c -> c
.vectorConfig(VectorConfig.selfProvided()));

final var vector = randomVector(5000, -.01f, .01f);
final WeaviateObject<Map<String, Object>, Reference, ObjectMetadata> hugeObject = WeaviateObject.of(obj -> obj
.metadata(ObjectMetadata.of(m -> m
.vectors(Vectors.of(vector)))));

Assertions.assertThatThrownBy(() -> {
// insertMany to route this request through gRPC.
huge.data.insertMany(hugeObject);
}).isInstanceOf(io.grpc.StatusRuntimeException.class);
}
System.out.println("here?");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public final CompletableFuture<InsertManyResponse> insertMany(PropertiesT... obj
return insertMany(InsertManyRequest.of(objects));
}

@SafeVarargs
public final CompletableFuture<InsertManyResponse> insertMany(
WeaviateObject<PropertiesT, Reference, ObjectMetadata>... objects) {
return insertMany(Arrays.asList(objects));
}

public CompletableFuture<InsertManyResponse> insertMany(
List<WeaviateObject<PropertiesT, Reference, ObjectMetadata>> objects) {
return insertMany(new InsertManyRequest<>(objects));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public DefaultGrpcTransport(GrpcChannelOptions transportOptions) {

if (transportOptions.maxMessageSize() != null) {
var max = transportOptions.maxMessageSize();
blockingStub.withMaxInboundMessageSize(max).withMaxOutboundMessageSize(max);
futureStub.withMaxInboundMessageSize(max).withMaxOutboundMessageSize(max);
blockingStub = blockingStub.withMaxInboundMessageSize(max).withMaxOutboundMessageSize(max);
futureStub = futureStub.withMaxInboundMessageSize(max).withMaxOutboundMessageSize(max);
}

if (transportOptions.tokenProvider() != null) {
Expand Down