changelog/unreleased/solr-18077.yml (9 additions, 0 deletions)
@@ -0,0 +1,9 @@
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
title: "CrossDC Consumer: fix potential out-of-order Kafka partition processing"
type: fixed # added, changed, fixed, deprecated, removed, dependency_update, security, other
authors:
  - name: Andrzej Bialecki
    nick: ab
links:
  - name: SOLR-18077
    url: https://issues.apache.org/jira/browse/SOLR-18077
@@ -25,6 +25,7 @@
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -33,6 +34,7 @@
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -121,8 +123,10 @@ public String bootstrapServers() {
};
kafkaCluster.start();

kafkaCluster.createTopic(TOPIC, 1, 1);
kafkaCluster.createTopic(TOPIC, 10, 1);

// ensure small batches to test multi-partition ordering
System.setProperty("batchSizeBytes", "128");
System.setProperty("solr.crossdc.topicName", TOPIC);
System.setProperty("solr.crossdc.bootstrapServers", kafkaCluster.bootstrapServers());
System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
@@ -182,6 +186,7 @@ public void afterSolrAndKafkaIntegrationTest() throws Exception {
Thread.setDefaultUncaughtExceptionHandler(uceh);
}

@Test
public void testFullCloudToCloud() throws Exception {
CloudSolrClient client = solrCluster1.getSolrClient(COLLECTION);
SolrInputDocument doc = new SolrInputDocument();
@@ -197,6 +202,7 @@ public void testFullCloudToCloud() throws Exception {
assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
}

@Test
public void testProducerToCloud() throws Exception {
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
@@ -227,6 +233,39 @@ public void testProducerToCloud() throws Exception {
producer.close();
}

  private static final String LOREM_IPSUM =
      "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";

  @Test
  public void testStrictOrdering() throws Exception {
    CloudSolrClient client = solrCluster1.getSolrClient();
    int NUM_DOCS = 5000;
    // delay deletes by this many docs
    int DELTA = 100;
    for (int i = 0; i < NUM_DOCS; i++) {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "id-" + i);
      doc.addField("text", "some test with a relatively long field. " + LOREM_IPSUM);

      client.add(COLLECTION, doc);
      if (i >= DELTA) {
        client.deleteById(COLLECTION, "id-" + (i - DELTA));
      }
    }

    // send the remaining deletes in random order
    ArrayList<Integer> ids = new ArrayList<>(DELTA);
    IntStream.range(0, DELTA).forEach(i -> ids.add(i));
    Collections.shuffle(ids, random());
    for (Integer id : ids) {
      client.deleteById(COLLECTION, "id-" + (NUM_DOCS - DELTA + id));
    }

    client.commit(COLLECTION);

    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 0);
  }

@Test
@Ignore("This relies on collection properties and I don't see where they are read anymore")
public void testMirroringUpdateProcessor() throws Exception {
@@ -92,12 +92,15 @@ private void submitRequest(MirroredSolrRequest<?> request, String topicName)
}

final long enqueueStartNanos = System.nanoTime();
// required for multi-partition topics to preserve ordering of requests for a collection
final String recordKey =
request.getSolrRequest() != null ? request.getSolrRequest().getCollection() : null;

// Create Producer record
try {

producer.send(
new ProducerRecord<>(topicName, request),
new ProducerRecord<>(topicName, recordKey, request),
(metadata, exception) -> {
if (exception != null) {
log.error(
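The new record key above is what the fix relies on: with Kafka's default partitioner, all records that share a key are hashed to the same partition, so updates for one collection keep their relative order even on a multi-partition topic. The stand-alone sketch below is not part of the patch; the class name and example values are made up, and it assumes the producer uses Kafka's default partitioner. It reproduces that hashing with the `Utils` helper from `kafka-clients`:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class CollectionPartitionSketch {
  // Same formula Kafka's default partitioner applies to records with a non-null key:
  // a murmur2 hash of the key bytes, mapped onto the available partitions.
  static int partitionFor(String collectionName, int numPartitions) {
    byte[] keyBytes = collectionName.getBytes(StandardCharsets.UTF_8);
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }

  public static void main(String[] args) {
    // With the collection name as the record key, every update for "collection1"
    // lands on the same partition of a 10-partition topic, so the consumer replays
    // them in production order; a different collection may hash elsewhere.
    System.out.println(partitionFor("collection1", 10));
    System.out.println(partitionFor("collection1", 10)); // always the same value
    System.out.println(partitionFor("collection2", 10)); // may differ
  }
}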
@@ -130,7 +130,7 @@ Optional configuration properties:
`solr.crossdc.retryBackoffMs` _<integer>_:: The amount of time to wait before attempting to retry a failed request to a given topic partition.
`solr.crossdc.deliveryTimeoutMS` _<integer>_:: Updates sent to the Kafka queue will be failed before the number of retries has been exhausted if the timeout configured by `delivery.timeout.ms` expires first.
`solr.crossdc.maxRequestSizeBytes` _<integer>_:: The maximum size of a Kafka queue request in bytes - limits the number of requests that will be sent over the queue in a single batch.
`solr.crossdc.dlqTopicName` _<string>_: If not empty then requests that failed processing `maxAttempts` times will be sent to a "dead letter queue" topic in Kafka (must exist if configured).
`solr.crossdc.dlqTopicName` _<string>_:: If not empty then requests that failed processing `maxAttempts` times will be sent to a "dead letter queue" topic in Kafka (must exist if configured).
`solr.crossdc.mirrorCommits` _<boolean>_:: If `true` then standalone commit requests will be mirrored, otherwise they will be processed only locally.
`solr.crossdc.expandDbq` _<enum>_:: If set to `expand` (default) then Delete-By-Query will be expanded before mirroring into a series of Delete-By-Id, which may help with correct processing of out-of-order requests on the consumer side.
If set to `none` then Delete-By-Query requests will be mirrored as-is.
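All of the options above are plain Java system properties, so for illustration they can be set programmatically the same way the integration test sets the required ones. A minimal sketch with made-up example values (only the property names come from the list above):

public class CrossDcOptionalPropsExample {
  public static void main(String[] args) {
    // Example values only; pick values that match your deployment.
    System.setProperty("solr.crossdc.dlqTopicName", "crossdc-dlq"); // dead-letter topic must already exist
    System.setProperty("solr.crossdc.mirrorCommits", "true"); // also mirror standalone commits
    System.setProperty("solr.crossdc.expandDbq", "expand"); // expand Delete-By-Query into Delete-By-Id
  }
}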
@@ -211,4 +211,8 @@ Setting the `solr.crossdc.enabled` system property or xref:collection-management
- When the `solr.crossdc.expandDbq` property is set to `expand` (default) then Delete-By-Query is converted into a series of Delete-By-Id, which can be much less efficient for queries matching large numbers of documents.
Setting this property to `none` results in forwarding a real Delete-By-Query; this reduces the amount of data to mirror but may cause different results due to the potential re-ordering of failed & re-submitted requests between the Consumer and the target Solr.
- When `solr.crossdc.collapseUpdates` is set to `all` then multiple requests containing a mix of add and delete ops will be collapsed into a single outgoing request.
This will cause the original ordering of add / delete ops to be lost (because Solr processing of an update request always processes all add ops first, and only then the delete ops), which may affect the final outcome when some of the ops refer to the same document ids.
- When the Kafka topic used for mirroring has multiple partitions, the CrossDC Producer and Consumer guarantee strict ordering of updates ONLY within the same collection.
In other words, when a multi-partition topic is used for mirroring there is no guarantee of strict global request ordering across collections, which normally should not be an issue.
However, if strict global ordering across collections is required then the mirroring topic must use a single partition (a minimal example of creating such a topic is sketched below).
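A minimal sketch of creating a single-partition mirroring topic with the Kafka `AdminClient`; the topic name and bootstrap address are made-up examples, not values mandated by CrossDC:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class SinglePartitionMirrorTopic {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example address
    try (AdminClient admin = AdminClient.create(props)) {
      // One partition means one total order of all mirrored requests, across all collections.
      NewTopic topic = new NewTopic("crossdc-mirror", 1, (short) 1);
      admin.createTopics(Collections.singleton(topic)).all().get();
    }
  }
}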