feat(migrator): [3/7] Async migration with non-blocking planner, executor, validator, and readiness utilities#562
nkanu17 wants to merge 6 commits into feat/migrate-executor
🛡️ Jit Security Scan Results
✅ No security findings were detected in this PR
Security scan by Jit
Pull request overview
Adds an async migration surface (AsyncMigrationPlanner, AsyncMigrationExecutor, AsyncMigrationValidator) to enable non-blocking drop/recreate migrations, plus async readiness/index utilities and corresponding unit/integration tests.
Changes:
- Introduces async planner/executor/validator implementations mirroring the existing sync migration flow (including renames, vector re-encoding, and readiness polling).
- Adds async helper utilities for listing indexes, readiness polling, and snapshot comparison.
- Adds new async unit and integration tests for planning, execution, disk space estimation, and reliability helpers.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| redisvl/migration/async_planner.py | Async migration planning built on AsyncSearchIndex with sync planner delegation. |
| redisvl/migration/async_executor.py | Async migration apply flow incl. key/field renames, optional vector re-encoding, readiness wait, and validation. |
| redisvl/migration/async_validation.py | Async post-migration validation and query checks. |
| redisvl/migration/async_utils.py | Async index listing, readiness polling, and snapshot match helper. |
| redisvl/migration/__init__.py | Exposes new async migration APIs/utilities from the package. |
| tests/unit/test_async_migration_planner.py | Unit coverage for async planner parity with sync behavior. |
| tests/unit/test_async_migration_executor.py | Unit coverage for async executor + disk estimator + reliability helpers. |
| tests/integration/test_async_migration_v1.py | End-to-end integration coverage for async plan/apply/validate against Redis. |
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0087dcf33b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
@codex review
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
@codex review
- Fix unbound 'ready' variable in async_utils.py and async_executor.py
- Fix completed checkpoint: resume from post-drop state
- Pass rename_operations to get_vector_datatype_changes
- Fix has_prefix_change falsy check for empty string prefixes
- Fix partial key renames: fail fast on collision
- Warn when field rename overwrites existing destination field
- Fix async_validation prefix handling and indexing failure delta

- Fix _quantize_vectors docstring: 'documents quantized' not 'processed'
- Close internally-created Redis client in async_list_indexes
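The empty-string prefix fix listed above is a common truthiness pitfall: `""` is falsy, so a check written as `if new_prefix and ...` silently ignores a request to migrate to an empty prefix. The following is a minimal sketch of the distinction, not the PR's code. The signature of `has_prefix_change` and the convention that `None` means "no prefix specified" are assumptions made for illustration.

```python
from typing import Optional


def has_prefix_change(old_prefix: str, new_prefix: Optional[str]) -> bool:
    """Return True when a new prefix was requested and differs from the old one.

    Hypothetical helper: `None` means "not specified". An empty string is a
    valid prefix and must not be treated as "not specified".
    """
    # A truthiness check (`if new_prefix and new_prefix != old_prefix`)
    # would return False for "", which is the bug class described above.
    return new_prefix is not None and new_prefix != old_prefix
```

With this form, `has_prefix_change("doc:", "")` reports a change, while `has_prefix_change("doc:", None)` does not.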
50cff88 to 33f2a40
0087dcf to 8642ec7
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8642ec7435
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
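The remapping fix above matters because quantization runs after field renames, so a change recorded under the old field name would no longer match any field in the document. A minimal sketch of the idea follows. The function name `remap_to_renamed_fields` and the dictionary shapes (a field-name-to-change mapping and an old-name-to-new-name mapping) are assumptions for illustration; the PR's actual data structures are not shown in this conversation.

```python
from typing import Dict


def remap_to_renamed_fields(
    datatype_changes: Dict[str, Dict[str, str]],
    field_renames: Dict[str, str],
) -> Dict[str, Dict[str, str]]:
    """Re-key datatype changes by the post-rename field name.

    Fields without a rename keep their original key.
    """
    return {
        field_renames.get(name, name): change
        for name, change in datatype_changes.items()
    }


# Hypothetical example data: one vector field renamed and re-encoded.
changes = {"embedding": {"source": "float32", "target": "float16"}}
renames = {"embedding": "vector"}
remapped = remap_to_renamed_fields(changes, renames)
```

After remapping, the change is keyed by `vector`, the name the field carries by the time quantization runs.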
…xecutor, validator, and readiness utilities Async versions of planner, executor, and validator for non-blocking migration workflows. Async executor mirrors the sync drop/recreate flow with async key enumeration, prefix/field renames, vector re-encoding with checkpoint resume, and readiness polling. Includes async utilities for index listing, readiness polling, and source snapshot validation. Adds async unit and integration tests.
- Fix unbound 'ready' variable in async_utils.py and async_executor.py
- Fix completed checkpoint: resume from post-drop state
- Pass rename_operations to get_vector_datatype_changes
- Fix has_prefix_change falsy check for empty string prefixes
- Fix partial key renames: fail fast on collision
- Warn when field rename overwrites existing destination field
- Fix async_validation prefix handling and indexing failure delta

- Fix _quantize_vectors docstring: 'documents quantized' not 'processed'
- Close internally-created Redis client in async_list_indexes

- Pass existing snapshot to create_plan_from_patch to avoid double Redis round-trip
- Use _get_client() instead of _redis_client for lazy async client initialization
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
33f2a40 to 42aa7bb
8642ec7 to 7a1ef9a
- Pass existing snapshot to create_plan_from_patch to avoid double Redis round-trip
- Use _get_client() instead of _redis_client for lazy async client initialization
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
@codex review
Cursor Bugbot has reviewed your changes and found 2 potential issues.
    percent_indexed = latest_info.get("percent_indexed")

    if percent_indexed is not None or indexing is not None:
        ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
Async readiness check logic diverges from sync version
High Severity
The async readiness check uses float(percent_indexed or 0) >= 1.0 and not bool(indexing), which differs from the sync version's logic. When percent_indexed is None but indexing is present and falsy (e.g., 0), the sync version correctly falls through to ready = not is_indexing (returning True). The async version evaluates float(None or 0) >= 1.0 → False, so ready stays False, potentially causing a 30-minute timeout instead of detecting the index is ready.
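The readiness rule described in this finding can be written as a small pure function, which makes the divergent case easy to test in isolation. This is a sketch under stated assumptions, not the PR's implementation: the name `is_index_ready` is hypothetical, the `info` mapping stands in for the parsed index info, values are assumed to be numeric rather than strings, and treating "neither field present" as not ready is a choice made here because the PR's handling of that case is not shown.

```python
from typing import Any, Mapping


def is_index_ready(info: Mapping[str, Any]) -> bool:
    """Decide readiness from `percent_indexed` and `indexing`.

    - percent_indexed present: require >= 1.0 and indexing falsy.
    - percent_indexed absent, indexing present: ready when indexing is falsy.
    - neither present: not ready (assumption made for this sketch).
    """
    percent_indexed = info.get("percent_indexed")
    indexing = info.get("indexing")

    if percent_indexed is not None:
        return float(percent_indexed) >= 1.0 and not bool(indexing)
    if indexing is not None:
        # Fall back to the indexing flag alone instead of coercing the
        # missing percentage to 0.0, which could never satisfy >= 1.0.
        return not bool(indexing)
    return False
```

The case reported above corresponds to `is_index_ready({"indexing": 0})`: the one-line expression `float(percent_indexed or 0) >= 1.0 and not bool(indexing)` evaluates to False for that input, while the branch on `indexing` returns True.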
    warnings.append(
        "SVS-VAMANA requires Redis >= 8.2.0 and Redis Search >= 2.8.10. "
        "Verify your Redis instance supports this algorithm before applying."
    )
Async SVS check leaks Redis client connection
Medium Severity
When redis_url is provided, _check_svs_vamana_requirements creates a new Redis.from_url() client but never closes it. The sync counterpart in planner.py tracks the created client and calls created_client.close() in a finally block. The async version is missing both the created_client tracking and the finally cleanup with await client.aclose().
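The cleanup pattern this finding asks for (track the client the function created, close only that one in `finally`) can be sketched without a live Redis server. Everything below is illustrative: `FakeAsyncClient` is a stand-in for an async Redis client, and `read_server_version` and `client_factory` are hypothetical names, not part of the PR. In real code the factory would be `Redis.from_url` from `redis.asyncio`.

```python
import asyncio
from typing import Callable, Optional


class FakeAsyncClient:
    """Stand-in for an async Redis client; records whether it was closed."""

    def __init__(self, url: str = "") -> None:
        self.url = url
        self.closed = False

    async def info(self) -> dict:
        return {"redis_version": "8.2.0"}

    async def aclose(self) -> None:
        self.closed = True


async def read_server_version(
    redis_client: Optional[FakeAsyncClient] = None,
    redis_url: Optional[str] = None,
    client_factory: Callable[[str], FakeAsyncClient] = FakeAsyncClient,
) -> Optional[str]:
    """Read the server version, closing only a client created here."""
    created_client: Optional[FakeAsyncClient] = None
    client = redis_client
    try:
        if client is None and redis_url is not None:
            created_client = client_factory(redis_url)
            client = created_client
        if client is None:
            return None
        info = await client.info()
        return info.get("redis_version")
    finally:
        # A caller-supplied client stays open; the caller owns its lifecycle.
        if created_client is not None:
            await created_client.aclose()
```

Keeping `created_client` separate from `client` is what prevents the function from closing a connection it was merely handed.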
❌ Jit Scanner failed. Our team has been notified and is working to resolve the issue. Please contact support if you have any questions.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
    ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
    if progress_callback:
        total_docs = int(latest_info.get("num_docs", 0))
        pct = float(percent_indexed or 0)
async_wait_for_index_ready() treats a missing percent_indexed value as 0.0 even when the indexing flag is present, which can make the loop wait until timeout on Redis/Search versions that omit percent_indexed. Mirror the sync wait_for_index_ready() logic: if percent_indexed is None but indexing is present, consider the index ready when indexing is falsy, and compute progress accordingly.
Suggested change:

    - ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
    - if progress_callback:
    -     total_docs = int(latest_info.get("num_docs", 0))
    -     pct = float(percent_indexed or 0)
    + # Mirror sync wait_for_index_ready behavior:
    + # - If percent_indexed is provided, use it directly.
    + # - If percent_indexed is None but indexing is present, treat the
    + #   index as fully indexed (pct = 1.0) when indexing is falsy.
    + if percent_indexed is not None:
    +     pct = float(percent_indexed)
    + else:
    +     pct = 1.0 if not bool(indexing) else 0.0
    + ready = pct >= 1.0 and not bool(indexing)
    + if progress_callback:
    +     total_docs = int(latest_info.get("num_docs", 0))
    if percent_indexed is not None or indexing is not None:
        ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
        if progress_callback:
            total_docs = int(latest_info.get("num_docs", 0))
            pct = float(percent_indexed or 0)
            indexed_docs = int(total_docs * pct)
            progress_callback(indexed_docs, total_docs, pct * 100)
_async_wait_for_index_ready() can fail to ever report readiness when percent_indexed is missing but the indexing flag exists (it coerces missing percent to 0.0 and requires >= 1.0). Align with the sync readiness utility: if percent_indexed is absent, use the indexing flag alone to decide readiness (ready when indexing is falsy).
Suggested change:

    - if percent_indexed is not None or indexing is not None:
    -     ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
    -     if progress_callback:
    -         total_docs = int(latest_info.get("num_docs", 0))
    -         pct = float(percent_indexed or 0)
    -         indexed_docs = int(total_docs * pct)
    -         progress_callback(indexed_docs, total_docs, pct * 100)
    + if percent_indexed is not None:
    +     pct = float(percent_indexed or 0)
    +     ready = pct >= 1.0 and not bool(indexing)
    +     if progress_callback:
    +         total_docs = int(latest_info.get("num_docs", 0))
    +         indexed_docs = int(total_docs * pct)
    +         progress_callback(indexed_docs, total_docs, pct * 100)
    + elif indexing is not None:
    +     # When percent_indexed is absent but indexing is present,
    +     # fall back to the indexing flag alone (ready when falsy),
    +     # matching the synchronous readiness utility behavior.
    +     ready = not bool(indexing)
    +     if progress_callback:
    +         total_docs = int(latest_info.get("num_docs", 0))
    +         if ready:
    +             indexed_docs = total_docs
    +             pct_value = 100.0
    +         else:
    +             indexed_docs = 0
    +             pct_value = 0.0
    +         progress_callback(indexed_docs, total_docs, pct_value)
    source_still_exists = (
        await self._async_current_source_matches_snapshot(
            plan.source.index_name,
            plan.source.schema_snapshot,
            redis_url=redis_url,
            redis_client=redis_client,
        )
    )
When checkpoint.status == "completed", this calls _async_current_source_matches_snapshot() to decide whether the source index still exists. If the source index was dropped (a likely post-crash resume scenario), AsyncSearchIndex.from_existing() will raise and apply() will bubble the exception (this code runs before the main try/except), preventing resume. Catch the "index not found" path here and treat it as source_still_exists = False.
Suggested change:

    - source_still_exists = (
    -     await self._async_current_source_matches_snapshot(
    -         plan.source.index_name,
    -         plan.source.schema_snapshot,
    -         redis_url=redis_url,
    -         redis_client=redis_client,
    -     )
    - )
    + try:
    +     source_still_exists = await self._async_current_source_matches_snapshot(
    +         plan.source.index_name,
    +         plan.source.schema_snapshot,
    +         redis_url=redis_url,
    +         redis_client=redis_client,
    +     )
    + except ResponseError as exc:
    +     # If the source index no longer exists, treat this as
    +     # "source_still_exists = False" so we can resume from
    +     # the completed checkpoint in a post-drop crash scenario.
    +     if "Unknown Index name" in str(exc):
    +         logger.info(
    +             "Source index '%s' not found when checking completed "
    +             "checkpoint at %s; treating as dropped",
    +             plan.source.index_name,
    +             checkpoint_path,
    +         )
    +         source_still_exists = False
    +     else:
    +         raise
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7a1ef9a651
    current_index = await AsyncSearchIndex.from_existing(
        index_name,
        redis_url=redis_url,
        redis_client=redis_client,
    )
Return false when async source index is already dropped
This helper is used by apply() to decide whether a completed checkpoint should resume from a post-drop state, but AsyncSearchIndex.from_existing(...) raises when the source index is gone (which is exactly that resume scenario). Because this call happens before the main try/except in apply(), a valid completed-checkpoint resume can crash with an exception instead of continuing or returning a failed MigrationReport. Catch the missing-index error here and return False so resume logic can proceed.
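The behavior requested here (a missing source index yields False rather than an exception) can be illustrated with stand-ins. This is a sketch under stated assumptions: `IndexNotFoundError`, `load_schema`, and `source_matches_snapshot` are hypothetical names, and the in-memory `existing` dictionary replaces the real Redis lookup performed by `AsyncSearchIndex.from_existing(...)`.

```python
import asyncio
from typing import Any, Dict


class IndexNotFoundError(Exception):
    """Stand-in for the error raised when an index does not exist."""


async def load_schema(index_name: str, existing: Dict[str, Any]) -> Any:
    """Hypothetical loader standing in for the real index lookup."""
    if index_name not in existing:
        raise IndexNotFoundError(f"Unknown Index name: {index_name}")
    return existing[index_name]


async def source_matches_snapshot(
    index_name: str, snapshot: Any, existing: Dict[str, Any]
) -> bool:
    """Return True only if the index exists and matches the snapshot."""
    try:
        current = await load_schema(index_name, existing)
    except IndexNotFoundError:
        # A dropped source index is the expected state when resuming
        # after a crash that happened post-drop, so report "no match".
        return False
    return current == snapshot
```

Catching only the missing-index error keeps unrelated failures (for example, connection errors) visible to the caller.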
    from redis.asyncio import Redis

    client = Redis.from_url(redis_url)
Close temporary async Redis client after SVS check
When redis_url is provided (without redis_client), this function creates an async Redis client via Redis.from_url(...) but never closes it. Planning runs can be invoked repeatedly, so these unclosed clients accumulate sockets/connections over time and degrade process stability. The sync planner closes its temporary client in finally; the async path should similarly await aclose() for clients it creates.


Summary
Async versions of planner, executor, and validator for non-blocking migration workflows. Async executor mirrors the sync drop/recreate flow with async key enumeration, prefix/field renames, vector re-encoding with checkpoint resume, and readiness polling.
Includes async utilities for index listing, readiness polling, and source snapshot validation.
Files
redisvl/migration/async_executor.py, async_planner.py, async_validation.py, async_utils.py
Stack
Note
Medium Risk
Adds a new async migration execution path that performs destructive operations (drop/recreate, key/field renames, in-place vector re-encoding) and introduces checkpointed resume logic, which can impact data integrity if edge cases are missed. Changes are mostly additive and covered by new unit/integration tests, but they touch migration reliability concerns and Redis command sequencing.
Overview
Introduces async equivalents of the migration workflow: AsyncMigrationPlanner, AsyncMigrationExecutor, and AsyncMigrationValidator, plus async helpers for listing indexes, readiness polling, and source-snapshot validation.
The async executor mirrors the sync drop/recreate flow but uses non-blocking Redis operations, including FT.AGGREGATE-based key enumeration with SCAN fallback, optional BGSAVE safety snapshot, hash/JSON field renames, prefix key renames with collision fail-fast, vector re-encoding with idempotent skip + per-batch rollback, and optional checkpoint-based resume after crashes.
Exports these async APIs from redisvl.migration and adds substantial unit + integration test coverage for planning, apply/validate flow, enumeration, quantization checkpointing, and readiness utilities.
Written by Cursor Bugbot for commit 7a1ef9a. This will update automatically on new commits.
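The "prefix key renames with collision fail-fast" behavior mentioned in the overview can be sketched as a planning step that checks every destination key before any rename is issued. This is not the executor's code: `plan_prefix_renames` is a hypothetical name, and the in-memory `existing_keys` set stands in for whatever existence check the real executor performs against Redis.

```python
from typing import Dict, Iterable, Set


def plan_prefix_renames(
    keys: Iterable[str],
    old_prefix: str,
    new_prefix: str,
    existing_keys: Set[str],
) -> Dict[str, str]:
    """Map each key to its new name, raising before any rename on collision.

    Assumes old_prefix != new_prefix; keys outside old_prefix are skipped.
    """
    mapping: Dict[str, str] = {}
    for key in keys:
        if not key.startswith(old_prefix):
            continue
        new_key = new_prefix + key[len(old_prefix):]
        if new_key in existing_keys:
            # Fail before renaming anything so the keyspace is never left
            # in a partially renamed state.
            raise ValueError(f"rename collision: {key!r} -> {new_key!r}")
        mapping[key] = new_key
    return mapping
```

Building the full mapping first and renaming afterwards is one way to get fail-fast semantics; the trade-off is that the check and the rename are not atomic, so a concurrent writer could still create a colliding key in between.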