Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,15 @@ protected void handleStopPartitionSession(StopPartitionSessionEvent event) {
}

@Override
protected void handleClosePartitionSession(PartitionSession partition) {
handlerExecutor.execute(() -> {
protected CompletableFuture<Void> handleClosePartitionSession(PartitionSession partition) {
return CompletableFuture.runAsync(() -> {
try {
eventHandler.onPartitionSessionClosed(new PartitionSessionClosedEventImpl(partition));
} catch (Throwable th) {
logUserThrowableAndStopWorking(th, "onPartitionSessionClosed");
throw th;
}
});
}, handlerExecutor);
}

protected CompletableFuture<Void> handleReaderClosed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,14 @@ protected void onStop() {
sessions.values().forEach(ReadPartitionSession::shutdown);
sessions.clear();

partitions.values().forEach(reader::handleClosePartitionSession);
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

partitions.values().forEach(partitionSession ->
closeFutures.add(reader.handleClosePartitionSession(partitionSession))
);

CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0])).join();

partitions.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings, @Nonnull CodecRegi
protected abstract void handleCommitResponse(long committedOffset, PartitionSession partition);
protected abstract void handleStartPartitionSessionRequest(StartPartitionSessionEvent event);
protected abstract void handleStopPartitionSession(StopPartitionSessionEvent event);
protected abstract void handleClosePartitionSession(PartitionSession partition);
protected abstract CompletableFuture<Void> handleClosePartitionSession(PartitionSession partition);

@Override
protected Logger getLogger() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ protected void handleStopPartitionSession(StopPartitionSessionEvent event) {
}

@Override
protected void handleClosePartitionSession(PartitionSession partition) {
protected CompletableFuture<Void> handleClosePartitionSession(PartitionSession partition) {
// TODO: clean reading queue
logger.debug("ClosePartitionSession event received. Ignoring.");
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Loading
Loading