
Reapply change - Fix Entity Promotion #25665

Open
mohityadav766 wants to merge 10 commits into main from fix-rein

Conversation

@mohityadav766 (Member) commented Feb 2, 2026

Describe your changes:

Reapply changes for entity promotion

I worked on ... because ...

Type of change:

  • Bug fix
  • Improvement
  • New feature
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation

Checklist:

  • I have read the CONTRIBUTING document.
  • My PR title is Fixes <issue-number>: <short explanation>
  • I have commented on my code, particularly in hard-to-understand areas.
  • For JSON Schema changes: I updated the migration scripts or explained why it is not needed.

Summary by Gitar

  • New tracking mechanism:
    • EntityCompletionTracker class monitors partition completion per entity type using concurrent data structures (a rough sketch follows this list)
  • Per-entity index promotion:
    • Modified DistributedSearchIndexExecutor and DistributedSearchIndexCoordinator to promote each entity's index independently when all partitions complete
  • Enhanced vector operation handling:
    • Updated PartitionWorker to wait for vector embedding tasks with 120s timeout before reporting partition completion
  • Comprehensive test coverage:
    • Added EntityCompletionTrackerTest with 10 test cases covering callbacks, failures, concurrency, and edge cases
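
For orientation, below is a minimal sketch of the shape such a tracker might take. It is assembled from the class, method, and field names referenced elsewhere in this PR (initializeEntity, recordPartitionComplete, setOnEntityComplete, failedPartitions); the actual fields, signatures, and callback wiring in the implementation may differ.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Rough sketch only -- not the PR's code. Tracks how many partitions of each
// entity type have finished and fires a callback once the last one completes.
public class EntityCompletionTracker {
  private final Map<String, Integer> totalPartitions = new ConcurrentHashMap<>();
  private final Map<String, AtomicInteger> completedPartitions = new ConcurrentHashMap<>();
  private final Map<String, AtomicInteger> failedPartitions = new ConcurrentHashMap<>();
  private volatile BiConsumer<String, Boolean> onEntityComplete = (entityType, success) -> {};

  public void setOnEntityComplete(BiConsumer<String, Boolean> callback) {
    this.onEntityComplete = callback;
  }

  // Register the expected partition count before workers start reporting.
  public void initializeEntity(String entityType, int partitions) {
    totalPartitions.put(entityType, partitions);
    completedPartitions.put(entityType, new AtomicInteger());
    failedPartitions.put(entityType, new AtomicInteger());
  }

  // Called once per finished partition; the last one triggers the callback
  // so the coordinator can promote that entity's index independently.
  public void recordPartitionComplete(String entityType, boolean partitionFailed) {
    Integer total = totalPartitions.get(entityType);
    AtomicInteger completed = completedPartitions.get(entityType);
    if (total == null || completed == null) {
      return; // entity was never initialized
    }
    if (partitionFailed) {
      failedPartitions.computeIfAbsent(entityType, k -> new AtomicInteger()).incrementAndGet();
    }
    if (completed.incrementAndGet() == total) {
      AtomicInteger failed = failedPartitions.get(entityType);
      boolean success = failed == null || failed.get() == 0;
      onEntityComplete.accept(entityType, success);
    }
  }
}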



@mohityadav766 (Member, Author)

@gitar-bot which tests are failing in Playwright, and are they related?

@sonarqubecloud (bot) commented Feb 3, 2026

@gitar-bot (bot) commented Feb 5, 2026

Code Review ⚠️ Changes requested (1 resolved / 4 findings)

The per-entity promotion feature looks solid, but PartitionWorker has a duplicated await call and a conditional-flush bug, both apparent merge/refactor artifacts that need cleanup.

⚠️ Bug: Duplicate awaitSinkCompletion calls - redundant second wait

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java

The method waitForSinkOperations contains two consecutive calls to statsTracker.awaitSinkCompletion():

  1. Line 315: boolean statsComplete = statsTracker.awaitSinkCompletion(statsTimeout);
  2. Line 319: boolean completed = statsTracker.awaitSinkCompletion(30000);

The second await call (lines 316-325) is redundant:

  • statsComplete is checked at line 316 (if (!statsComplete))
  • Inside that block there is a second awaitSinkCompletion(30000) call at line 319
  • That block is only reached after the first call with statsTimeout (30s or 60s) has already timed out, so the extra 30s wait merely repeats the first one and delays partition completion

This looks like a merge/refactor artifact where old code wasn't properly removed. The intent seems to be: wait once with the dynamic timeout, then log a warning if it didn't complete.
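
For context, the structure being described appears to be roughly the following (reconstructed from the line references above and the snippet quoted in the next finding, not copied from the source):

boolean statsComplete = statsTracker.awaitSinkCompletion(statsTimeout);  // line 315
if (!statsComplete) {
    // line 319: second wait, reached only after the first has already timed out
    boolean completed = statsTracker.awaitSinkCompletion(30000);
    if (!completed) {
        LOG.warn(
            "Timed out waiting for sink stats completion, {} operations still pending for entity {}",
            statsTracker.getPendingSinkOps(),
            statsTracker.getEntityType());
    }
    statsTracker.flush();  // flush only happens on this timeout path (see the next finding)
}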

Suggested fix:

boolean statsComplete = statsTracker.awaitSinkCompletion(statsTimeout);
if (!statsComplete) {
    LOG.warn(
        "Timed out waiting for sink stats completion, {} operations still pending for entity {}",
        statsTracker.getPendingSinkOps(),
        statsTracker.getEntityType());
}
statsTracker.flush();
⚠️ Bug: statsTracker.flush() only called when stats wait times out

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/PartitionWorker.java

The statsTracker.flush() call at line 327 is currently inside the if (!statsComplete) block, meaning it will only be called when the stats completion wait times out.

If statsComplete is true (the wait succeeds), the flush is never called, which means stats from this partition may not be properly persisted.

Current structure:

if (!statsComplete) {
    boolean completed = statsTracker.awaitSinkCompletion(30000);
    if (!completed) {
        LOG.warn(...);
    }
    statsTracker.flush();  // Only called when !statsComplete
}

Suggested fix:
Move statsTracker.flush() outside the conditional block so it's always called:

if (!statsComplete) {
    LOG.warn(
        "Timed out waiting for sink stats completion, {} operations still pending for entity {}",
        statsTracker.getPendingSinkOps(),
        statsTracker.getEntityType());
}
statsTracker.flush();  // Always flush
💡 Quality: Promotion callback executes inline - may block partition completion

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/DistributedSearchIndexExecutor.java:247

In initializeEntityTracker, the entity completion callback directly invokes promoteEntityIndex:

entityTracker.setOnEntityComplete(
    (entityType, success) -> promoteEntityIndex(entityType, success));

The promoteEntityIndex method performs I/O operations (interacting with Elasticsearch/OpenSearch indices) and could be slow. Since this callback is invoked synchronously from recordPartitionComplete in the coordinator, it may block the thread that is processing partition completions.

Impact: If index promotion is slow, it could delay completion tracking for other partitions/entities being processed on the same thread.

Suggested fix:
Consider executing the promotion asynchronously:

entityTracker.setOnEntityComplete(
    (entityType, success) -> 
        CompletableFuture.runAsync(() -> promoteEntityIndex(entityType, success)));

However, this introduces complexity around error handling and tracking. The current synchronous approach is acceptable if promotion is typically fast, but worth monitoring in production.
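
If asynchronous promotion were pursued, the error-handling concern could be addressed with a dedicated executor and an explicit failure log. The executor name and wiring below are illustrative, not part of this PR:

// Illustrative sketch: run promotion off the partition-completion thread and
// surface failures in the log instead of losing them inside the future.
ExecutorService promotionExecutor =
    Executors.newSingleThreadExecutor(r -> new Thread(r, "entity-index-promotion"));

entityTracker.setOnEntityComplete(
    (entityType, success) ->
        CompletableFuture
            .runAsync(() -> promoteEntityIndex(entityType, success), promotionExecutor)
            .exceptionally(ex -> {
              LOG.error("Index promotion failed for entity {}", entityType, ex);
              return null;
            }));

The executor would need to be shut down when the reindexing job finishes, and a promotion failure still has to be reflected in the job's final status, which is the tracking complexity noted above.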

✅ 1 resolved
Edge Case: Potential NPE when accessing failedPartitions for uninitialized entity

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/distributed/EntityCompletionTracker.java:89
In recordPartitionComplete, while the code correctly checks that completed and total are not null for the entity, it accesses failedPartitions.get(entityType) without a null check before calling incrementAndGet():

if (partitionFailed) {
  failedPartitions.get(entityType).incrementAndGet();  // NPE if failedPartitions entry is null
}

Although initializeEntity always initializes all three maps together, if the maps get out of sync (e.g., due to a bug or future refactoring), this could cause a NullPointerException.

Suggested fix:
Add a defensive null check:

if (partitionFailed) {
  AtomicInteger failed = failedPartitions.get(entityType);
  if (failed != null) {
    failed.incrementAndGet();
  }
}
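
An alternative to the null check, assuming failedPartitions is a ConcurrentHashMap, is to create the counter lazily so this path can never observe a missing entry:

if (partitionFailed) {
  // Lazily create the counter; safe even if the maps ever get out of sync.
  failedPartitions.computeIfAbsent(entityType, k -> new AtomicInteger()).incrementAndGet();
}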


Labels

backend · safe to test (add this label to run secure GitHub workflows on PRs)
