Skip to content

feat(migrator): [3/7] Async migration with non-blocking planner, executor, validator, and readiness utilities#562

Open
nkanu17 wants to merge 6 commits intofeat/migrate-executorfrom
feat/migrate-async
Open

feat(migrator): [3/7] Async migration with non-blocking planner, executor, validator, and readiness utilities#562
nkanu17 wants to merge 6 commits intofeat/migrate-executorfrom
feat/migrate-async

Conversation

@nkanu17
Copy link
Copy Markdown
Collaborator

@nkanu17 nkanu17 commented Apr 1, 2026

Summary

Async versions of planner, executor, and validator for non-blocking migration workflows. Async executor mirrors the sync drop/recreate flow with async key enumeration, prefix/field renames, vector re-encoding with checkpoint resume, and readiness polling.

Includes async utilities for index listing, readiness polling, and source snapshot validation.

Files

  • redisvl/migration/async_executor.py, async_planner.py, async_validation.py, async_utils.py
  • Async unit and integration tests

Stack

  1. [1/7] Migration foundation > feat(migrator): [1/7] Migration foundation with models, schema-aware planner, validation, and shared utilities #560
  2. [2/7] Sync executor with reliability and quantization > feat(migrator): [2/7] Sync executor with reliability checkpointing, crash-safe resume, and quantization support #561
  3. [3/7] Async migration (this PR)
  4. [4/7] Batch migration
  5. [5/7] Interactive wizard
  6. [6/7] CLI and documentation
  7. [7/7] Benchmarks

Note

Medium Risk
Adds a new async migration execution path that performs destructive operations (drop/recreate, key/field renames, in-place vector re-encoding) and introduces checkpointed resume logic, which can impact data integrity if edge cases are missed. Changes are mostly additive and covered by new unit/integration tests, but they touch migration reliability concerns and Redis command sequencing.

Overview
Introduces async equivalents of the migration workflow: AsyncMigrationPlanner, AsyncMigrationExecutor, and AsyncMigrationValidator, plus async helpers for listing indexes, readiness polling, and source-snapshot validation.

The async executor mirrors the sync drop/recreate flow but uses non-blocking Redis operations, including FT.AGGREGATE-based key enumeration with SCAN fallback, optional BGSAVE safety snapshot, hash/JSON field renames, prefix key renames with collision fail-fast, vector re-encoding with idempotent skip + per-batch rollback, and optional checkpoint-based resume after crashes.

Exports these async APIs from redisvl.migration and adds substantial unit + integration test coverage for planning, apply/validate flow, enumeration, quantization checkpointing, and readiness utilities.

Written by Cursor Bugbot for commit 7a1ef9a. This will update automatically on new commits. Configure here.

@jit-ci
Copy link
Copy Markdown

jit-ci bot commented Apr 1, 2026

🛡️ Jit Security Scan Results

CRITICAL HIGH MEDIUM

✅ No security findings were detected in this PR


Security scan by Jit

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an async migration surface (AsyncMigrationPlanner, AsyncMigrationExecutor, AsyncMigrationValidator) to enable non-blocking drop/recreate migrations, plus async readiness/index utilities and corresponding unit/integration tests.

Changes:

  • Introduces async planner/executor/validator implementations mirroring the existing sync migration flow (including renames, vector re-encoding, and readiness polling).
  • Adds async helper utilities for listing indexes, readiness polling, and snapshot comparison.
  • Adds new async unit and integration tests for planning, execution, disk space estimation, and reliability helpers.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
redisvl/migration/async_planner.py Async migration planning built on AsyncSearchIndex with sync planner delegation.
redisvl/migration/async_executor.py Async migration apply flow incl. key/field renames, optional vector re-encoding, readiness wait, and validation.
redisvl/migration/async_validation.py Async post-migration validation and query checks.
redisvl/migration/async_utils.py Async index listing, readiness polling, and snapshot match helper.
redisvl/migration/__init__.py Exposes new async migration APIs/utilities from the package.
tests/unit/test_async_migration_planner.py Unit coverage for async planner parity with sync behavior.
tests/unit/test_async_migration_executor.py Unit coverage for async executor + disk estimator + reliability helpers.
tests/integration/test_async_migration_v1.py End-to-end integration coverage for async plan/apply/validate against Redis.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0087dcf33b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

@nkanu17
Copy link
Copy Markdown
Collaborator Author

nkanu17 commented Apr 1, 2026

@codex review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0087dcf33b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@nkanu17
Copy link
Copy Markdown
Collaborator Author

nkanu17 commented Apr 1, 2026

@codex review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0087dcf33b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

nkanu17 added a commit that referenced this pull request Apr 2, 2026
- Fix unbound 'ready' variable in async_utils.py and async_executor.py
- Fix completed checkpoint: resume from post-drop state
- Pass rename_operations to get_vector_datatype_changes
- Fix has_prefix_change falsy check for empty string prefixes
- Fix partial key renames: fail fast on collision
- Warn when field rename overwrites existing destination field
- Fix async_validation prefix handling and indexing failure delta
nkanu17 added a commit that referenced this pull request Apr 2, 2026
- Fix _quantize_vectors docstring: 'documents quantized' not 'processed'
- Close internally-created Redis client in async_list_indexes
@nkanu17 nkanu17 force-pushed the feat/migrate-executor branch from 50cff88 to 33f2a40 Compare April 2, 2026 00:30
@nkanu17 nkanu17 force-pushed the feat/migrate-async branch from 0087dcf to 8642ec7 Compare April 2, 2026 00:30
@nkanu17
Copy link
Copy Markdown
Collaborator Author

nkanu17 commented Apr 2, 2026

@codex review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 8642ec7435

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

nkanu17 added 6 commits April 1, 2026 23:55
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
…xecutor, validator, and readiness utilities

Async versions of planner, executor, and validator for non-blocking
migration workflows. Async executor mirrors the sync drop/recreate
flow with async key enumeration, prefix/field renames, vector
re-encoding with checkpoint resume, and readiness polling.

Includes async utilities for index listing, readiness polling, and
source snapshot validation. Adds async unit and integration tests.
- Fix unbound 'ready' variable in async_utils.py and async_executor.py
- Fix completed checkpoint: resume from post-drop state
- Pass rename_operations to get_vector_datatype_changes
- Fix has_prefix_change falsy check for empty string prefixes
- Fix partial key renames: fail fast on collision
- Warn when field rename overwrites existing destination field
- Fix async_validation prefix handling and indexing failure delta
- Fix _quantize_vectors docstring: 'documents quantized' not 'processed'
- Close internally-created Redis client in async_list_indexes
- Pass existing snapshot to create_plan_from_patch to avoid double Redis round-trip
- Use _get_client() instead of _redis_client for lazy async client initialization
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
@nkanu17 nkanu17 force-pushed the feat/migrate-executor branch from 33f2a40 to 42aa7bb Compare April 2, 2026 03:58
Copilot AI review requested due to automatic review settings April 2, 2026 03:58
@nkanu17 nkanu17 force-pushed the feat/migrate-async branch from 8642ec7 to 7a1ef9a Compare April 2, 2026 03:58
nkanu17 added a commit that referenced this pull request Apr 2, 2026
- Pass existing snapshot to create_plan_from_patch to avoid double Redis round-trip
- Use _get_client() instead of _redis_client for lazy async client initialization
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
@nkanu17
Copy link
Copy Markdown
Collaborator Author

nkanu17 commented Apr 2, 2026

@codex review

Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Fix All in Cursor

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.

percent_indexed = latest_info.get("percent_indexed")

if percent_indexed is not None or indexing is not None:
ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Async readiness check logic diverges from sync version

High Severity

The async readiness check uses float(percent_indexed or 0) >= 1.0 and not bool(indexing), which differs from the sync version's logic. When percent_indexed is None but indexing is present and falsy (e.g., 0), the sync version correctly falls through to ready = not is_indexing (returning True). The async version evaluates float(None or 0) >= 1.0False, so ready stays False, potentially causing a 30-minute timeout instead of detecting the index is ready.

Additional Locations (1)
Fix in Cursor Fix in Web

warnings.append(
"SVS-VAMANA requires Redis >= 8.2.0 and Redis Search >= 2.8.10. "
"Verify your Redis instance supports this algorithm before applying."
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Async SVS check leaks Redis client connection

Medium Severity

When redis_url is provided, _check_svs_vamana_requirements creates a new Redis.from_url() client but never closes it. The sync counterpart in planner.py tracks the created client and calls created_client.close() in a finally block. The async version is missing both the created_client tracking and the finally cleanup with await client.aclose().

Fix in Cursor Fix in Web

@jit-ci
Copy link
Copy Markdown

jit-ci bot commented Apr 2, 2026

❌ Jit Scanner failed - Our team is investigating

Jit Scanner failed - Our team has been notified and is working to resolve the issue. Please contact support if you have any questions.


💡 Need to bypass this check? Comment @sera bypass to override.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +63 to +66
ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
if progress_callback:
total_docs = int(latest_info.get("num_docs", 0))
pct = float(percent_indexed or 0)
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async_wait_for_index_ready() treats a missing percent_indexed value as 0.0 even when the indexing flag is present, which can make the loop wait until timeout on Redis/Search versions that omit percent_indexed. Mirror the sync wait_for_index_ready() logic: if percent_indexed is None but indexing is present, consider the index ready when indexing is falsy, and compute progress accordingly.

Suggested change
ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
if progress_callback:
total_docs = int(latest_info.get("num_docs", 0))
pct = float(percent_indexed or 0)
# Mirror sync wait_for_index_ready behavior:
# - If percent_indexed is provided, use it directly.
# - If percent_indexed is None but indexing is present, treat the
# index as fully indexed (pct = 1.0) when indexing is falsy.
if percent_indexed is not None:
pct = float(percent_indexed)
else:
pct = 1.0 if not bool(indexing) else 0.0
ready = pct >= 1.0 and not bool(indexing)
if progress_callback:
total_docs = int(latest_info.get("num_docs", 0))

Copilot uses AI. Check for mistakes.
Comment on lines +1071 to +1077
if percent_indexed is not None or indexing is not None:
ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
if progress_callback:
total_docs = int(latest_info.get("num_docs", 0))
pct = float(percent_indexed or 0)
indexed_docs = int(total_docs * pct)
progress_callback(indexed_docs, total_docs, pct * 100)
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_async_wait_for_index_ready() can fail to ever report readiness when percent_indexed is missing but the indexing flag exists (it coerces missing percent to 0.0 and requires >= 1.0). Align with the sync readiness utility: if percent_indexed is absent, use the indexing flag alone to decide readiness (ready when indexing is falsy).

Suggested change
if percent_indexed is not None or indexing is not None:
ready = float(percent_indexed or 0) >= 1.0 and not bool(indexing)
if progress_callback:
total_docs = int(latest_info.get("num_docs", 0))
pct = float(percent_indexed or 0)
indexed_docs = int(total_docs * pct)
progress_callback(indexed_docs, total_docs, pct * 100)
if percent_indexed is not None:
pct = float(percent_indexed or 0)
ready = pct >= 1.0 and not bool(indexing)
if progress_callback:
total_docs = int(latest_info.get("num_docs", 0))
indexed_docs = int(total_docs * pct)
progress_callback(indexed_docs, total_docs, pct * 100)
elif indexing is not None:
# When percent_indexed is absent but indexing is present,
# fall back to the indexing flag alone (ready when falsy),
# matching the synchronous readiness utility behavior.
ready = not bool(indexing)
if progress_callback:
total_docs = int(latest_info.get("num_docs", 0))
if ready:
indexed_docs = total_docs
pct_value = 100.0
else:
indexed_docs = 0
pct_value = 0.0
progress_callback(indexed_docs, total_docs, pct_value)

Copilot uses AI. Check for mistakes.
Comment on lines +447 to +454
source_still_exists = (
await self._async_current_source_matches_snapshot(
plan.source.index_name,
plan.source.schema_snapshot,
redis_url=redis_url,
redis_client=redis_client,
)
)
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When checkpoint.status == "completed", this calls _async_current_source_matches_snapshot() to decide whether the source index still exists. If the source index was dropped (a likely post-crash resume scenario), AsyncSearchIndex.from_existing() will raise and apply() will bubble the exception (this code runs before the main try/except), preventing resume. Catch the "index not found" path here and treat it as source_still_exists = False.

Suggested change
source_still_exists = (
await self._async_current_source_matches_snapshot(
plan.source.index_name,
plan.source.schema_snapshot,
redis_url=redis_url,
redis_client=redis_client,
)
)
try:
source_still_exists = await self._async_current_source_matches_snapshot(
plan.source.index_name,
plan.source.schema_snapshot,
redis_url=redis_url,
redis_client=redis_client,
)
except ResponseError as exc:
# If the source index no longer exists, treat this as
# "source_still_exists = False" so we can resume from
# the completed checkpoint in a post-drop crash scenario.
if "Unknown Index name" in str(exc):
logger.info(
"Source index '%s' not found when checking completed "
"checkpoint at %s; treating as dropped",
plan.source.index_name,
checkpoint_path,
)
source_still_exists = False
else:
raise

Copilot uses AI. Check for mistakes.
@nkanu17
Copy link
Copy Markdown
Collaborator Author

nkanu17 commented Apr 2, 2026

@codex review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7a1ef9a651

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +1114 to +1118
current_index = await AsyncSearchIndex.from_existing(
index_name,
redis_url=redis_url,
redis_client=redis_client,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Return false when async source index is already dropped

This helper is used by apply() to decide whether a completed checkpoint should resume from a post-drop state, but AsyncSearchIndex.from_existing(...) raises when the source index is gone (which is exactly that resume scenario). Because this call happens before the main try/except in apply(), a valid completed-checkpoint resume can crash with an exception instead of continuing or returning a failed MigrationReport. Catch the missing-index error here and return False so resume logic can proceed.

Useful? React with 👍 / 👎.

Comment on lines +168 to +170
from redis.asyncio import Redis

client = Redis.from_url(redis_url)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Close temporary async Redis client after SVS check

When redis_url is provided (without redis_client), this function creates an async Redis client via Redis.from_url(...) but never closes it. Planning runs can be invoked repeatedly, so these unclosed clients accumulate sockets/connections over time and degrade process stability. The sync planner closes its temporary client in finally; the async path should similarly await aclose() for clients it creates.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants