Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.openmetadata.service.apps.bundles.changeEvent.Destination;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.formatter.decorators.FeedMessageDecorator;
import org.openmetadata.service.jdbi3.FeedRepository;
import org.openmetadata.service.notifications.recipients.context.Recipient;
Expand Down Expand Up @@ -70,6 +71,12 @@ public void sendMessage(ChangeEvent changeEvent, Set<Recipient> recipients)
}
}
}
} catch (EntityNotFoundException e) {
LOG.debug(
"Skipping activity feed for {} {} - entity {} was deleted before processing",
changeEvent.getEventType(),
changeEvent.getEntityType(),
changeEvent.getEntityId());
Comment thread
gitar-bot[bot] marked this conversation as resolved.
} catch (Exception ex) {
String message =
CatalogExceptionMessage.eventPublisherFailedToPublish(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ public void sendMessage(ChangeEvent event, Set<Recipient> recipients)
retry, () -> WorkflowHandler.getInstance().triggerWithSignal(signal, variables))
.run();
}
} catch (EntityNotFoundException e) {
LOG.debug(
"Skipping workflow event for {} {} - entity {} was deleted before processing",
eventType,
entityType,
event.getEntityId());
} catch (Exception exc) {
LOG.error("WorkflowEventConsumer - Error processing event", exc);
String message =
Expand Down Expand Up @@ -213,7 +219,7 @@ public static Map<String, Object> defaultHandler(ChangeEvent event) {
} catch (EntityNotFoundException e) {
// Entity was deleted between event creation and processing - skip workflow trigger
LOG.debug(
"Skipping workflow trigger for event {} on {} - entity {} no longer exists",
"Skipping workflow trigger for event {} on {} - entity {} no longer exists",
eventType,
entityType,
event.getEntityFullyQualifiedName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2024 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openmetadata.service.apps.bundles.changeEvent.feed;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.util.FeedUtils;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class ActivityFeedPublisherTest {

@Mock private EventSubscription eventSubscription;
@Mock private SubscriptionDestination subscriptionDestination;

private ActivityFeedPublisher publisher;

@BeforeEach
void setUp() {
when(subscriptionDestination.getType())
.thenReturn(SubscriptionDestination.SubscriptionType.ACTIVITY_FEED);
when(subscriptionDestination.getId()).thenReturn(UUID.randomUUID());

publisher = new ActivityFeedPublisher(eventSubscription, subscriptionDestination);
}

@Test
void testSendMessage_SkipsGracefullyWhenEntityDeleted() {
ChangeEvent event = createChangeEvent(EventType.ENTITY_CREATED);

try (MockedStatic<FeedUtils> mockedFeedUtils = mockStatic(FeedUtils.class)) {
mockedFeedUtils
.when(() -> FeedUtils.getThreadWithMessage(any(), any()))
.thenThrow(EntityNotFoundException.byMessage("table instance for test-id not found"));

assertDoesNotThrow(() -> publisher.sendMessage(event, Collections.emptySet()));
}
}

@Test
void testSendMessage_ThrowsOnNonEntityNotFoundException() {
ChangeEvent event = createChangeEvent(EventType.ENTITY_CREATED);

try (MockedStatic<FeedUtils> mockedFeedUtils = mockStatic(FeedUtils.class)) {
mockedFeedUtils
.when(() -> FeedUtils.getThreadWithMessage(any(), any()))
.thenThrow(new RuntimeException("unexpected error"));

assertThrows(
EventPublisherException.class,
() -> publisher.sendMessage(event, Collections.emptySet()));
}
}

private ChangeEvent createChangeEvent(EventType eventType) {
ChangeEvent event = new ChangeEvent();
event.setEventType(eventType);
event.setEntityType("table");
event.setEntityId(UUID.randomUUID());
event.setEntityFullyQualifiedName("test.db.schema.table");
event.setUserName("admin");
return event;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.openmetadata.schema.type.EventType;
import org.openmetadata.service.Entity;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.exception.EntityNotFoundException;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
Expand Down Expand Up @@ -250,6 +251,45 @@ void testSendMessage_FailsAfterMaxRetries() throws Exception {
}
}

@Test
void testSendMessage_SkipsGracefullyWhenEntityDeleted() throws Exception {
ChangeEvent event = createChangeEvent("admin", EventType.ENTITY_CREATED);

try (MockedStatic<WorkflowHandler> mockedHandler = mockStatic(WorkflowHandler.class);
MockedStatic<Entity> mockedEntity = mockStatic(Entity.class)) {

mockedHandler.when(WorkflowHandler::getInstance).thenReturn(workflowHandler);
mockedEntity
.when(() -> Entity.getEntityReferenceById(anyString(), any(UUID.class), any()))
.thenThrow(EntityNotFoundException.byMessage("table instance for test-id not found"));

assertDoesNotThrow(() -> consumer.sendMessage(event, Collections.emptySet()));

verify(workflowHandler, never()).triggerWithSignal(anyString(), anyMap());
}
}

@Test
void testSendMessage_SafetyNetCatchesEntityNotFoundFromTrigger() throws Exception {
ChangeEvent event = createChangeEvent("admin", EventType.ENTITY_UPDATED);
EntityReference entityRef = createEntityReference();

try (MockedStatic<WorkflowHandler> mockedHandler = mockStatic(WorkflowHandler.class);
MockedStatic<Entity> mockedEntity = mockStatic(Entity.class)) {

mockedHandler.when(WorkflowHandler::getInstance).thenReturn(workflowHandler);
mockedEntity
.when(() -> Entity.getEntityReferenceById(anyString(), any(UUID.class), any()))
.thenReturn(entityRef);

doThrow(EntityNotFoundException.byMessage("table instance for test-id not found"))
.when(workflowHandler)
.triggerWithSignal(anyString(), anyMap());

assertDoesNotThrow(() -> consumer.sendMessage(event, Collections.emptySet()));
}
}

@Test
void testSendMessage_SkipsInvalidEventTypes() throws Exception {
ChangeEvent event = createChangeEvent("admin", EventType.ENTITY_DELETED);
Expand Down
Loading