Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def provider_update_ingest_handler(records: list[dict]) -> dict:
# Track which message IDs correspond to which compact/provider for failure reporting
record_mapping: dict[str, tuple[str, str]] = {} # message_id -> (compact, provider_id)

batch_item_failures = []

# Extract compact and providerId from each record
for record in records:
message_id = record['messageId']
Expand All @@ -60,22 +62,31 @@ def provider_update_ingest_handler(records: list[dict]) -> dict:
image = stream_record.get('dynamodb', {}).get('NewImage') or stream_record.get('dynamodb', {}).get('OldImage')

if not image:
logger.error('Record has no image data', message_id=message_id)
logger.warning('Record has no image data', message_id=message_id)
batch_item_failures.append({'itemIdentifier': message_id})
continue

# Extract compact and providerId from the DynamoDB image
# The format is {'S': 'value'} for string attributes
deserialized_image = TypeDeserializer().deserialize(value={'M': image})

# Check if this is a privilege counter record (used to track the next privilege number)
pk = deserialized_image.get('pk', '')
if pk and 'PRIVILEGE_COUNT' in pk:
logger.info('Skipping privilege count record', message_id=message_id, pk=pk)
continue

compact = deserialized_image.get('compact')
provider_id = deserialized_image.get('providerId')
record_type = deserialized_image.get('type')

if not compact or not provider_id:
logger.error(
logger.warning(
'Record missing required fields',
record_type=record_type,
message_id=message_id,
)
batch_item_failures.append({'itemIdentifier': message_id})
continue

# Add to the appropriate compact's set to dedup provider ids
Expand All @@ -86,7 +97,6 @@ def provider_update_ingest_handler(records: list[dict]) -> dict:
logger.warning('Unknown compact in record', compact=compact, provider_id=provider_id)

# Process providers and bulk index by compact
batch_item_failures = []
failed_providers: dict[str, set] = {compact: set() for compact in config.compacts}

for compact, provider_ids in providers_by_compact.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,3 +789,60 @@ def test_bulk_delete_404_not_found_does_not_return_batch_item_failure(self, mock

# Verify NO batch item failures - 404 is not treated as an error
self.assertEqual({'batchItemFailures': []}, result)

def _create_privilege_count_stream_record(self) -> dict:
"""Create a DynamoDB stream record for privilege count record which tracks numbers to issue."""

return {
'eventID': '6827962b93fa0431321996a5cb9d245c',
'eventName': 'MODIFY',
'eventVersion': '1.1',
'eventSource': 'aws:dynamodb',
'awsRegion': 'us-east-1',
'dynamodb': {
'ApproximateCreationDateTime': 1767934990,
'Keys': {'sk': {'S': 'coun#PRIVILEGE_COUNT'}, 'pk': {'S': 'coun#PRIVILEGE_COUNT'}},
'NewImage': {
'privilegeCount': {'N': '322'},
'sk': {'S': 'coun#PRIVILEGE_COUNT'},
'pk': {'S': 'coun#PRIVILEGE_COUNT'},
},
'OldImage': {
'privilegeCount': {'N': '321'},
'sk': {'S': 'coun#PRIVILEGE_COUNT'},
'pk': {'S': 'coun#PRIVILEGE_COUNT'},
},
'SequenceNumber': '1275112700001832307719225248',
'SizeBytes': 166,
'StreamViewType': 'NEW_AND_OLD_IMAGES',
},
'eventSourceARN': 'some-source-arn',
}

@patch('handlers.provider_update_ingest.opensearch_client')
def test_privilege_count_record_skipped_without_batch_item_failure(self, mock_opensearch_client):
from handlers.provider_update_ingest import provider_update_ingest_handler

# Set up mock OpenSearch client
self._when_testing_mock_opensearch_client(mock_opensearch_client)

# Create an SQS event with a privilege count DynamoDB stream record
event = {
'Records': [
{
'messageId': '12345',
'body': json.dumps(self._create_privilege_count_stream_record()),
}
]
}

# Run the handler
mock_context = MagicMock()
result = provider_update_ingest_handler(event, mock_context)

# Verify that OpenSearch was never called (no provider to index)
mock_opensearch_client.bulk_index.assert_not_called()
mock_opensearch_client.bulk_delete.assert_not_called()

# Verify no batch item failures (privilege count records are skipped, not failed)
self.assertEqual({'batchItemFailures': []}, result)
Loading