
feat(migrator): [2/7] Sync executor with reliability checkpointing, crash-safe resume, and quantization support #561

Open

nkanu17 wants to merge 5 commits into feat/migrate-core from feat/migrate-executor

Conversation

@nkanu17 (Collaborator) commented Apr 1, 2026

Summary

Migration executor that applies plans via drop/recreate with prefix rename, field operations, and vector datatype re-encoding. Reliability module provides crash-safe checkpointing, idempotent dtype detection, and resumable quantization for large indexes. Includes BGSAVE safety snapshots before mutations.

Files

  • redisvl/migration/executor.py - sync migration executor
  • redisvl/migration/reliability.py - checkpointing, idempotent dtype detection, resume logic
  • Integration tests for migration routes, comprehensive field changes, and JSON storage

Stack

  1. [1/7] Migration foundation with models, schema-aware planner, validation, and shared utilities (#560)
  2. [2/7] Sync executor with reliability and quantization (this PR)
  3. [3/7] Async migration
  4. [4/7] Batch migration
  5. [5/7] Interactive wizard
  6. [6/7] CLI and documentation
  7. [7/7] Benchmarks

Note

High Risk
High risk because it introduces a new migration execution path that mutates live Redis data (dropping/recreating indexes, renaming keys/fields, and rewriting vector payloads) with checkpoint/resume logic that must be correct to avoid data loss or partial migrations.

Overview
Adds a new synchronous MigrationExecutor that can apply drop_recreate migration plans end-to-end: enumerating documents (via FT.AGGREGATE cursor with SCAN fallback), taking an optional BGSAVE safety snapshot, performing hash/JSON field renames, dropping the index definition, optionally renaming key prefixes with collision detection, recreating the index, waiting for reindexing, and running post-migration validation/benchmark reporting.

Introduces migration/reliability.py to make vector datatype migrations more robust, including idempotent “already quantized” detection, per-batch rollback buffering, and YAML checkpointing to resume long-running quantization after crashes (with explicit guardrails for same-width dtype conversions). Adds warnings/logging around SCAN pattern fallbacks, exports MigrationExecutor from migration/__init__.py, and adds large integration suites covering comprehensive migration operations and route-level scenarios.
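The apply sequence described in the overview can be sketched as follows; the method names here are hypothetical and only illustrate the ordering, not the actual MigrationExecutor API:

```python
def apply_drop_recreate(executor):
    """Sketch of the drop/recreate apply sequence (hypothetical method names)."""
    keys = executor.enumerate_keys()   # FT.AGGREGATE cursor, SCAN fallback
    executor.bgsave_snapshot()         # optional safety snapshot before mutations
    executor.rename_fields(keys)       # hash/JSON field renames
    executor.drop_index()              # drop the index definition only
    executor.rename_prefixes(keys)     # optional, with collision detection
    executor.recreate_index()
    executor.wait_for_reindex()
    return executor.validate()         # post-migration validation/benchmark report
```

The ordering matters: field renames happen before the drop so the old index never sees half-renamed documents, and the prefix rename happens while no index definition references the old prefix.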

Written by Cursor Bugbot for commit 42aa7bb.

@jit-ci bot commented Apr 1, 2026

🛡️ Jit Security Scan Results


✅ No security findings were detected in this PR


Security scan by Jit

@chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 50cff884e6

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copilot AI (Contributor) left a comment

Pull request overview

Adds the synchronous migration execution layer for redisvl.migration, including crash-safe reliability primitives (checkpointed/resumable vector re-encoding) and integration tests to validate supported migration routes end-to-end.

Changes:

  • Introduces MigrationExecutor implementing drop/recreate migrations with prefix renames, field renames, and vector datatype re-encoding.
  • Adds reliability.py with quantization checkpointing, idempotent dtype detection, bounded undo buffering, and BGSAVE safety snapshot helpers.
  • Adds integration test coverage for the apply+validate flow and a broad set of supported migration operations (including JSON storage and quantized dtypes).
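The YAML checkpoint that makes quantization resumable might look roughly like this; the field names below are illustrative only, since the actual checkpoint schema lives in reliability.py:

```yaml
# Hypothetical checkpoint layout -- the real fields may differ.
status: in_progress          # or: completed
source_index: products_v1
target_index: products_v2
datatype_changes:
  embedding: {source: float32, target: float16, dims: 4}
last_key_batch: 1200         # resume offset into the enumerated keys
bgsave_completed: true
```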

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.

Summary per file:

  • redisvl/migration/executor.py - sync migration executor implementing key enumeration, renames, quantization, drop/recreate, and validation orchestration.
  • redisvl/migration/reliability.py - crash-safety utilities: dtype detection, checkpoint persistence, BGSAVE wait helpers, and rollback buffer.
  • redisvl/migration/__init__.py - exposes MigrationExecutor as part of the public migration API.
  • tests/integration/test_migration_v1.py - adds a basic end-to-end plan→apply→validate integration test.
  • tests/integration/test_migration_routes.py - adds integration tests for supported algorithm/dtype/distance-metric migration routes.
  • tests/integration/test_migration_comprehensive.py - adds a comprehensive integration suite covering many supported migration operations and JSON storage cases.


@nkanu17 (Collaborator, Author) commented Apr 1, 2026

@codex review

@chatgpt-codex-connector bot left a comment

💡 Codex Review (reviewed commit: 50cff884e6)

Copilot AI (Contributor) left a comment

Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.

@nkanu17 (Collaborator, Author) commented Apr 1, 2026

@codex review

@chatgpt-codex-connector bot left a comment

💡 Codex Review (reviewed commit: 50cff884e6)

nkanu17 added a commit that referenced this pull request Apr 2, 2026
- Fix partial key renames: fail fast on collision instead of continuing
- Fix completed checkpoint: resume from post-drop state for index recreation
- Pass rename_operations to get_vector_datatype_changes for renamed vectors
- Add warning when SCAN falls back to '*' (full keyspace scan)
- Warn when field rename overwrites existing destination field
- Fix has_prefix_change falsy check for empty string prefixes
- Add logger to migration utils
nkanu17 added a commit that referenced this pull request Apr 2, 2026
- Fix _quantize_vectors docstring: 'documents quantized' not 'processed'
- Wrap test_migration_v1 assertions in try/finally for cleanup
nkanu17 force-pushed the feat/migrate-executor branch from 50cff88 to 33f2a40 on April 2, 2026 00:30
@nkanu17 (Collaborator, Author) commented Apr 2, 2026

@codex review

@chatgpt-codex-connector bot left a comment

💡 Codex Review (reviewed commit: 33f2a40b2d)

nkanu17 added 5 commits April 1, 2026 21:53
- Fix has_operations treating empty string prefix as falsy (use 'is not None')
- Normalize single-element list prefix to string in _extract_rename_operations
- Close Redis client created in _check_svs_vamana_requirements
- Handle renamed vector fields in estimate_disk_space
- Fix percent_indexed=None when indexing flag is present
- Remove accidentally committed docs/dev/pr-review-triage.md
… and quantization support

Migration executor that applies plans via drop/recreate with prefix
rename, field operations, and vector datatype re-encoding. Reliability
module provides crash-safe checkpointing, idempotent dtype detection,
and resumable quantization for large indexes. Includes BGSAVE safety
snapshots before mutations.

Includes integration tests for migration routes, comprehensive field
attribute changes, and JSON storage.
- Fix partial key renames: fail fast on collision instead of continuing
- Fix completed checkpoint: resume from post-drop state for index recreation
- Pass rename_operations to get_vector_datatype_changes for renamed vectors
- Add warning when SCAN falls back to '*' (full keyspace scan)
- Warn when field rename overwrites existing destination field
- Fix has_prefix_change falsy check for empty string prefixes
- Add logger to migration utils
- Fix _quantize_vectors docstring: 'documents quantized' not 'processed'
- Wrap test_migration_v1 assertions in try/finally for cleanup
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
Copilot AI review requested due to automatic review settings April 2, 2026 03:58
nkanu17 added a commit that referenced this pull request Apr 2, 2026
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
@nkanu17 (Collaborator, Author) commented Apr 2, 2026

@codex review

@cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.


effective_changes = {
field_rename_map.get(k, k): v
for k, v in datatype_changes.items()
}

Resume path skips field-name remapping for quantization

High Severity

When resuming from a checkpoint with both field renames and vector quantization, the effective_changes dict is not remapped from source to target field names. The condition has_field_renames and not resuming_from_checkpoint prevents remapping during resume, but field renames were already applied before the crash (step 2). _quantize_vectors then calls hget(key, old_field_name), gets None (the field was already renamed), and silently skips all vectors. The index is recreated expecting the new datatype, but vectors remain in the old format, producing corrupt search results.

Additional Locations (1)
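The fix implied by this finding is to remap unconditionally whenever the plan contains field renames, because renames run before quantization on a fresh run and were already applied before the crash on the resume path. A minimal sketch, with a hypothetical helper name:

```python
def effective_datatype_changes(datatype_changes, field_rename_map):
    """Remap datatype-change keys to post-rename field names.

    Hypothetical helper: remaps unconditionally, since on both the fresh
    and resume paths quantization must read the NEW field names -- the
    renames either are about to be applied or already were before the crash.
    """
    return {
        field_rename_map.get(old, old): change
        for old, change in datatype_changes.items()
    }
```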

Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.



Comment on lines +95 to +97
"price": 99.99,
"location": "-122.4194,37.7749", # SF coordinates
"embedding": array_to_buffer([0.1, 0.2, 0.3, 0.4], "float32"),
Copilot AI Apr 2, 2026

The sample GEO values are in lon,lat order (e.g. -122.4194,37.7749), but RedisVL’s GEO validators expect lat,lon (see redisvl/schema/type_utils.py + redisvl/schema/validation.py). This will raise validation errors when loading docs. Swap the coordinate order in the test data to 37.7749,-122.4194 (and likewise for the other entries).

Copilot uses AI. Check for mistakes.
Comment on lines +128 to +129
live_index = SearchIndex.from_existing(index_name, redis_url=redis_url)
live_index.delete(drop=True)
Copilot AI Apr 2, 2026

The cleanup in finally calls SearchIndex.from_existing(index_name) unconditionally. If the migration fails after dropping the index (or renames it), this will raise and can mask the real test failure and/or leak resources. Wrap this cleanup in a try/except (or reuse the existing source_index object when possible) so cleanup is best-effort.

Suggested change
live_index = SearchIndex.from_existing(index_name, redis_url=redis_url)
live_index.delete(drop=True)
try:
live_index = SearchIndex.from_existing(index_name, redis_url=redis_url)
live_index.delete(drop=True)
except Exception:
# Best-effort cleanup: ignore errors if the index no longer exists or deletion fails.
pass

from __future__ import annotations

import json
import logging
Copilot AI Apr 2, 2026

import logging is unused after switching to get_logger(__name__). Remove the unused import to avoid lint/typecheck failures.

Suggested change
import logging

Comment on lines +19 to +20

logger = logging.getLogger(__name__)
Copilot AI Apr 2, 2026

This module uses logging.getLogger(__name__), but the codebase consistently uses redisvl.utils.log.get_logger(__name__) (e.g. redisvl/index/index.py:103-105, redisvl/schema/validation.py:18-21). Switching to get_logger keeps logging formatting/handlers consistent across the library.

Suggested change
logger = logging.getLogger(__name__)
from redisvl.utils.log import get_logger
logger = get_logger(__name__)

Comment on lines +38 to +39

logger = logging.getLogger(__name__)
Copilot AI Apr 2, 2026

This file uses logging.getLogger(__name__), while most of the repo uses redisvl.utils.log.get_logger(__name__) for consistent logger configuration. Consider switching to get_logger to match the established convention.

Suggested change
logger = logging.getLogger(__name__)
from redisvl.utils.log import get_logger
logger = get_logger(__name__)

Comment on lines +245 to +284
for i in range(0, total, pipeline_size):
batch = keys[i : i + pipeline_size]
pipe = client.pipeline(transaction=False)
batch_new_keys: List[str] = []

for key in batch:
# Compute new key name
if key.startswith(old_prefix):
new_key = new_prefix + key[len(old_prefix) :]
else:
# Key doesn't match expected prefix, skip
logger.warning(
f"Key '{key}' does not start with prefix '{old_prefix}'"
)
continue
pipe.renamenx(key, new_key)
batch_new_keys.append(new_key)

try:
results = pipe.execute()
for j, r in enumerate(results):
if r is True or r == 1:
renamed += 1
else:
collisions.append(batch_new_keys[j])
except Exception as e:
logger.warning(f"Error in rename batch: {e}")
raise

# Fail fast on collisions to avoid partial renames across batches.
# Keys already renamed in THIS batch are not rolled back -- caller
# can inspect the error to understand which keys moved.
if collisions:
raise RuntimeError(
f"Prefix rename aborted after {renamed} successful rename(s): "
f"{len(collisions)} destination key(s) already exist "
f"(first 5: {collisions[:5]}). This would overwrite existing data. "
f"Remove conflicting keys or choose a different prefix."
)

Copilot AI Apr 2, 2026

On collision (RENAMENX returns 0), this function raises after executing the whole batch, but does not roll back keys that were successfully renamed earlier in the batch. That can leave the keyspace partially renamed and make retries/resume harder. Prefer detecting collisions before executing renames for a batch (e.g., EXISTS on destination keys) or rolling back the successful renames in the same batch before raising.

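The reviewer's pre-check suggestion could look roughly like this; the helper name is hypothetical, and note that EXISTS-then-RENAMENX is not atomic, so RENAMENX should still guard each actual rename:

```python
def precheck_rename_collisions(client, batch, old_prefix, new_prefix):
    """Check destination keys with pipelined EXISTS before renaming.

    Hypothetical sketch of the reviewer's suggestion: it aborts before any
    RENAMENX runs, so a collision leaves the whole batch untouched.
    """
    pairs = [
        (key, new_prefix + key[len(old_prefix):])
        for key in batch
        if key.startswith(old_prefix)
    ]
    pipe = client.pipeline(transaction=False)
    for _, new_key in pairs:
        pipe.exists(new_key)
    collisions = [nk for (_, nk), hit in zip(pairs, pipe.execute()) if hit]
    if collisions:
        raise RuntimeError(
            f"{len(collisions)} destination key(s) already exist "
            f"(first 5: {collisions[:5]}); aborting before any rename."
        )
    return pairs
```

Keeping RENAMENX for the renames themselves preserves safety if a destination key appears between the pre-check and the rename.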
Comment on lines +328 to +334
logger.warning(
"Field '%s' already exists in key '%s'; "
"overwriting with value from '%s'",
new_name,
key,
old_name,
)
Copilot AI Apr 2, 2026

If new_name already exists in a document, this code logs a warning but overwrites it with the value from old_name, which can silently lose data. Consider failing fast on destination-exists (or making overwrite behavior explicit via an opt-in flag) to keep field renames safe and predictable.

Suggested change
logger.warning(
"Field '%s' already exists in key '%s'; "
"overwriting with value from '%s'",
new_name,
key,
old_name,
)
msg = (
"Field '%s' already exists in key '%s'; "
"refusing to overwrite with value from '%s'"
)
logger.error(msg, new_name, key, old_name)
# Fail fast to avoid silently losing data.
raise RuntimeError(msg % (new_name, key, old_name))

Comment on lines +381 to +399
# Now set new field and delete old
# JSONPath GET returns results as a list; unwrap single-element
# results to preserve the original document shape.
# Missing paths return None or [] depending on Redis version.
pipe = client.pipeline(transaction=False)
batch_ops = 0
for key, value in zip(batch, values):
if value is None or value == []:
continue
if isinstance(value, list) and len(value) == 1:
value = value[0]
pipe.json().set(key, new_path, value)
pipe.json().delete(key, old_path)
batch_ops += 1
try:
pipe.execute()
# Count by number of keys that had old field values,
# not by JSON.SET return value
renamed += batch_ops
Copilot AI Apr 2, 2026

JSON field renames always JSON.SET the destination path without checking whether it already exists, which can overwrite existing data. For safety, consider checking for an existing new_path value and failing (or requiring an explicit overwrite option) rather than overwriting by default.

Comment on lines +1009 to +1028
for key in batch:
for field_name, change in datatype_changes.items():
field_data: bytes | None = client.hget(key, field_name) # type: ignore[misc,assignment]
if not field_data:
continue

# Idempotent: skip if already converted to target dtype
dims = change.get("dims", 0)
if dims and is_already_quantized(
field_data, dims, change["source"], change["target"]
):
skipped += 1
continue

undo.store(key, field_name, field_data)
array = buffer_to_array(field_data, change["source"])
new_bytes = array_to_buffer(array, change["target"])
pipe.hset(key, field_name, new_bytes) # type: ignore[arg-type]
keys_updated_in_batch.add(key)

Copilot AI Apr 2, 2026

Quantization reads each vector value via client.hget(...) inside nested loops, which produces N×M round-trips (keys × fields) per batch and largely defeats pipelining. Fetch the needed fields for the whole batch with a pipeline (e.g., HMGET/multiple HGETs queued on the pipeline) and then pipeline the corresponding HSETs; this will significantly reduce network overhead for large migrations.

Suggested change
for key in batch:
for field_name, change in datatype_changes.items():
field_data: bytes | None = client.hget(key, field_name) # type: ignore[misc,assignment]
if not field_data:
continue
# Idempotent: skip if already converted to target dtype
dims = change.get("dims", 0)
if dims and is_already_quantized(
field_data, dims, change["source"], change["target"]
):
skipped += 1
continue
undo.store(key, field_name, field_data)
array = buffer_to_array(field_data, change["source"])
new_bytes = array_to_buffer(array, change["target"])
pipe.hset(key, field_name, new_bytes) # type: ignore[arg-type]
keys_updated_in_batch.add(key)
# Pipeline all HGETs for this batch to avoid N×M round-trips.
read_pipe = client.pipeline()
requests: list[tuple[str, str, dict[str, Any]]] = []
for key in batch:
for field_name, change in datatype_changes.items():
read_pipe.hget(key, field_name)
requests.append((key, field_name, change))
# Execute read pipeline once for the whole batch.
results = read_pipe.execute()
for (key, field_name, change), field_data in zip(requests, results):
field_data = field_data or None # normalize falsy to None for checks
if not field_data:
continue
# Idempotent: skip if already converted to target dtype
dims = change.get("dims", 0)
if dims and is_already_quantized(
field_data, dims, change["source"], change["target"]
):
skipped += 1
continue
undo.store(key, field_name, field_data)
array = buffer_to_array(field_data, change["source"])
new_bytes = array_to_buffer(array, change["target"])
pipe.hset(key, field_name, new_bytes) # type: ignore[arg-type]
keys_updated_in_batch.add(key)

Comment on lines +258 to +265
while time.monotonic() < deadline:
info = client.info("persistence")
if isinstance(info, dict) and not info.get("rdb_bgsave_in_progress", 0):
return True
time.sleep(poll_interval)

raise TimeoutError(f"BGSAVE did not complete within {timeout_seconds}s")

Copilot AI Apr 2, 2026

trigger_bgsave_and_wait only waits for rdb_bgsave_in_progress to become false, but doesn’t verify the last BGSAVE actually succeeded (e.g. rdb_last_bgsave_status == 'ok'). Consider checking the status and raising (or returning False) on failure so the “safety snapshot” signal is accurate.

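A sketch of the wait loop extended with the status check the review asks for, assuming a redis-py-style client; the function name is hypothetical:

```python
import time

def wait_for_bgsave(client, timeout_seconds=60.0, poll_interval=0.5):
    """Wait for BGSAVE to finish and verify it actually succeeded.

    Hypothetical extension of the loop above: besides waiting for
    rdb_bgsave_in_progress to clear, check rdb_last_bgsave_status so a
    failed snapshot raises instead of being treated as success.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        info = client.info("persistence")
        if isinstance(info, dict) and not info.get("rdb_bgsave_in_progress", 0):
            if info.get("rdb_last_bgsave_status") != "ok":
                raise RuntimeError("BGSAVE finished with a non-ok status")
            return True
        time.sleep(poll_interval)
    raise TimeoutError(f"BGSAVE did not complete within {timeout_seconds}s")
```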
@nkanu17 (Collaborator, Author) commented Apr 2, 2026

@codex review

@chatgpt-codex-connector bot left a comment

💡 Codex Review (reviewed commit: 42aa7bb59a)

Comment on lines +477 to +481
if source_still_exists:
logger.info(
"Checkpoint at %s is completed and source index "
"still exists; treating as fresh run",
checkpoint_path,

P1 Badge Clear stale completed checkpoint before quantization

When this branch decides a completed checkpoint should be treated as a fresh run (because the source index still exists), checkpoint_path is left unchanged and later passed into _quantize_vectors, which immediately returns 0 for status == "completed". In migrations with vector datatype changes, that skips required re-encoding and can leave documents in the old byte format before recreate/reindex, producing indexing failures or incorrect migration results. Reset/ignore the checkpoint for this path so quantization actually runs.


Comment on lines +490 to +493
else:
resuming_from_checkpoint = True
logger.info(
"Checkpoint found at %s, skipping source index validation "

P1 Badge Validate source absence before resuming incomplete checkpoint

Any non-completed checkpoint currently forces resuming_from_checkpoint = True without checking whether the source index is still present. If a stale in-progress checkpoint is reused after restoring the source index (or on a rerun), the executor skips normal source validation/drop and resumes partial quantization state, then can fail on target_index.create() while leaving vectors partially rewritten. Resume should only be enabled after confirming the source index is actually gone (or via an explicit override).

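Taken together, the two findings suggest gating every resume decision on whether the source index still exists. A hedged sketch of that decision logic, as a hypothetical pure function rather than the executor's actual API:

```python
def should_resume(checkpoint, source_index_exists):
    """Decide how to handle an existing checkpoint (hypothetical sketch).

    Per the findings above: any checkpoint is treated as stale when the
    source index is still present (fresh run, checkpoint discarded so
    quantization actually runs); a 'completed' checkpoint with the source
    gone resumes from the post-drop state (index recreation only); an
    in-progress checkpoint with the source gone resumes quantization.
    Returns one of 'fresh', 'recreate_only', 'resume'.
    """
    if checkpoint is None:
        return "fresh"
    if source_index_exists:
        # Stale checkpoint: the source was restored or never dropped.
        return "fresh"
    if checkpoint.get("status") == "completed":
        return "recreate_only"
    return "resume"
```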
