BulkAPIs should use bulkWrite/bulkUpdate methods to reduce the no.of queries and db connections #25709
base: main
Conversation
Resolved review threads (outdated):
- ...tadata-service/src/main/java/org/openmetadata/service/config/BulkOperationConfiguration.java
- openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/BulkExecutor.java
Commit "…many db connections" updated from 40bdced to 5aec9c9
```java
        .withRequest(updater.getUpdated().getFullyQualifiedName())
        .withStatus(Status.OK.getStatusCode()));
  }
} catch (Exception batchError) {
```
⚠️ Bug: Fallback storeUpdate() may duplicate version history entries
When the batch DB write in bulkUpdateEntities partially succeeds — for example, the batch history inserts via insertMany succeed but the subsequent updateMany for entity rows fails — the catch block falls back to per-entity storeUpdate() for all updaters.
storeUpdate() calls storeEntityHistory() which inserts the same version extension record that was already inserted by the batch insertMany. This can produce duplicate version history entries for entities whose history was already recorded in the successful batch insert.
Additionally, storeUpdate() calls updateVersion() which re-computes the version from original.getVersion(). While the version computation is idempotent (same input → same output), the combination of already-written DB state + full storeUpdate() is fragile.
Suggested fix: Track which batch operations succeeded to avoid re-processing. For example:
- Wrap the batch history insert and entity update in a single DB transaction so they either both succeed or both fail.
- Or, split the fallback to skip history insertion if it was already done, e.g. by catching exceptions separately for each batch step and only falling back on the step that failed.
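The second suggested fix can be sketched as a small, self-contained Java example. This is a hypothetical illustration, not OpenMetadata code: `runWithStepFallback` and the step names are invented to show the idea that each batch step gets its own try/catch, so the per-entity fallback replays only the step that failed and never re-inserts history rows that the batch already wrote.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch: scope the per-entity fallback to the individual batch step that
// failed, instead of re-running the full storeUpdate() for every entity.
public class BatchStepFallback {

  @FunctionalInterface
  interface BatchStep {
    void run() throws Exception;
  }

  /** Try the batch operation; on failure, replay only this step per entity. */
  static <T> void runWithStepFallback(
      BatchStep batchOp, List<T> entities, Consumer<T> perEntityOp) {
    try {
      batchOp.run();
    } catch (Exception batchError) {
      for (T entity : entities) {
        perEntityOp.accept(entity); // fallback is scoped to the failed step
      }
    }
  }

  public static void main(String[] args) {
    List<String> updaters = List.of("table1", "table2");
    List<String> historyRows = new ArrayList<>();
    List<String> entityRows = new ArrayList<>();

    // Step 1: the batch history insert (insertMany) succeeds.
    runWithStepFallback(() -> historyRows.addAll(updaters), updaters, historyRows::add);

    // Step 2: the batch entity update (updateMany) fails; only this step
    // falls back per entity, so history rows are never inserted twice.
    runWithStepFallback(
        () -> { throw new RuntimeException("updateMany failed"); },
        updaters,
        entityRows::add);

    if (historyRows.size() != 2) throw new AssertionError("duplicate history: " + historyRows);
    if (entityRows.size() != 2) throw new AssertionError("fallback incomplete: " + entityRows);
    System.out.println("history=" + historyRows.size() + ", entities=" + entityRows.size());
  }
}
```

With step-scoped fallbacks, the duplicate-history failure mode described above cannot occur, because a successful `insertMany` is never retried.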
```java
        .withRequest(entity.getFullyQualifiedName())
        .withStatus(Status.BAD_REQUEST.getStatusCode())
        .withMessage(e.getMessage()));
createManyEntities(newEntities);
```
⚠️ Bug: Batch create path skips writeThroughCache for new entities
The batch create path calls createManyEntities(newEntities) at line 7877 which does NOT call writeThroughCache or writeThroughCacheMany. In contrast, the single-entity createNewEntity() (used in the fallback path at line 7894 via createOrUpdate) does call writeThroughCache(entity, false).
This means when batch creation succeeds (the happy path), newly created entities are never written to the Redis distributed cache. Subsequent reads may experience cache misses and hit the database, partially negating the performance benefit of batch operations. In a multi-node deployment, other nodes won't see the new entities in their cache until they're naturally loaded.
Suggested fix: Add a writeThroughCacheMany(newEntities, false) call after the successful createManyEntities call, analogous to how the import path handles it:
```java
createManyEntities(newEntities);
writeThroughCacheMany(newEntities, false); // Add this
```
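The effect of the missing call can be shown with a minimal in-memory sketch. This is illustrative only, not OpenMetadata code: the DAO and distributed cache are simulated with plain collections, and the method names mirror the ones discussed above purely for readability.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: after a successful batch create, write every new entity through
// to the cache so the batch path behaves like single-entity creation.
public class WriteThroughBatchCreate {
  static final List<String> dbRows = new ArrayList<>();
  static final Map<String, String> cache = new HashMap<>();

  // One batch insert instead of N single-row inserts.
  static void createManyEntities(List<String> entities) {
    dbRows.addAll(entities);
  }

  // Populate the cache so subsequent reads do not miss and hit the DB.
  static void writeThroughCacheMany(List<String> entities, boolean update) {
    for (String entity : entities) {
      cache.put(entity, entity);
    }
  }

  public static void main(String[] args) {
    List<String> newEntities = List.of("tableA", "tableB");
    createManyEntities(newEntities);
    writeThroughCacheMany(newEntities, false); // the call the batch path is missing
    if (cache.size() != dbRows.size()) throw new AssertionError("cache out of sync with DB");
    System.out.println("db=" + dbRows.size() + ", cached=" + cache.size());
  }
}
```

Without the write-through step, `cache` stays empty after the batch create, which is exactly the cache-miss behavior described in the comment.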
🔍 CI failure analysis for 206fbed: 1 test failure in TableResourceIT.test_tableLineage - Elasticsearch version conflict during lineage update. Likely unrelated to bulk operations changes.

Test Failure (Likely Unrelated to PR)

Issue: Single test failure in table lineage functionality.

Failed Test: TableResourceIT.test_tableLineage

Root Cause: Elasticsearch version conflict during search index update. The test attempts to update a database service in the search index but encounters a version conflict.

Error Details:

Relation to PR: ❓ LIKELY UNRELATED - This PR modifies bulk operations code (EntityRepository, EntityResource, BulkExecutor). The failure is in lineage functionality with Elasticsearch search index versioning. No lineage-related code was modified.

Analysis: This appears to be a race condition or timing issue in the search index update mechanism during lineage operations, not related to the bulk operations implementation. The PR does modify

Context: ✅ Major Progress: The 17+ compilation errors that blocked all CI jobs have been resolved. Tests are now actually running and only 1 test is failing (out of 162 tests in this suite).

Code Review
This pull request introduces significant improvements to how bulk operations are managed, focusing on reducing database queries and connections through batch processing. The implementation uses batch database operations with proper authorization checks and graceful fallback mechanisms to handle bulk imports/exports efficiently.
Bulk Operation Improvements: A deferred store mechanism (`updateWithDeferredStore`) that enables batch database writes while maintaining proper lifecycle event handling.

Bulk Write/Update Methods: Added `bulkWrite()` and `bulkUpdate()` methods to EntityDAO for efficient batch database operations.

Configuration and Setup:

Testing and Code Quality:
These changes together provide more robust, predictable, and scalable handling of bulk operations by reducing database query overhead and connection usage through efficient batch processing.
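The core performance claim, fewer queries and connections through batching, can be illustrated with a small self-contained sketch. The "connection" below is simulated with a counter; the class and method names are hypothetical and do not reflect the actual EntityDAO implementation, only the round-trip arithmetic behind it.

```java
import java.util.List;

// Sketch: a naive loop issues one DB round trip per entity, while a
// JDBC-style batch sends the whole set in a single round trip.
public class BatchRoundTrips {
  static class FakeConnection {
    int roundTrips = 0;

    void executeSingle(String row) { roundTrips++; }       // one trip per row
    void executeBatch(List<String> rows) { roundTrips++; } // one trip for all rows
  }

  public static void main(String[] args) {
    List<String> entities = List.of("e1", "e2", "e3", "e4", "e5");

    // Per-entity writes: N entities cost N round trips.
    FakeConnection perRow = new FakeConnection();
    for (String e : entities) {
      perRow.executeSingle(e);
    }

    // Batched write: N entities cost 1 round trip.
    FakeConnection batched = new FakeConnection();
    batched.executeBatch(entities);

    if (perRow.roundTrips != 5 || batched.roundTrips != 1) {
      throw new AssertionError("unexpected round-trip counts");
    }
    System.out.println("per-row=" + perRow.roundTrips + ", batched=" + batched.roundTrips);
  }
}
```

The same arithmetic is why `bulkWrite()`/`bulkUpdate()` style batching also reduces connection-pool pressure: each round trip holds a connection for its duration, so collapsing N trips into one frees the pool sooner.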