Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2905,7 +2905,7 @@ def encumber_home_jurisdiction_license_privileges(
compact: str,
provider_id: str,
jurisdiction: str,
adverse_action_id: str,
adverse_action_id: UUID,
license_type_abbreviation: str,
effective_date: date,
) -> list[PrivilegeData]:
Expand All @@ -2918,7 +2918,7 @@ def encumber_home_jurisdiction_license_privileges(
:param str compact: The compact name.
:param str provider_id: The provider ID.
:param str jurisdiction: The jurisdiction of the license.
:param adverse_action_id: The ID of the adverse action.
:param UUID adverse_action_id: The ID of the adverse action.
:param str license_type_abbreviation: The license type abbreviation.
:param date effective_date: effective date of the encumbrance on the license and therefore privilege.
:return: List of privileges that were encumbered
Expand Down Expand Up @@ -2992,6 +2992,27 @@ def encumber_home_jurisdiction_license_privileges(
)

for privilege_data in unencumbered_privileges_associated_with_license:
# Check if an update record already exists for this adverse action
# to avoid creating duplicate update records if the event flow is re-run
existing_updates = provider_user_records.get_update_records_for_privilege(
jurisdiction=privilege_data.jurisdiction,
license_type=privilege_data.licenseType,
filter_condition=lambda update: (
update.updateType == UpdateCategory.ENCUMBRANCE
and update.encumbranceDetails is not None
and update.encumbranceDetails.get('adverseActionId') == adverse_action_id
),
)

if existing_updates:
logger.info(
'Update record already exists for this adverse action. Skipping duplicate creation.',
privilege_jurisdiction=privilege_data.jurisdiction,
privilege_license_type=privilege_data.licenseType,
adverse_action_id=adverse_action_id,
)
continue

now = config.current_standard_datetime

# Create privilege update record
Expand Down Expand Up @@ -3025,6 +3046,27 @@ def encumber_home_jurisdiction_license_privileges(
)

for encumbered_privilege in previously_encumbered_privileges_associated_with_license:
# Check if an update record already exists for this adverse action
# to avoid creating duplicate update records if the event flow is re-run
existing_updates = provider_user_records.get_update_records_for_privilege(
jurisdiction=encumbered_privilege.jurisdiction,
license_type=encumbered_privilege.licenseType,
filter_condition=lambda update: (
update.updateType == UpdateCategory.ENCUMBRANCE
and update.encumbranceDetails is not None
and update.encumbranceDetails.get('adverseActionId') == adverse_action_id
),
)

if existing_updates:
logger.info(
'Update record already exists for this adverse action. Skipping duplicate creation.',
privilege_jurisdiction=encumbered_privilege.jurisdiction,
privilege_license_type=encumbered_privilege.licenseType,
adverse_action_id=adverse_action_id,
)
continue

now = config.current_standard_datetime

# Create privilege update record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
timedelta,
)
from enum import StrEnum
from uuid import UUID

from cc_common.config import config, logger
from cc_common.data_model.schema.adverse_action import AdverseActionData
Expand Down Expand Up @@ -534,11 +535,11 @@ def get_adverse_action_records_for_license(
and (filter_condition is None or filter_condition(record))
]

def get_adverse_action_by_id(self, adverse_action_id: str) -> AdverseActionData | None:
def get_adverse_action_by_id(self, adverse_action_id: UUID) -> AdverseActionData | None:
"""
Get an adverse action record by its ID.

:param str adverse_action_id: The ID of the adverse action to find
:param UUID adverse_action_id: The ID of the adverse action to find
:return: The found adverse action record if found, else None
"""
return next(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3033,6 +3033,240 @@ def test_license_encumbrance_lifting_notification_listener_skips_notifications_w
mock_provider_email.assert_not_called()
mock_state_email.assert_not_called()

@patch('cc_common.event_bus_client.EventBusClient._publish_event')
def test_license_encumbrance_listener_does_not_create_duplicate_update_records_for_unencumbered_privileges_on_retry(
self,
_mock_publish_event,
):
"""Test that license encumbrance event does not create duplicate update records
when re-run for unencumbered privileges."""
from cc_common.data_model.schema.common import UpdateCategory
from handlers.encumbrance_events import license_encumbrance_listener

# Set up test data
self.test_data_generator.put_default_provider_record_in_provider_table()
privilege = self.test_data_generator.put_default_privilege_record_in_provider_table(
value_overrides={
'licenseJurisdiction': DEFAULT_LICENSE_JURISDICTION,
'licenseTypeAbbreviation': DEFAULT_LICENSE_TYPE_ABBREVIATION,
'encumberedStatus': 'unencumbered',
'jurisdiction': 'ne',
}
)

# Add adverse action for license
self.test_data_generator.put_default_adverse_action_record_in_provider_table(
value_overrides={'actionAgainst': 'license'}
)

message = self._generate_license_encumbrance_message()
event = self._create_sqs_event(message)

# Execute the handler FIRST TIME
license_encumbrance_listener(event, self.mock_context)

# Verify privilege update record was created using the test helper
update_records = self.test_data_generator.query_privilege_update_records_for_given_record_from_database(
privilege
)
matching_updates = [
update
for update in update_records
if update.updateType == UpdateCategory.ENCUMBRANCE
and update.encumbranceDetails is not None
and update.encumbranceDetails.get('adverseActionId') == UUID(DEFAULT_ADVERSE_ACTION_ID)
]

self.assertEqual(1, len(matching_updates))
first_update_record = matching_updates[0]
self.assertEqual(UpdateCategory.ENCUMBRANCE, first_update_record.updateType)
self.assertEqual({'encumberedStatus': 'licenseEncumbered'}, first_update_record.updatedValues)
self.assertEqual(UUID(DEFAULT_ADVERSE_ACTION_ID), first_update_record.encumbranceDetails.get('adverseActionId'))

# Execute the handler SECOND TIME (simulating re-run of same event)
license_encumbrance_listener(event, self.mock_context)

# Verify STILL only one update record exists (no duplicate created)
update_records_after_retry = (
self.test_data_generator.query_privilege_update_records_for_given_record_from_database(
privilege
)
)
matching_updates_after_retry = [
update
for update in update_records_after_retry
if update.updateType == UpdateCategory.ENCUMBRANCE
and update.encumbranceDetails is not None
and update.encumbranceDetails.get('adverseActionId') == UUID(DEFAULT_ADVERSE_ACTION_ID)
]

self.assertEqual(1, len(matching_updates_after_retry))
# Verify it's the same record (same createDate as a proxy for same record)
self.assertEqual(first_update_record.createDate, matching_updates_after_retry[0].createDate)

@patch('cc_common.event_bus_client.EventBusClient._publish_event')
def test_license_encumbrance_listener_does_not_create_duplicate_update_records_for_already_encumbered_privileges(
self,
_mock_publish_event,
):
"""Test that license encumbrance event does not create duplicate update records when
re-run for already encumbered privileges."""
from cc_common.data_model.schema.common import UpdateCategory
from handlers.encumbrance_events import license_encumbrance_listener

# Set up test data
self.test_data_generator.put_default_provider_record_in_provider_table()
privilege = self.test_data_generator.put_default_privilege_record_in_provider_table(
value_overrides={
'licenseJurisdiction': DEFAULT_LICENSE_JURISDICTION,
'licenseTypeAbbreviation': DEFAULT_LICENSE_TYPE_ABBREVIATION,
'encumberedStatus': 'encumbered', # Already encumbered by a different adverse action
'jurisdiction': 'ky',
}
)

# Add adverse action for license
self.test_data_generator.put_default_adverse_action_record_in_provider_table(
value_overrides={'actionAgainst': 'license'}
)

message = self._generate_license_encumbrance_message()
event = self._create_sqs_event(message)

# Execute the handler FIRST TIME
license_encumbrance_listener(event, self.mock_context)

# Verify privilege update record was created (even though privilege was already encumbered)
# using the test helper
update_records = self.test_data_generator.query_privilege_update_records_for_given_record_from_database(
privilege
)
matching_updates = [
update
for update in update_records
if update.updateType == UpdateCategory.ENCUMBRANCE
and update.encumbranceDetails is not None
and update.encumbranceDetails.get('adverseActionId') == UUID(DEFAULT_ADVERSE_ACTION_ID)
]

self.assertEqual(1, len(matching_updates))
first_update_record = matching_updates[0]
self.assertEqual(UpdateCategory.ENCUMBRANCE, first_update_record.updateType)
# For already encumbered privileges, updatedValues is empty but we still track the encumbrance event
self.assertEqual({}, first_update_record.updatedValues)
self.assertEqual(UUID(DEFAULT_ADVERSE_ACTION_ID), first_update_record.encumbranceDetails.get('adverseActionId'))

# Execute the handler SECOND TIME (simulating re-run of same event)
license_encumbrance_listener(event, self.mock_context)

# Verify STILL only one update record exists (no duplicate created)
update_records_after_retry = (
self.test_data_generator.query_privilege_update_records_for_given_record_from_database(
privilege
)
)
matching_updates_after_retry = [
update
for update in update_records_after_retry
if update.updateType == UpdateCategory.ENCUMBRANCE
and update.encumbranceDetails is not None
and update.encumbranceDetails.get('adverseActionId') == UUID(DEFAULT_ADVERSE_ACTION_ID)
]

self.assertEqual(1, len(matching_updates_after_retry))
# Verify it's the same record (same createDate as a proxy for same record)
self.assertEqual(first_update_record.createDate, matching_updates_after_retry[0].createDate)

@patch('cc_common.event_bus_client.EventBusClient._publish_event')
def test_license_encumbrance_lifted_listener_does_not_create_duplicate_update_records_on_retry(
self, _mock_publish_event
):
"""Test that license encumbrance lifting event does not create duplicate update records when re-run.

This test confirms that the early return logic when no LICENSE_ENCUMBERED privileges are found
prevents duplicate update record creation on retry.
"""
from cc_common.data_model.schema.common import PrivilegeEncumberedStatusEnum, UpdateCategory
from handlers.encumbrance_events import license_encumbrance_lifted_listener

# Set up test data
self.test_data_generator.put_default_provider_record_in_provider_table()

# Set up license record that is unencumbered (all adverse actions lifted)
self.test_data_generator.put_default_license_record_in_provider_table(
value_overrides={
'jurisdiction': DEFAULT_LICENSE_JURISDICTION,
'licenseType': DEFAULT_LICENSE_TYPE,
'encumberedStatus': 'unencumbered',
}
)

privilege = self.test_data_generator.put_default_privilege_record_in_provider_table(
value_overrides={
'licenseJurisdiction': DEFAULT_LICENSE_JURISDICTION,
'licenseTypeAbbreviation': DEFAULT_LICENSE_TYPE_ABBREVIATION,
'encumberedStatus': 'licenseEncumbered', # Will be lifted
}
)

# Add adverse action with effectiveLiftDate set
self.test_data_generator.put_default_adverse_action_record_in_provider_table(
value_overrides={
'actionAgainst': 'license',
'effectiveLiftDate': date.fromisoformat(DEFAULT_EFFECTIVE_DATE),
'jurisdiction': DEFAULT_LICENSE_JURISDICTION,
'licenseTypeAbbreviation': DEFAULT_LICENSE_TYPE_ABBREVIATION,
'licenseType': DEFAULT_LICENSE_TYPE,
}
)

message = self._generate_license_encumbrance_lifting_message()
event = self._create_sqs_event(message)

# Execute the handler FIRST TIME
license_encumbrance_lifted_listener(event, self.mock_context)

# Verify privilege is now unencumbered
provider_records = self.config.data_client.get_provider_user_records(
compact=DEFAULT_COMPACT,
provider_id=DEFAULT_PROVIDER_ID,
)
privileges = provider_records.get_privilege_records()
self.assertEqual(1, len(privileges))
self.assertEqual(PrivilegeEncumberedStatusEnum.UNENCUMBERED, privileges[0].encumberedStatus)

# Verify privilege update record was created using the test helper
update_records = self.test_data_generator.query_privilege_update_records_for_given_record_from_database(
privilege
)
matching_updates = [
update for update in update_records if update.updateType == UpdateCategory.LIFTING_ENCUMBRANCE
]

self.assertEqual(1, len(matching_updates))
first_update_record = matching_updates[0]
self.assertEqual(UpdateCategory.LIFTING_ENCUMBRANCE, first_update_record.updateType)
self.assertEqual({'encumberedStatus': 'unencumbered'}, first_update_record.updatedValues)

# Execute the handler SECOND TIME (simulating re-run of same event)
license_encumbrance_lifted_listener(event, self.mock_context)

# Verify STILL only one update record exists (no duplicate created)
# license_encumbrance_lifted_listener will skip creating privilege updates because it only
# does so on LICENSE_ENCUMBERED privileges and none of those would remain
update_records_after_retry = (
self.test_data_generator.query_privilege_update_records_for_given_record_from_database(
privilege
)
)
matching_updates_after_retry = [
update for update in update_records_after_retry if update.updateType == UpdateCategory.LIFTING_ENCUMBRANCE
]

self.assertEqual(1, len(matching_updates_after_retry))
# Verify it's the same record (same createDate as a proxy for same record)
self.assertEqual(first_update_record.createDate, matching_updates_after_retry[0].createDate)

@patch('cc_common.email_service_client.EmailServiceClient.send_license_encumbrance_state_notification_email')
@patch('cc_common.email_service_client.EmailServiceClient.send_license_encumbrance_provider_notification_email')
def test_license_encumbrance_notification_listener_skips_already_sent_notifications_and_retries_failed(
Expand Down Expand Up @@ -3149,3 +3383,4 @@ def test_license_encumbrance_notification_listener_creates_notification_events_t
self.assertEqual(expected_sks, list(notification_records.keys()))
for sk in expected_sks:
self.assertEqual(NotificationStatus.SUCCESS, notification_records.get(sk).get('status'))

Loading