Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public EnabledState(final boolean enabled) {
this.enabled = enabled;
}

/** @return Time to (re-)enable */
/** @return Time to (re-)enable, or null if no date is set */
public String getDateString() {
return enabled_date.format(formatter);
return enabled_date != null ? enabled_date.format(formatter) : "";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -18,6 +16,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
Expand Down Expand Up @@ -48,6 +47,11 @@ public class AlarmCmdLogger implements Runnable {

private IndexNameHelper indexNameHelper;

private volatile boolean shouldReconnect = true;
private volatile KafkaStreams currentStreams = null;
private final long reconnectDelayMs;
private Thread shutdownHook = null;

/**
* Create a alarm command message logger for the given topic.
* This runnable will create the kafka streams for the given alarm messages which match the format 'topicCommand'
Expand All @@ -60,25 +64,90 @@ public AlarmCmdLogger(String topic) throws Exception {

MessageParser<AlarmCommandMessage> messageParser = new MessageParser<AlarmCommandMessage>(AlarmCommandMessage.class);
alarmCommandMessageSerde = Serdes.serdeFrom(messageParser, messageParser);

// Read reconnect delay from system property, default to 30 seconds
this.reconnectDelayMs = Long.parseLong(
System.getProperty("kafka.reconnect.delay.ms", "30000")
);
}

@Override
public void run() {

logger.info("Starting the cmd stream consumer for " + topic);

Properties props = new Properties();
props.putAll(PropertiesHelper.getProperties());

final String indexDateSpanUnits = props.getProperty("date_span_units");
final boolean useDatedIndexNames = Boolean.parseBoolean(props.getProperty("use_dated_index_names"));

try {
indexNameHelper = new IndexNameHelper(topic + INDEX_FORMAT, useDatedIndexNames, indexDateSpanUnits);
} catch (Exception ex) {
logger.log(Level.SEVERE, "Time based index creation failed.", ex);
}

// Register shutdown hook once before retry loop
shutdownHook = new Thread("streams-" + topic + "-alarm-cmd-shutdown-hook") {
@Override
public void run() {
logger.info("Shutdown hook triggered for topic " + topic);
shouldReconnect = false;
if (currentStreams != null) {
logger.info("Closing Kafka Streams for topic " + topic);
currentStreams.close(Duration.of(10, ChronoUnit.SECONDS));
currentStreams = null;
}
logger.info("Shutting cmd streams down for topic " + topic);
}
};
Runtime.getRuntime().addShutdownHook(shutdownHook);

// Retry loop for handling missing topics
while (shouldReconnect) {
try {
startKafkaStreams(props);
// If we get here, streams shut down normally
break;
} catch (Exception e) {
logger.log(Level.SEVERE, "Failed to start Kafka Streams for topic " + topic +
", will retry in " + reconnectDelayMs + "ms", e);

if (!shouldReconnect) {
break;
}

try {
Thread.sleep(reconnectDelayMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
logger.info("Reconnection loop interrupted for topic " + topic);
break;
}
}
}

// Clean up shutdown hook when we're done
try {
if (shutdownHook != null) {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
shutdownHook = null;
}
} catch (IllegalStateException e) {
// Ignore - shutdown already in progress
}

logger.info("Alarm cmd logger for topic " + topic + " has shut down");
}

private void startKafkaStreams(Properties props) throws Exception {
logger.info("Attempting to start Kafka Streams for topic " + topic);

Properties kafkaProps = KafkaHelper.loadPropsFromFile(props.getProperty("kafka_properties",""));
kafkaProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-" + topic + "-alarm-cmd");

if (props.containsKey(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)){
kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
} else {
kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
}
kafkaProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
props.getOrDefault(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));

StreamsBuilder builder = new StreamsBuilder();
KStream<String, AlarmCommandMessage> alarms = builder.stream(topic + "Command", Consumed
Expand All @@ -91,14 +160,6 @@ public long extract(ConsumerRecord<Object, Object> record, long previousTimestam
}
}));

final String indexDateSpanUnits = props.getProperty("date_span_units");
final boolean useDatedIndexNames = Boolean.parseBoolean(props.getProperty("use_dated_index_names"));

try {
indexNameHelper = new IndexNameHelper(topic + INDEX_FORMAT, useDatedIndexNames, indexDateSpanUnits);
} catch (Exception ex) {
logger.log(Level.SEVERE, "Time based index creation failed.", ex);
}
KStream<String, AlarmCommandMessage> timeStampedAlarms = alarms.transform(new TransformerSupplier<String, AlarmCommandMessage, KeyValue<String,AlarmCommandMessage>>() {

@Override
Expand Down Expand Up @@ -132,27 +193,58 @@ public void close() {
String topic_name = indexNameHelper.getIndexName(v.getMessage_time());
ElasticClientHelper.getInstance().indexAlarmCmdDocument(topic_name, v);
});

final KafkaStreams streams = new KafkaStreams(builder.build(), kafkaProps);
final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-" + topic + "-alarm-cmd-shutdown-hook") {
@Override
public void run() {
streams.close(Duration.of(10, ChronoUnit.SECONDS));
System.out.println("\nShutting cmd streams Done.");
// Store reference for cleanup (volatile ensures visibility across threads)
currentStreams = streams;

streams.setUncaughtExceptionHandler(exception -> {
logger.log(Level.SEVERE, "Stream exception encountered for topic " + topic + ": " +
exception.getMessage(), exception);

// Check if it's a missing source topic exception
if (exception.getCause() instanceof org.apache.kafka.streams.errors.MissingSourceTopicException ||
exception instanceof org.apache.kafka.streams.errors.MissingSourceTopicException) {
logger.log(Level.WARNING, "Missing source topic detected for " + topic +
". Will retry connection in " + reconnectDelayMs + "ms");
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
}

// For other exceptions, stop retry
logger.log(Level.SEVERE, "Unrecoverable stream exception for topic " + topic, exception);
shouldReconnect = false;
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});

// Simple latch to wait for streams to stop
final CountDownLatch latch = new CountDownLatch(1);
streams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) {
latch.countDown();
}
});

try {
streams.start();
logger.info("Kafka Streams started for topic " + topic);

// Wait for streams to stop (either due to exception or shutdown)
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);

// If stopped due to error, throw to trigger retry
if (streams.state() == KafkaStreams.State.ERROR) {
throw new Exception("Streams stopped with ERROR state");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new Exception("Interrupted", e);
} finally {
if (currentStreams != null) {
currentStreams.close(Duration.of(10, ChronoUnit.SECONDS));
currentStreams = null;
}
}
}

}
Loading