My PR title is Fixes <issue-number>: <short explanation>
I have commented on my code, particularly in hard-to-understand areas.
For JSON Schema changes: I updated the migration scripts or explained why it is not needed.
Summary by Gitar
New tracking mechanism:
EntityCompletionTracker class monitors partition completion per entity type using concurrent data structures
Per-entity index promotion:
Modified DistributedSearchIndexExecutor and DistributedSearchIndexCoordinator to promote each entity's index independently when all partitions complete
Enhanced vector operation handling:
Updated PartitionWorker to wait for vector embedding tasks with 120s timeout before reporting partition completion
Comprehensive test coverage:
Added EntityCompletionTrackerTest with 10 test cases covering callbacks, failures, concurrency, and edge cases
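To make the tracking mechanism summarized above concrete, here is a minimal sketch of a per-entity completion tracker. It is not the PR's actual EntityCompletionTracker: the class name EntityCompletionTrackerSketch, the callback signature, and the field names are assumptions for illustration. Only initializeEntity, recordPartitionComplete, and the idea of three maps kept in sync come from the review comments on this page.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical sketch; names and callback shape are assumptions, not the PR's code.
class EntityCompletionTrackerSketch {
  private final Map<String, Integer> total = new ConcurrentHashMap<>();
  private final Map<String, AtomicInteger> completed = new ConcurrentHashMap<>();
  private final Map<String, AtomicInteger> failed = new ConcurrentHashMap<>();
  // Invoked once per entity with (entityType, allPartitionsSucceeded).
  private final BiConsumer<String, Boolean> onEntityComplete;

  EntityCompletionTrackerSketch(BiConsumer<String, Boolean> onEntityComplete) {
    this.onEntityComplete = onEntityComplete;
  }

  // Registers an entity type and the number of partitions expected for it.
  void initializeEntity(String entityType, int partitionCount) {
    total.put(entityType, partitionCount);
    completed.put(entityType, new AtomicInteger());
    failed.put(entityType, new AtomicInteger());
  }

  // Records one finished partition; fires the callback when the last one arrives.
  void recordPartitionComplete(String entityType, boolean partitionFailed) {
    Integer expected = total.get(entityType);
    AtomicInteger done = completed.get(entityType);
    AtomicInteger failures = failed.get(entityType);
    if (expected == null || done == null || failures == null) {
      return; // Unknown or partially initialized entity: ignore defensively.
    }
    if (partitionFailed) {
      failures.incrementAndGet();
    }
    // incrementAndGet hands each caller a distinct value, so exactly one
    // thread observes the final count and triggers the callback.
    if (done.incrementAndGet() == expected) {
      onEntityComplete.accept(entityType, failures.get() == 0);
    }
  }
}
```

In this sketch the callback is where a per-entity index promotion would be triggered, so an entity with few partitions does not wait for slower entities to finish.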
Per-entity promotion feature looks solid, but PartitionWorker has duplicate await calls and conditional flush logic bugs from apparent merge/refactor artifacts that need cleanup.
⚠️ Bug: Dead code - duplicate awaitSinkCompletion calls
The method waitForSinkOperations contains two consecutive calls to statsTracker.awaitSinkCompletion():
Line 315: boolean statsComplete = statsTracker.awaitSinkCompletion(statsTimeout);
Line 319: boolean completed = statsTracker.awaitSinkCompletion(30000);
The second await call (lines 316-325) appears to be redundant, effectively dead code, because:
statsComplete is checked at line 316 (if (!statsComplete))
Inside that block, there's another awaitSinkCompletion(30000) call at line 319
However, that block is only reached after the first call with statsTimeout (30s or 60s) has already timed out, so the second call just waits again for the same pending operations
This looks like a merge/refactor artifact where old code wasn't properly removed. The intent seems to be: wait once with dynamic timeout, then log a warning if it didn't complete.
Suggested fix:
boolean statsComplete = statsTracker.awaitSinkCompletion(statsTimeout);
if (!statsComplete) {
LOG.warn(
"Timed out waiting for sink stats completion, {} operations still pending for entity {}",
statsTracker.getPendingSinkOps(),
statsTracker.getEntityType());
}
statsTracker.flush();
⚠️ Bug: statsTracker.flush() only called when stats wait times out
The statsTracker.flush() call at line 327 is currently inside the if (!statsComplete) block, meaning it will only be called when the stats completion wait times out.
If statsComplete is true (the wait succeeds), the flush is never called, which means stats from this partition may not be properly persisted.
Current structure:
if (!statsComplete) {
boolean completed = statsTracker.awaitSinkCompletion(30000);
if (!completed) {
LOG.warn(...);
}
statsTracker.flush(); // Only called when !statsComplete
}
Suggested fix:
Move statsTracker.flush() outside the conditional block so it's always called:
if (!statsComplete) {
LOG.warn(
"Timed out waiting for sink stats completion, {} operations still pending for entity {}",
statsTracker.getPendingSinkOps(),
statsTracker.getEntityType());
}
statsTracker.flush(); // Always flush
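The two fixes above (wait once, then always flush) can be exercised in isolation with a small stand-in. The sketch below is an assumption-laden illustration: StatsTrackerSketch, SinkWaitSketch, sinkOpDone, and flushCount are hypothetical names, and the latch-based tracker only imitates the awaitSinkCompletion, getPendingSinkOps, and flush calls quoted in the review, not the real class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the stats tracker; only the call order matters here.
class StatsTrackerSketch {
  private final CountDownLatch pending;
  final AtomicInteger flushCount = new AtomicInteger();

  StatsTrackerSketch(int pendingOps) {
    this.pending = new CountDownLatch(pendingOps);
  }

  // Marks one sink operation as finished.
  void sinkOpDone() {
    pending.countDown();
  }

  long getPendingSinkOps() {
    return pending.getCount();
  }

  // Returns true if all pending operations finished within the timeout.
  boolean awaitSinkCompletion(long timeoutMillis) {
    try {
      return pending.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Preserve the interrupt status.
      return false;
    }
  }

  void flush() {
    flushCount.incrementAndGet();
  }
}

class SinkWaitSketch {
  // Wait once, warn on timeout, and flush unconditionally.
  static boolean waitForSinkOperations(StatsTrackerSketch tracker, long timeoutMillis) {
    boolean complete = tracker.awaitSinkCompletion(timeoutMillis);
    if (!complete) {
      System.err.println(
          "Timed out waiting for sink stats completion, "
              + tracker.getPendingSinkOps()
              + " operations still pending");
    }
    tracker.flush(); // Runs on both the success and the timeout path.
    return complete;
  }
}
```

With this shape, flush runs exactly once per call whether or not the wait succeeds, which is the behavior both suggested fixes converge on.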
The promoteEntityIndex method performs I/O operations (interacting with Elasticsearch/OpenSearch indices) and could be slow. Since this callback is invoked synchronously from recordPartitionComplete in the coordinator, it may block the thread that is processing partition completions.
Impact: If index promotion is slow, it could delay completion tracking for other partitions/entities being processed on the same thread.
Suggested fix:
Consider executing the promotion asynchronously, for example on a separate executor, so the completion-tracking thread is not blocked.
However, this introduces complexity around error handling and tracking. The current synchronous approach is acceptable if promotion is typically fast, but worth monitoring in production.
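One way the asynchronous option discussed above might look is sketched below, using CompletableFuture from the JDK. Everything here is hypothetical: AsyncPromotionSketch, the promotionExecutor field, and the Consumer standing in for promoteEntityIndex are assumptions, and the real coordinator may need different error tracking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical sketch; the real promoteEntityIndex signature is not shown in the review.
class AsyncPromotionSketch {
  private final Executor promotionExecutor;
  private final Consumer<String> promoteEntityIndex; // Stand-in for the slow I/O step.

  AsyncPromotionSketch(Executor promotionExecutor, Consumer<String> promoteEntityIndex) {
    this.promotionExecutor = promotionExecutor;
    this.promoteEntityIndex = promoteEntityIndex;
  }

  // Called from the completion callback; returns immediately with a future
  // that resolves to true on success and false if promotion threw.
  CompletableFuture<Boolean> onEntityComplete(String entityType) {
    return CompletableFuture
        .runAsync(() -> promoteEntityIndex.accept(entityType), promotionExecutor)
        .handle((ignored, error) -> {
          if (error != null) {
            System.err.println("Index promotion failed for " + entityType + ": " + error);
            return false;
          }
          return true;
        });
  }
}
```

Returning the future keeps failures observable to the caller, which addresses part of the error-handling concern, at the cost of the caller having to collect and await these futures before the job is reported as finished.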
✅ 1 resolved
✅ Edge Case: Potential NPE when accessing failedPartitions for uninitialized entity
📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/EntityCompletionTracker.java:89
In recordPartitionComplete, while the code correctly checks that completed and total are not null for the entity, it accesses failedPartitions.get(entityType) without a null check before calling incrementAndGet():
if (partitionFailed) {
failedPartitions.get(entityType).incrementAndGet(); // NPE if failedPartitions entry is null
}
Although initializeEntity always initializes all three maps together, if the maps get out of sync (e.g., due to a bug or future refactoring), this could cause a NullPointerException.
Suggested fix:
Add a defensive null check:
if (partitionFailed) {
AtomicInteger failed = failedPartitions.get(entityType);
if (failed != null) {
failed.incrementAndGet();
}
}
Describe your changes:
Reapply changes for entity promotion
I worked on ... because ...
Type of change:
Checklist: