Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ClusterEventService extends AbstractExecutionThreadService {
private static final Logger LOG = LoggerFactory.getLogger(ClusterEventService.class);
Expand All @@ -64,7 +63,6 @@ public class ClusterEventService extends AbstractExecutionThreadService {
private final ObjectMapper objectMapper;
private final EventBus serverEventBus;
private final RestrictedChainingClassLoader chainingClassLoader;
private final AtomicReference<MongoCursor<ClusterEvent>> eventsCursor = new AtomicReference<>();
private Offset offset;

@Inject
Expand Down Expand Up @@ -123,9 +121,9 @@ protected void run() {
while (isRunning()) {
final var events = eventsIterable(this.offset)
.cursorType(CursorType.TailableAwait)
.maxAwaitTime(1, TimeUnit.SECONDS)
.noCursorTimeout(true);
try (final var cursor = events.iterator()) {
this.eventsCursor.set(cursor);
if (!isRunning()) {
return;
}
Expand All @@ -149,20 +147,24 @@ protected void run() {
@VisibleForTesting
void iterateEvents(MongoCursor<ClusterEvent> cursor) {
LOG.debug("Opened MongoDB cursor on \"{}\"", COLLECTION_NAME);
while (cursor.hasNext()) {
while (isRunning()) {
final var clusterEvent = cursor.tryNext();
if (clusterEvent != null) {
LOG.trace("Processing cluster event: {}", clusterEvent);
Object payload = extractPayload(clusterEvent.payload(), clusterEvent.eventClass());
if (payload != null) {
serverEventBus.post(payload);
} else {
LOG.warn("Couldn't extract payload of cluster event with ID <{}>", clusterEvent.id());
LOG.debug("Invalid payload in cluster event: {}", clusterEvent);
if (clusterEvent == null) {
if (cursor.getServerCursor() == null) {
return;
}

this.offset = new Offset(clusterEvent.timestamp(), clusterEvent.id());
continue;
}
LOG.trace("Processing cluster event: {}", clusterEvent);
Object payload = extractPayload(clusterEvent.payload(), clusterEvent.eventClass());
if (payload != null) {
serverEventBus.post(payload);
} else {
LOG.warn("Couldn't extract payload of cluster event with ID <{}>", clusterEvent.id());
LOG.debug("Invalid payload in cluster event: {}", clusterEvent);
}

this.offset = new Offset(clusterEvent.timestamp(), clusterEvent.id());
}
}

Expand Down Expand Up @@ -226,11 +228,4 @@ private Object extractPayload(Object payload, String eventClass) {
return null;
}

@Override
protected void triggerShutdown() {
final var cursor = this.eventsCursor.get();
if (cursor != null) {
cursor.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,17 @@

import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
@ExtendWith(MongoDBExtension.class)
Expand Down Expand Up @@ -131,7 +132,15 @@ public void runHandlesInvalidPayloadsGracefully() throws Exception {
}

private void runService() {
clusterEventService.iterateEvents(clusterEventService.eventsIterable(initialOffset).iterator());
runService(initialOffset);
}

private void runService(Offset offset) {
clusterEventService = spy(clusterEventService);
when(clusterEventService.isRunning()).thenReturn(true);
try (final var cursor = clusterEventService.eventsIterable(offset).iterator()) {
clusterEventService.iterateEvents(cursor);
}
}

@Test
Expand Down Expand Up @@ -165,7 +174,7 @@ public void cursorReopenDoesNotReprocessLastEvent() {

// Simulate a cursor reopen from the offset of the just-processed event.
final var offsetAfterProcessing = new Offset(TIME.toDate(), lastId);
clusterEventService.iterateEvents(clusterEventService.eventsIterable(offsetAfterProcessing).iterator());
runService(offsetAfterProcessing);

assertThat(handler.invocations).hasValue(0);
verify(serverEventBus, never()).post(any(SimpleEvent.class));
Expand Down
Loading