Fixes #4003: bulk + async restore for large entity hierarchies#27997
Fixes #4003: bulk + async restore for large entity hierarchies#27997
Conversation
EntityRepository.restoreEntity walked descendants synchronously, taking 4+ minutes on a 12k-table database and exceeding typical proxy timeouts. restoreChildren now groups CONTAINS children by type and dispatches one bulkRestoreSubtree per type, batching DB writes, version history, change events, and cache invalidation; the existing ES cascade handles descendant index updates in one update_by_query. Adds an async option (?async=true) on the deep-hierarchy restore endpoints that returns 202 Accepted with a job id and runs the restore on AsyncService, emitting WebSocket notifications on restoreEntityChannel. Java SDK adds .restore().async().execute() fluent builders on Tables/Databases plus restoreServerAsync on EntityServiceBase; Python SDK mirrors this with restore_request().with_async().execute() and restore_async() helpers on BaseEntity, exposing a new AsyncJobResponse type. Tests: EntityRepositoryRestoreTest verifies the per-type grouping and bulk dispatch path; RestoreFluentAPITest covers the Java SDK fluent behavior; RestoreHierarchyIT exercises sync and async restore against a real DB→schemas→tables tree end-to-end; test_restore_async.py covers the Python SDK paths. Fixes #4003 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
| return; | ||
| } | ||
| repository.restoreFromSearch(response.getEntity()); | ||
| addHref(uriInfo, response.getEntity()); |
There was a problem hiding this comment.
⚠️ Bug: uriInfo used in async lambda after request scope ends
In restoreEntityAsync, the request-scoped uriInfo object is captured by the lambda submitted to the executor (line 822: addHref(uriInfo, response.getEntity())). After the HTTP response is sent (202 Accepted), the JAX-RS container may invalidate or recycle the UriInfo instance, leading to IllegalStateException or incorrect URL generation when addHref calls uriInfo.getBaseUri().
The existing deleteByIdAsync method does NOT use uriInfo inside its async lambda, confirming this is an inconsistency introduced by this PR. The addHref call is non-essential for the async path since the restored entity is only sent via WebSocket notification (not as an HTTP response body), and the WebSocket message uses entity.getName() not HREFs.
Suggested fix:
Remove the `addHref` call from the async lambda. The WebSocket notification only uses `entity.getName()`, so HREFs are unnecessary:
try {
PutResponse<T> response = repository.restoreEntity(userName, id);
if (response == null) {
// ... existing error handling
return;
}
repository.restoreFromSearch(response.getEntity());
- addHref(uriInfo, response.getEntity());
LOG.info(...);
WebsocketNotificationHandler.sendRestoreOperationCompleteNotification(
jobId, securityContext, response.getEntity());
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
| public static class TableRestorer { | ||
| private final OpenMetadataClient client; | ||
| private final String id; | ||
|
|
||
| public TableRestorer(OpenMetadataClient client, String id) { | ||
| this.client = client; | ||
| this.id = id; | ||
| } | ||
|
|
||
| public AsyncTableRestorer async() { | ||
| return new AsyncTableRestorer(client, id); | ||
| } | ||
|
|
||
| public Table execute() { | ||
| try { |
There was a problem hiding this comment.
💡 Quality: Fluent restorer classes duplicate identical pattern across entities
The TableRestorer/AsyncTableRestorer and DatabaseRestorer/AsyncDatabaseRestorer classes are structurally identical — only the entity type and service accessor differ. This will grow linearly as more entity types gain the async restore feature (the PR already lists 8 resources). Consider extracting a generic EntityRestorer<T> to avoid duplication per the project's 'No Duplication' principle.
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
|
The Python checkstyle failed. Please run You can install the pre-commit hooks with |
There was a problem hiding this comment.
Pull request overview
This PR addresses restore timeouts for large entity hierarchies by replacing per-entity recursive restores with a bulk restore path, and adds an optional server-side async restore mode that returns 202 Accepted and notifies completion via WebSockets. It also updates the Java and Python SDKs to expose the async restore option and adds unit/integration tests covering the new behavior.
Changes:
- Implement bulk subtree restore (
bulkRestoreSubtree) and type-grouped child restore dispatch to reduce DB/ES work per restore. - Add
?async=truerestore option on deep-hierarchy endpoints, backed byAsyncServiceand a newrestoreEntityChannelWebSocket notification channel. - Update Java/Python SDKs with async restore helpers + fluent builders, and add unit/integration tests for sync/async restore flows.
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java | Adds grouped restoreChildren and new bulkRestoreSubtree bulk restore implementation plus extension hook. |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/EntityResource.java | Adds restoreEntity(..., async) overload and server-side async restore execution + 202 response. |
| openmetadata-service/src/main/java/org/openmetadata/service/util/WebsocketNotificationHandler.java | Adds restore completion/failure WebSocket notification helpers. |
| openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java | Introduces RESTORE_ENTITY_CHANNEL constant. |
| openmetadata-service/src/main/java/org/openmetadata/service/util/RestoreEntityResponse.java | New 202 response DTO for async restore requests. |
| openmetadata-service/src/main/java/org/openmetadata/service/util/RestoreEntityMessage.java | New WebSocket payload DTO for restore status notifications. |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/DatabaseResource.java | Wires ?async=true into database restore endpoint + OpenAPI 202 response. |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/DatabaseSchemaResource.java | Wires ?async=true into schema restore endpoint + OpenAPI 202 response. |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java | Wires ?async=true into table restore endpoint + OpenAPI 202 response. |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/StoredProcedureResource.java | Wires ?async=true into stored procedure restore endpoint + OpenAPI 202 response. |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/services/database/DatabaseServiceResource.java | Wires ?async=true into database service restore endpoint + OpenAPI 202 response. |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/dashboards/DashboardResource.java | Wires ?async=true into dashboard restore endpoint + OpenAPI 202 response. |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/datamodels/DashboardDataModelResource.java | Wires ?async=true into data model restore endpoint + OpenAPI 202 response. |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/storages/ContainerResource.java | Wires ?async=true into container restore endpoint + OpenAPI 202 response. |
| openmetadata-sdk/src/main/java/org/openmetadata/sdk/services/EntityServiceBase.java | Adds restoreServerAsync(...) calling PUT /restore?async=true. |
| openmetadata-sdk/src/main/java/org/openmetadata/sdk/models/AsyncJobResponse.java | New Java SDK model for 202 async job response. |
| openmetadata-sdk/src/main/java/org/openmetadata/sdk/fluent/Tables.java | Adds fluent restore builder with .async().execute() for server-side async restore. |
| openmetadata-sdk/src/main/java/org/openmetadata/sdk/fluent/Databases.java | Adds fluent restore builder with .async().execute() for server-side async restore. |
| ingestion/src/metadata/sdk/entities/base.py | Adds Python AsyncJobResponse + restore fluent operation + restore_async. |
| openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/EntityRepositoryRestoreTest.java | Unit tests for grouped restore children + bulk restore no-op behaviors. |
| openmetadata-sdk/src/test/java/org/openmetadata/sdk/fluent/RestoreFluentAPITest.java | Unit tests validating fluent restore routes to sync vs server-async SDK calls. |
| ingestion/tests/unit/sdk/test_restore_async.py | Python unit tests for restore_async and fluent restore request behavior. |
| openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/RestoreHierarchyIT.java | Integration test validating sync restore and async restore (202 + eventual restored state). |
Comments suppressed due to low confidence (1)
openmetadata-service/src/main/java/org/openmetadata/service/resources/EntityResource.java:790
repository.restoreEntity(...)can returnnullwhen the entity is not in the deleted state (see EntityRepository.restoreEntity catch). The sync restore path immediately dereferencesresponse.getEntity()which will throw NPE and return 500. Handle the null case explicitly (e.g., return 404/400 with a clear message) before callingrestoreFromSearch/addHref.
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextById(id));
PutResponse<T> response =
repository.restoreEntity(securityContext.getUserPrincipal().getName(), id);
repository.restoreFromSearch(response.getEntity());
addHref(uriInfo, response.getEntity());
LOG.info(
| public Response restoreEntityAsync(UriInfo uriInfo, SecurityContext securityContext, UUID id) { | ||
| OperationContext operationContext = | ||
| new OperationContext(entityType, MetadataOperation.EDIT_ALL); | ||
| authorizer.authorize(securityContext, operationContext, getResourceContextById(id)); | ||
| String jobId = UUID.randomUUID().toString(); | ||
| String userName = securityContext.getUserPrincipal().getName(); | ||
| ExecutorService executorService = AsyncService.getInstance().getExecutorService(); | ||
| executorService.submit( | ||
| RequestLatencyContext.wrapWithContext( | ||
| () -> { | ||
| try { | ||
| PutResponse<T> response = repository.restoreEntity(userName, id); | ||
| if (response == null) { | ||
| WebsocketNotificationHandler.sendRestoreOperationFailedNotification( | ||
| jobId, securityContext, id.toString(), "Entity is not in deleted state"); | ||
| return; | ||
| } | ||
| repository.restoreFromSearch(response.getEntity()); | ||
| addHref(uriInfo, response.getEntity()); | ||
| LOG.info( | ||
| "[AsyncRestore] Restored {}:{} (jobId={})", | ||
| Entity.getEntityTypeFromObject(response.getEntity()), | ||
| response.getEntity().getId(), | ||
| jobId); | ||
| WebsocketNotificationHandler.sendRestoreOperationCompleteNotification( | ||
| jobId, securityContext, response.getEntity()); | ||
| } catch (Exception e) { | ||
| LOG.error("[AsyncRestore] Failed to restore {}:{}", entityType, id, e); | ||
| WebsocketNotificationHandler.sendRestoreOperationFailedNotification( | ||
| jobId, | ||
| securityContext, | ||
| id.toString(), | ||
| e.getMessage() == null ? e.toString() : e.getMessage()); | ||
| } | ||
| })); | ||
| RestoreEntityResponse response = | ||
| new RestoreEntityResponse(jobId, "Restore initiated successfully."); | ||
| return Response.accepted().entity(response).type(MediaType.APPLICATION_JSON).build(); | ||
| } |
| public Table execute() { | ||
| try { | ||
| return client.tables().restore(id); | ||
| } catch (org.openmetadata.sdk.exceptions.OpenMetadataException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public static class AsyncTableRestorer { | ||
| private final OpenMetadataClient client; | ||
| private final String id; | ||
|
|
||
| public AsyncTableRestorer(OpenMetadataClient client, String id) { | ||
| this.client = client; | ||
| this.id = id; | ||
| } | ||
|
|
||
| public org.openmetadata.sdk.models.AsyncJobResponse execute() { | ||
| try { | ||
| return client.tables().restoreServerAsync(id); | ||
| } catch (org.openmetadata.sdk.exceptions.OpenMetadataException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } |
| public Database execute() { | ||
| try { | ||
| return client.databases().restore(id); | ||
| } catch (org.openmetadata.sdk.exceptions.OpenMetadataException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public static class AsyncDatabaseRestorer { | ||
| private final OpenMetadataClient client; | ||
| private final String id; | ||
|
|
||
| public AsyncDatabaseRestorer(OpenMetadataClient client, String id) { | ||
| this.client = client; | ||
| this.id = id; | ||
| } | ||
|
|
||
| public org.openmetadata.sdk.models.AsyncJobResponse execute() { | ||
| try { | ||
| return client.databases().restoreServerAsync(id); | ||
| } catch (org.openmetadata.sdk.exceptions.OpenMetadataException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } |
| return cls( | ||
| job_id=str(payload.get("jobId", "")), |
| * empty inputs and invokes the {@code restoreAdditionalChildren} extension hook once per | ||
| * restored entity. |
…cade Two follow-up improvements to the bulk restore introduced for #4003. Batched findTo per tree level: bulkRestoreSubtree previously issued one findTo per parent during recursion, which at the schemas → tables level of a 12k-table database meant 12k DB round trips just to enumerate children. The new bulkRestoreContainedChildren helper does one findToBatchAllTypes per tree level regardless of fan-out, then groups the results by child type and dispatches to each repo's bulkRestoreSubtree. DashboardRepository's chart-restore logic moves from the now-bypassed restoreChildren override to the existing restoreAdditionalChildren extension hook so it still runs both for direct dashboard restores and when dashboards are descendants of a larger restore. Symmetric bulk soft-delete cascade: deleteByName/deleteById had the same per-entity recursion that this PR fixed for restore — soft- deleting a database with 12k tables ran 12k recursive Entity.deleteEntity calls, each writing one row + one ES update + one change event. New bulkSoftDeleteSubtree mirrors bulkRestoreSubtree: one batched findToBatchAllTypes per level, deferred-store DB writes, batched version history, batched change events, batched cache invalidation; per-descendant ES writes are skipped because the existing deleteFromSearch cascade flips the deleted flag on descendant indexes in one update_by_query. deleteChildren(List, hardDelete=false, ...) now dispatches to the bulk path; hard-delete keeps the existing batchDeleteChildren path. New softDeleteAdditionalChildren extension hook mirrors restoreAdditionalChildren; DashboardRepository's chart soft-delete migrates onto it for the same reason. Tests: extends EntityRepositoryRestoreTest with cases that verify findToBatchAllTypes is invoked exactly once per level (not once per parent) for both bulk operations, plus the existing grouping/dispatch shape for the soft-delete entry point. Extends RestoreHierarchyIT with a recursive soft-delete cascade assertion. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
|
The Java checkstyle failed. Please run You can install the pre-commit hooks with |
| private void bulkRestoreContainedChildren(List<T> parents, String updatedBy) { | ||
| List<String> parentIds = new ArrayList<>(parents.size()); | ||
| for (T parent : parents) { | ||
| parentIds.add(parent.getId().toString()); | ||
| } | ||
| List<CollectionDAO.EntityRelationshipObject> relationships; | ||
| try (var ignored = phase("bulkRestoreFindChildren")) { | ||
| relationships = | ||
| daoCollection | ||
| .relationshipDAO() | ||
| .findToBatchAllTypes(parentIds, Relationship.CONTAINS.ordinal(), ALL); | ||
| } | ||
| if (relationships.isEmpty()) { | ||
| return; | ||
| } |
There was a problem hiding this comment.
💡 Quality: bulkRestore/SoftDeleteContainedChildren are near-identical duplicates
The two private methods bulkRestoreContainedChildren (lines 5677-5705) and bulkSoftDeleteContainedChildren (lines 5825-5853) share identical logic — collect parent IDs, call findToBatchAllTypes, filter by entityType, group by child type, dispatch to the child repo. The only difference is the terminal call (bulkRestoreSubtree vs bulkSoftDeleteSubtree). Per the 'No Duplication' principle, extract a shared helper that accepts the dispatch function.
Suggested fix:
private void dispatchToContainedChildren(
List<T> parents, String updatedBy, String phaseName,
BiConsumer<EntityRepository<?>, List<UUID>> dispatcher) {
List<String> parentIds = parents.stream()
.map(p -> p.getId().toString()).toList();
List<CollectionDAO.EntityRelationshipObject> rels;
try (var ignored = phase(phaseName)) {
rels = daoCollection.relationshipDAO()
.findToBatchAllTypes(parentIds, Relationship.CONTAINS.ordinal(), ALL);
}
if (rels.isEmpty()) return;
Map<String, List<UUID>> idsByType = new HashMap<>();
for (var rel : rels) {
if (!entityType.equals(rel.getFromEntity())) continue;
idsByType.computeIfAbsent(rel.getToEntity(), k -> new ArrayList<>())
.add(UUID.fromString(rel.getToId()));
}
idsByType.forEach((type, ids) -> dispatcher.accept(
Entity.getEntityRepository(type), ids));
}
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
| @Transaction | ||
| public final void bulkSoftDeleteSubtree(List<UUID> ids, String updatedBy) { | ||
| if (ids == null || ids.isEmpty()) { | ||
| return; | ||
| } | ||
| if (!supportsSoftDelete) { | ||
| for (UUID id : ids) { | ||
| Entity.deleteEntity(updatedBy, entityType, id, true, true); | ||
| } | ||
| return; | ||
| } | ||
| List<T> entities; | ||
| try (var ignored = phase("bulkSoftDeleteLoad")) { | ||
| entities = find(ids, NON_DELETED); | ||
| } |
There was a problem hiding this comment.
💡 Quality: bulkSoftDeleteSubtree exceeds 15-line method limit (83 lines)
bulkSoftDeleteSubtree spans lines 5736-5818 (83 lines), well above the project's 15-line method guideline. The method has clear phases (guard/load, pre-delete hooks, child cascade, updater creation, version history, persistence, cache invalidation, change events) that can each be extracted into focused private methods, matching the phase-based structure already hinted at by the try (var ignored = phase(...)) calls.
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
Code Review
|
| Compact |
|
Was this helpful? React with 👍 / 👎 | Gitar
|
The Python checkstyle failed. Please run You can install the pre-commit hooks with |
|
🔴 Playwright Results — 1 failure(s), 12 flaky✅ 4015 passed · ❌ 1 failed · 🟡 12 flaky · ⏭️ 86 skipped
Genuine Failures (failed on all attempts)❌
|



Describe your changes:
Fixes #4003
EntityRepository.restoreEntitywalked the descendant tree synchronously inside one HTTP request — on a database with 12k tables / 1.1k stored procedures the customer reported 4m37s response times that exceeded typical proxy / ALB idle timeouts. This PR replaces the per-entity recursion with an iterative bulk path and adds a server-side async option for hierarchies large enough that the synchronous response would still hit timeouts.Type of change:
High-level design:
Bulk synchronous path.
restoreChildrennow groups CONTAINS children by entity type and dispatches a singlebulkRestoreSubtree(ids, updatedBy)per type rather than loopingEntity.restoreEntityper descendant.bulkRestoreSubtreereuses the existing deferred-store bulk infrastructure (updateMany,entityExtensionDAO.insertMany,invalidateMany,insertChangeEventsBatch) for one batched DB write per type. Per-descendant ES writes are skipped becauserestoreFromSearchalready cascades the deleted-flag flip across child indexes in one ESupdate_by_query. Subclass extension hookrestoreAdditionalChildren(UUID, String)is provided for repos that link non-CONTAINS related entities.Async option. New
EntityResource.restoreEntityAsync(...)returns 202 Accepted with aRestoreEntityResponse(jobId, message)and runs the restore onAsyncService(virtual-thread executor with semaphore-bounded concurrency); completion / failure is broadcast on the newRESTORE_ENTITY_CHANNELWebSocket channel. TherestoreEntity(uri, sec, id, async)overload is wired into the deep-hierarchy endpoints (Database,DatabaseSchema,Table,StoredProcedure,Dashboard,DashboardDataModel,Container,DatabaseService) via?async=true.SDK updates. Java SDK adds an
AsyncJobResponsemodel andEntityServiceBase.restoreServerAsync(id); new fluent builders onTablesandDatabasesgiveTables.find(id).restore().execute()(sync, returnsTable) andTables.find(id).restore().async().execute()(returnsAsyncJobResponse) with type-safe terminal operations. Python SDK mirrors this withTables.restore_async(id)and the fluentTables.restore_request(id).with_async().execute()plus anAsyncJobResponsedataclass.Tests:
Use cases covered
?async=true, observe 202 + jobId, then poll until the database flips todeleted=false.restoreChildrencorrectly groups mixed-type children (DatabaseSchema + StoredProcedure) and dispatches onebulkRestoreSubtreeper type with no per-entityEntity.restoreEntitycalls.bulkRestoreSubtreeis a no-op for null / empty IDs and when no deleted entities are found.Tables.find(id).restore().async().execute()andDatabases.find(id).restore().async().execute()route torestoreServerAsync; without.async()they go to the sync path.Tables.restore_async(id)appends?async=true, returns anAsyncJobResponse; the fluentrestore_request(id).with_async().execute()does the same.Unit tests
openmetadata-service/src/test/java/.../jdbi3/EntityRepositoryRestoreTest.javaopenmetadata-sdk/src/test/java/.../fluent/RestoreFluentAPITest.javaingestion/tests/unit/sdk/test_restore_async.pyBackend integration tests
openmetadata-integration-tests/for new/changed API endpoints.openmetadata-integration-tests/src/test/java/.../tests/RestoreHierarchyIT.javaIngestion integration tests
Playwright (UI) tests
Manual testing performed
mvn -pl openmetadata-service,openmetadata-sdk -DskipTests compile— clean build.mvn -pl openmetadata-integration-tests test-compile— clean build.mvn -pl openmetadata-service test -Dtest=EntityRepositoryRestoreTest— 4/4 pass.mvn -pl openmetadata-sdk test -Dtest=RestoreFluentAPITest— 4/4 pass.UI screen recording / screenshots:
Not applicable.
Checklist:
Fixes <issue-number>: <short explanation>Fixes #<issue-number>above.RestoreHierarchyITbuilds the multi-level hierarchy and exercises both the bulk-sync and async paths).🤖 Generated with Claude Code