Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog/unreleased/solr-18061.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
title: "CrossDC Consumer: add /health endpoint"
type: added # added, changed, fixed, deprecated, removed, dependency_update, security, other
authors:
- name: Andrzej Bialecki
nick: ab
links:
- name: SOLR-18061
url: https://issues.apache.org/jira/browse/SOLR-18061
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public void start(Map<String, Object> properties) {
context.setAttribute(
MetricsServlet.SOLR_METRICS_MANAGER_ATTRIBUTE, metrics.getMetricManager());
context.addServlet(MetricsServlet.class, "/metrics/*");
context.setAttribute(HealthCheckServlet.KAFKA_CROSSDC_CONSUMER, crossDcConsumer);
context.addServlet(HealthCheckServlet.class, "/health/*");

for (ServletMapping mapping : context.getServletHandler().getServletMappings()) {
if (log.isInfoEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.crossdc.manager.consumer;

import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/**
 * Simple health-check endpoint for the CrossDC Consumer. Reports the state of the Kafka
 * connection, the Solr connection, and the consumer poll loop as a small JSON document, and
 * mirrors the overall health in the HTTP status code (200 when all three are healthy, 503
 * otherwise).
 */
public class HealthCheckServlet extends HttpServlet {
  private static final long serialVersionUID = -7848291432584409313L;

  /** Servlet-context attribute key under which the {@link KafkaCrossDcConsumer} is registered. */
  public static final String KAFKA_CROSSDC_CONSUMER =
      HealthCheckServlet.class.getName() + ".kafkaCrossDcConsumer";

  private KafkaCrossDcConsumer consumer;

  @Override
  public void init() throws ServletException {
    // May legitimately be null if the context attribute was never set; doGet handles that.
    consumer = (KafkaCrossDcConsumer) getServletContext().getAttribute(KAFKA_CROSSDC_CONSUMER);
  }

  /**
   * Responds with {@code {"kafka": ..., "solr": ..., "running": ...}} and status 200 only when
   * all three checks pass; 503 otherwise (including when no consumer instance is available).
   */
  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    if (consumer == null) {
      resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
      return;
    }
    boolean kafkaConnected = consumer.isKafkaConnected();
    boolean solrConnected = consumer.isSolrConnected();
    boolean running = consumer.isRunning();
    // Set the status and headers BEFORE writing the body: once the response is committed
    // (e.g. the output buffer is flushed), setStatus() is silently ignored, which would make
    // an unhealthy service report 200.
    if (kafkaConnected && solrConnected && running) {
      resp.setStatus(HttpServletResponse.SC_OK);
    } else {
      resp.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
    }
    resp.setContentType("application/json");
    resp.setCharacterEncoding("UTF-8");
    resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store");
    String content =
        String.format(
            Locale.ROOT,
            "{\n \"kafka\": %s,\n \"solr\": %s,\n \"running\": %s\n}",
            kafkaConnected,
            solrConnected,
            running);
    resp.getOutputStream().write(content.getBytes(StandardCharsets.UTF_8));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,29 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.HealthCheckRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.HealthCheckResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
Expand Down Expand Up @@ -72,18 +77,20 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumer;
private final AdminClient adminClient;
private final CountDownLatch startLatch;
KafkaMirroringSink kafkaMirroringSink;

private static final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;

private final String[] topicNames;
private final int maxAttempts;
private final CrossDcConf.CollapseUpdates collapseUpdates;
private final int maxCollapseRecords;
private final SolrMessageProcessor messageProcessor;
protected final ConsumerMetrics metrics;

protected SolrClientSupplier solrClientSupplier;
protected final SolrClientSupplier solrClientSupplier;

private final ThreadPoolExecutor executor;

Expand All @@ -93,6 +100,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {

private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);

private volatile boolean running = false;

/**
* Supplier for creating and managing a working CloudSolrClient instance. This class ensures that
* the CloudSolrClient instance doesn't try to use its {@link
Expand Down Expand Up @@ -224,6 +233,7 @@ public KafkaCrossDcConsumer(

log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps);
kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
adminClient = createKafkaAdminClient(kafkaConsumerProps);
partitionManager = new PartitionManager(kafkaConsumer);
// Create producer for resubmitting failed requests
log.info("Creating Kafka resubmit producer");
Expand All @@ -244,6 +254,10 @@ public KafkaConsumer<String, MirroredSolrRequest<?>> createKafkaConsumer(Propert
properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
}

/**
 * Creates the Kafka {@link AdminClient} for this consumer; overridable (e.g. by tests).
 *
 * @param properties Kafka client configuration
 * @return a newly created AdminClient
 */
public AdminClient createKafkaAdminClient(Properties properties) {
  AdminClient client = AdminClient.create(properties);
  return client;
}

/**
 * Creates the {@link KafkaMirroringSink} for the given configuration; overridable (e.g. by
 * tests).
 *
 * @param conf CrossDC Kafka configuration
 * @return a newly created sink
 */
protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
  KafkaMirroringSink sink = new KafkaMirroringSink(conf);
  return sink;
}
Expand All @@ -269,18 +283,25 @@ public void run() {

log.info("Consumer started");
startLatch.countDown();
running = true;

while (pollAndProcessRequests()) {
// no-op within this loop: everything is done in pollAndProcessRequests method defined
// above.
}
running = false;

log.info("Closed kafka consumer. Exiting now.");
log.info("Closing kafka consumer. Exiting now.");
try {
kafkaConsumer.close();
} catch (Exception e) {
log.warn("Failed to close kafka consumer", e);
}
try {
adminClient.close();
} catch (Exception e) {
log.warn("Failed to close kafka admin client", e);
}

try {
kafkaMirroringSink.close();
Expand All @@ -292,6 +313,44 @@ public void run() {
}
}

/**
 * @return true while the consumer's main poll loop is active (set in {@code run()})
 */
public boolean isRunning() {
  return this.running;
}

/**
 * Checks Solr connectivity by issuing a {@link HealthCheckRequest} through the current Solr
 * client.
 *
 * @return true if the health check completed with status 0; false when no client supplier is
 *     available, the request fails, or the response status is non-zero
 */
public boolean isSolrConnected() {
  if (solrClientSupplier == null) {
    return false;
  }
  try {
    HealthCheckRequest request = new HealthCheckRequest();
    HealthCheckResponse response = request.process(solrClientSupplier.get());
    return response.getStatus() == 0;
  } catch (Exception e) {
    // Deliberately broad: any failure (network, SolrException, ...) simply means
    // "not connected" for health-reporting purposes; this method must never throw.
    return false;
  }
}

/**
 * Checks Kafka connectivity by asking the admin client to describe the cluster.
 *
 * @return true if the cluster reports at least one node; false when no admin client is
 *     available, the lookup fails, or the thread is interrupted while waiting
 */
public boolean isKafkaConnected() {
  if (adminClient == null) {
    return false;
  }
  try {
    Collection<Node> nodes = adminClient.describeCluster().nodes().get();
    return nodes != null && !nodes.isEmpty();
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers can observe the interruption.
    Thread.currentThread().interrupt();
    return false;
  } catch (ExecutionException e) {
    return false;
  }
}

/**
* Polls and processes the requests from Kafka. This method returns false when the consumer needs
* to be shutdown i.e. when there's a wakeup exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.noggit.ObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -341,7 +342,7 @@ public void testParallelUpdatesToCluster2() throws Exception {

@Test
@SuppressWarnings({"unchecked"})
public void testMetrics() throws Exception {
public void testMetricsAndHealthcheck() throws Exception {
CloudSolrClient client = solrCluster1.getSolrClient();
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", String.valueOf(new Date().getTime()));
Expand All @@ -359,13 +360,27 @@ public void testMetrics() throws Exception {
HttpJettySolrClient httpJettySolrClient =
new HttpJettySolrClient.Builder(baseUrl).useHttp1_1(true).build();
try {
// test the metrics endpoint
GenericSolrRequest req = new GenericSolrRequest(SolrRequest.METHOD.GET, "/metrics");
req.setResponseParser(new InputStreamResponseParser(null));
NamedList<Object> rsp = httpJettySolrClient.request(req);
String content =
IOUtils.toString(
(InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), StandardCharsets.UTF_8);
assertTrue(content, content.contains("crossdc_consumer_output_total"));

// test the healthcheck endpoint
req = new GenericSolrRequest(SolrRequest.METHOD.GET, "/health");
req.setResponseParser(new InputStreamResponseParser(null));
rsp = httpJettySolrClient.request(req);
content =
IOUtils.toString(
(InputStream) rsp.get(InputStreamResponseParser.STREAM_KEY), StandardCharsets.UTF_8);
assertEquals(Integer.valueOf(200), rsp.get("responseStatus"));
Map<String, Object> map = (Map<String, Object>) ObjectBuilder.fromJSON(content);
assertEquals(Boolean.TRUE, map.get("kafka"));
assertEquals(Boolean.TRUE, map.get("solr"));
assertEquals(Boolean.TRUE, map.get("running"));
} finally {
httpJettySolrClient.close();
client.close();
Expand Down