feat(migrator): [2/7] Sync executor with reliability checkpointing, crash-safe resume, and quantization support #561
nkanu17 wants to merge 5 commits into feat/migrate-core
Conversation
🛡️ Jit Security Scan Results: ✅ No security findings were detected in this PR
Security scan by Jit
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 50cff884e6
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you:
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Pull request overview
Adds the synchronous migration execution layer for redisvl.migration, including crash-safe reliability primitives (checkpointed/resumable vector re-encoding) and integration tests to validate supported migration routes end-to-end.
Changes:
- Introduces `MigrationExecutor` implementing drop/recreate migrations with prefix renames, field renames, and vector datatype re-encoding.
- Adds `reliability.py` with quantization checkpointing, idempotent dtype detection, bounded undo buffering, and BGSAVE safety snapshot helpers.
- Adds integration test coverage for the apply+validate flow and a broad set of supported migration operations (including JSON storage and quantized dtypes).
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| `redisvl/migration/executor.py` | Sync migration executor implementing key enumeration, renames, quantization, drop/recreate, and validation orchestration. |
| `redisvl/migration/reliability.py` | Crash-safety utilities: dtype detection, checkpoint persistence, BGSAVE wait helpers, and rollback buffer. |
| `redisvl/migration/__init__.py` | Exposes `MigrationExecutor` as part of the public migration API. |
| `tests/integration/test_migration_v1.py` | Adds a basic end-to-end plan→apply→validate integration test. |
| `tests/integration/test_migration_routes.py` | Adds integration tests for supported algorithm/dtype/distance-metric migration routes. |
| `tests/integration/test_migration_comprehensive.py` | Adds a comprehensive integration suite covering many supported migration operations and JSON storage cases. |
@codex review
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
@codex review
- Fix partial key renames: fail fast on collision instead of continuing
- Fix completed checkpoint: resume from post-drop state for index recreation
- Pass rename_operations to get_vector_datatype_changes for renamed vectors
- Add warning when SCAN falls back to '*' (full keyspace scan)
- Warn when field rename overwrites existing destination field
- Fix has_prefix_change falsy check for empty string prefixes
- Add logger to migration utils
- Fix _quantize_vectors docstring: 'documents quantized' not 'processed'
- Wrap test_migration_v1 assertions in try/finally for cleanup
Force-pushed 50cff88 to 33f2a40
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 33f2a40b2d
- Fix has_operations treating empty string prefix as falsy (use 'is not None')
- Normalize single-element list prefix to string in _extract_rename_operations
- Close Redis client created in _check_svs_vamana_requirements
- Handle renamed vector fields in estimate_disk_space
- Fix percent_indexed=None when indexing flag is present
- Remove accidentally committed docs/dev/pr-review-triage.md
… and quantization support

Migration executor that applies plans via drop/recreate with prefix rename, field operations, and vector datatype re-encoding. Reliability module provides crash-safe checkpointing, idempotent dtype detection, and resumable quantization for large indexes. Includes BGSAVE safety snapshots before mutations. Includes integration tests for migration routes, comprehensive field attribute changes, and JSON storage.
- Remap datatype_changes keys to post-rename field names before quantization
- Only resume from completed checkpoint when source index is actually gone
Force-pushed 33f2a40 to 42aa7bb
@codex review
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```python
effective_changes = {
    field_rename_map.get(k, k): v
    for k, v in datatype_changes.items()
}
```
Resume path skips field-name remapping for quantization
High Severity
When resuming from a checkpoint with both field renames and vector quantization, the `effective_changes` dict is not remapped from source to target field names. The condition `has_field_renames and not resuming_from_checkpoint` prevents remapping during resume, but field renames were already applied before the crash (step 2). `_quantize_vectors` then calls `hget(key, old_field_name)`, gets `None` (the field was already renamed), and silently skips all vectors. The index is recreated expecting the new datatype, but vectors remain in the old format, producing corrupt search results.
Additional Locations (1)
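The fix described above can be sketched as a pure remapping helper. This is an illustrative sketch, not the PR's actual code: the function name and the `field_renames_applied` flag are hypothetical, standing in for "we know step 2 (field renames) has already run, whether on this run or before the crash".

```python
def remap_datatype_changes(datatype_changes, field_rename_map, field_renames_applied):
    """Map datatype-change keys to their post-rename field names.

    Hypothetical helper: on resume, field renames from step 2 were already
    applied before the crash, so remapping must happen whenever the renames
    are in effect -- not only on a fresh (non-resumed) run.
    """
    if not field_renames_applied:
        return dict(datatype_changes)
    return {field_rename_map.get(k, k): v for k, v in datatype_changes.items()}
```

Gating on "renames applied" rather than "not resuming" makes the resume path read the renamed field names, so `hget` finds the vectors again.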
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.
```python
"price": 99.99,
"location": "-122.4194,37.7749",  # SF coordinates
"embedding": array_to_buffer([0.1, 0.2, 0.3, 0.4], "float32"),
```
The sample GEO values are in lon,lat order (e.g. -122.4194,37.7749), but RedisVL’s GEO validators expect lat,lon (see redisvl/schema/type_utils.py + redisvl/schema/validation.py). This will raise validation errors when loading docs. Swap the coordinate order in the test data to 37.7749,-122.4194 (and likewise for the other entries).
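The coordinate swap can be expressed as a tiny helper for fixing the fixtures. A minimal sketch, assuming comma-separated numeric coordinates; the function name is illustrative:

```python
def lonlat_to_latlon(value: str) -> str:
    """Swap a 'lon,lat' string into the 'lat,lon' order RedisVL's
    GEO validators expect."""
    lon, lat = (part.strip() for part in value.split(","))
    return f"{lat},{lon}"
```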
```python
live_index = SearchIndex.from_existing(index_name, redis_url=redis_url)
live_index.delete(drop=True)
```
The cleanup in finally calls SearchIndex.from_existing(index_name) unconditionally. If the migration fails after dropping the index (or renames it), this will raise and can mask the real test failure and/or leak resources. Wrap this cleanup in a try/except (or reuse the existing source_index object when possible) so cleanup is best-effort.
Suggested change:

```python
try:
    live_index = SearchIndex.from_existing(index_name, redis_url=redis_url)
    live_index.delete(drop=True)
except Exception:
    # Best-effort cleanup: ignore errors if the index no longer exists or deletion fails.
    pass
```
```python
from __future__ import annotations

import json
import logging
```
`import logging` is unused after switching to `get_logger(__name__)`. Remove the unused import to avoid lint/typecheck failures.
Suggested change: remove the unused `import logging` line.

The module currently creates its logger with:

```python
logger = logging.getLogger(__name__)
```
This module uses logging.getLogger(__name__), but the codebase consistently uses redisvl.utils.log.get_logger(__name__) (e.g. redisvl/index/index.py:103-105, redisvl/schema/validation.py:18-21). Switching to get_logger keeps logging formatting/handlers consistent across the library.
Suggested change:

```python
from redisvl.utils.log import get_logger

logger = get_logger(__name__)
```
```python
logger = logging.getLogger(__name__)
```
This file uses logging.getLogger(__name__), while most of the repo uses redisvl.utils.log.get_logger(__name__) for consistent logger configuration. Consider switching to get_logger to match the established convention.
Suggested change:

```python
from redisvl.utils.log import get_logger

logger = get_logger(__name__)
```
```python
for i in range(0, total, pipeline_size):
    batch = keys[i : i + pipeline_size]
    pipe = client.pipeline(transaction=False)
    batch_new_keys: List[str] = []

    for key in batch:
        # Compute new key name
        if key.startswith(old_prefix):
            new_key = new_prefix + key[len(old_prefix):]
        else:
            # Key doesn't match expected prefix, skip
            logger.warning(
                f"Key '{key}' does not start with prefix '{old_prefix}'"
            )
            continue
        pipe.renamenx(key, new_key)
        batch_new_keys.append(new_key)

    try:
        results = pipe.execute()
        for j, r in enumerate(results):
            if r is True or r == 1:
                renamed += 1
            else:
                collisions.append(batch_new_keys[j])
    except Exception as e:
        logger.warning(f"Error in rename batch: {e}")
        raise

    # Fail fast on collisions to avoid partial renames across batches.
    # Keys already renamed in THIS batch are not rolled back -- caller
    # can inspect the error to understand which keys moved.
    if collisions:
        raise RuntimeError(
            f"Prefix rename aborted after {renamed} successful rename(s): "
            f"{len(collisions)} destination key(s) already exist "
            f"(first 5: {collisions[:5]}). This would overwrite existing data. "
            f"Remove conflicting keys or choose a different prefix."
        )
```
On collision (RENAMENX returns 0), this function raises after executing the whole batch, but does not roll back keys that were successfully renamed earlier in the batch. That can leave the keyspace partially renamed and make retries/resume harder. Prefer detecting collisions before executing renames for a batch (e.g., EXISTS on destination keys) or rolling back the successful renames in the same batch before raising.
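The "detect before executing" option can be sketched as a pure planning step. This is illustrative only: `existing_keys` stands in for the results of a pipelined `EXISTS` on the candidate destinations, and the function name is not the PR's actual API. With this shape, a collision aborts the batch with zero keys moved.

```python
def plan_prefix_renames(batch, old_prefix, new_prefix, existing_keys):
    """Compute (renames, collisions) for a batch before touching Redis.

    `existing_keys` is assumed to hold the destination keys that already
    exist (e.g. gathered via pipelined EXISTS). If `collisions` is
    non-empty, the caller raises without issuing any RENAMENX.
    """
    renames, collisions = [], []
    for key in batch:
        if not key.startswith(old_prefix):
            continue  # key outside the expected prefix; skipped, as in the PR
        new_key = new_prefix + key[len(old_prefix):]
        if new_key in existing_keys:
            collisions.append(new_key)
        else:
            renames.append((key, new_key))
    return renames, collisions
```

Note this pre-check is not atomic with the renames (another writer could create a destination key in between), so `RENAMENX` is still the right primitive for the actual rename; the pre-check just avoids the common partial-rename case.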
```python
logger.warning(
    "Field '%s' already exists in key '%s'; "
    "overwriting with value from '%s'",
    new_name,
    key,
    old_name,
)
```
If new_name already exists in a document, this code logs a warning but overwrites it with the value from old_name, which can silently lose data. Consider failing fast on destination-exists (or making overwrite behavior explicit via an opt-in flag) to keep field renames safe and predictable.
Suggested change:

```python
msg = (
    "Field '%s' already exists in key '%s'; "
    "refusing to overwrite with value from '%s'"
)
logger.error(msg, new_name, key, old_name)
# Fail fast to avoid silently losing data.
raise RuntimeError(msg % (new_name, key, old_name))
```
```python
# Now set new field and delete old
# JSONPath GET returns results as a list; unwrap single-element
# results to preserve the original document shape.
# Missing paths return None or [] depending on Redis version.
pipe = client.pipeline(transaction=False)
batch_ops = 0
for key, value in zip(batch, values):
    if value is None or value == []:
        continue
    if isinstance(value, list) and len(value) == 1:
        value = value[0]
    pipe.json().set(key, new_path, value)
    pipe.json().delete(key, old_path)
    batch_ops += 1
try:
    pipe.execute()
    # Count by number of keys that had old field values,
    # not by JSON.SET return value
    renamed += batch_ops
```
JSON field renames always JSON.SET the destination path without checking whether it already exists, which can overwrite existing data. For safety, consider checking for an existing new_path value and failing (or requiring an explicit overwrite option) rather than overwriting by default.
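The destination-exists check can be separated into a pure helper operating on the results of a pipelined `JSON.GET` at `new_path`. A sketch, with an illustrative name; it accounts for the `None`-vs-`[]` missing-path behavior already noted in the snippet above:

```python
def json_rename_conflicts(keys, dest_values):
    """Given keys and their JSON.GET results at the destination path,
    return the keys whose destination already holds a value.

    JSONPath GET returns None or [] for missing paths depending on the
    Redis version; anything else means the rename would overwrite data.
    """
    return [k for k, v in zip(keys, dest_values) if v not in (None, [])]
```

The caller would pipeline `JSON.GET key new_path` for the batch, run this helper on the results, and fail fast (or require an explicit overwrite flag) when the returned list is non-empty.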
```python
for key in batch:
    for field_name, change in datatype_changes.items():
        field_data: bytes | None = client.hget(key, field_name)  # type: ignore[misc,assignment]
        if not field_data:
            continue

        # Idempotent: skip if already converted to target dtype
        dims = change.get("dims", 0)
        if dims and is_already_quantized(
            field_data, dims, change["source"], change["target"]
        ):
            skipped += 1
            continue

        undo.store(key, field_name, field_data)
        array = buffer_to_array(field_data, change["source"])
        new_bytes = array_to_buffer(array, change["target"])
        pipe.hset(key, field_name, new_bytes)  # type: ignore[arg-type]
        keys_updated_in_batch.add(key)
```
Quantization reads each vector value via client.hget(...) inside nested loops, which produces N×M round-trips (keys × fields) per batch and largely defeats pipelining. Fetch the needed fields for the whole batch with a pipeline (e.g., HMGET/multiple HGETs queued on the pipeline) and then pipeline the corresponding HSETs; this will significantly reduce network overhead for large migrations.
Suggested change:

```python
# Pipeline all HGETs for this batch to avoid N×M round-trips.
read_pipe = client.pipeline()
requests: list[tuple[str, str, dict[str, Any]]] = []
for key in batch:
    for field_name, change in datatype_changes.items():
        read_pipe.hget(key, field_name)
        requests.append((key, field_name, change))
# Execute the read pipeline once for the whole batch.
results = read_pipe.execute()
for (key, field_name, change), field_data in zip(requests, results):
    if not field_data:
        continue
    # Idempotent: skip if already converted to target dtype
    dims = change.get("dims", 0)
    if dims and is_already_quantized(
        field_data, dims, change["source"], change["target"]
    ):
        skipped += 1
        continue
    undo.store(key, field_name, field_data)
    array = buffer_to_array(field_data, change["source"])
    new_bytes = array_to_buffer(array, change["target"])
    pipe.hset(key, field_name, new_bytes)  # type: ignore[arg-type]
    keys_updated_in_batch.add(key)
```
```python
while time.monotonic() < deadline:
    info = client.info("persistence")
    if isinstance(info, dict) and not info.get("rdb_bgsave_in_progress", 0):
        return True
    time.sleep(poll_interval)

raise TimeoutError(f"BGSAVE did not complete within {timeout_seconds}s")
```
trigger_bgsave_and_wait only waits for rdb_bgsave_in_progress to become false, but doesn’t verify the last BGSAVE actually succeeded (e.g. rdb_last_bgsave_status == 'ok'). Consider checking the status and raising (or returning False) on failure so the “safety snapshot” signal is accurate.
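The extra success check can be sketched as a small predicate over the `INFO persistence` dict. `rdb_bgsave_in_progress` and `rdb_last_bgsave_status` are real fields of Redis's `INFO persistence` section; the helper name is illustrative:

```python
def bgsave_finished_ok(info: dict) -> bool:
    """Check INFO persistence for a completed *and successful* BGSAVE.

    Checking only rdb_bgsave_in_progress would report a failed snapshot
    as a safety guarantee; rdb_last_bgsave_status must also be 'ok'.
    """
    in_progress = info.get("rdb_bgsave_in_progress", 0)
    status = info.get("rdb_last_bgsave_status", "err")
    return not in_progress and status == "ok"
```

The wait loop above would call this instead of only testing `rdb_bgsave_in_progress`, and raise (or return `False`) when the last BGSAVE finished with an error status.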
@codex review |
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 42aa7bb59a
```python
if source_still_exists:
    logger.info(
        "Checkpoint at %s is completed and source index "
        "still exists; treating as fresh run",
        checkpoint_path,
    )
```
Clear stale completed checkpoint before quantization
When this branch decides a completed checkpoint should be treated as a fresh run (because the source index still exists), `checkpoint_path` is left unchanged and later passed into `_quantize_vectors`, which immediately returns 0 for `status == "completed"`. In migrations with vector datatype changes, that skips required re-encoding and can leave documents in the old byte format before recreate/reindex, producing indexing failures or incorrect migration results. Reset/ignore the checkpoint for this path so quantization actually runs.
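The reset can be sketched as a pure decision over the checkpoint state. Illustrative only (the function name is hypothetical); returning `None` stands in for deleting or ignoring the stale checkpoint file so quantization starts fresh:

```python
def effective_checkpoint_path(checkpoint_path, status, source_still_exists):
    """Decide which checkpoint path to hand to quantization.

    A *completed* checkpoint coexisting with a live source index is stale:
    the run is being treated as fresh, so the checkpoint must not reach
    _quantize_vectors (which short-circuits on status == "completed").
    """
    if status == "completed" and source_still_exists:
        return None  # fresh run: ignore/reset the stale checkpoint
    return checkpoint_path
```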
```python
else:
    resuming_from_checkpoint = True
    logger.info(
        "Checkpoint found at %s, skipping source index validation "
```
Validate source absence before resuming incomplete checkpoint
Any non-completed checkpoint currently forces `resuming_from_checkpoint = True` without checking whether the source index is still present. If a stale in-progress checkpoint is reused after restoring the source index (or on a rerun), the executor skips normal source validation/drop and resumes partial quantization state, then can fail on `target_index.create()` while leaving vectors partially rewritten. Resume should only be enabled after confirming the source index is actually gone (or via an explicit override).
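The suggested gate can be sketched as a predicate (name and signature illustrative): an incomplete checkpoint only proves a prior run got past the drop step if the source index no longer exists.

```python
def should_resume(status: str, source_index_exists: bool) -> bool:
    """Enable checkpoint resume only when the checkpoint is incomplete
    AND the source index is actually gone; otherwise the checkpoint is
    stale and normal source validation should run."""
    return status != "completed" and not source_index_exists
```

An explicit `--force-resume`-style override could bypass this check for operators who know the checkpoint is valid.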


Summary
Migration executor that applies plans via drop/recreate with prefix rename, field operations, and vector datatype re-encoding. Reliability module provides crash-safe checkpointing, idempotent dtype detection, and resumable quantization for large indexes. Includes BGSAVE safety snapshots before mutations.
Files
- `redisvl/migration/executor.py` - sync migration executor
- `redisvl/migration/reliability.py` - checkpointing, idempotent dtype detection, resume logic

Stack
Note
High Risk
High risk because it introduces a new migration execution path that mutates live Redis data (dropping/recreating indexes, renaming keys/fields, and rewriting vector payloads) with checkpoint/resume logic that must be correct to avoid data loss or partial migrations.
Overview
Adds a new synchronous `MigrationExecutor` that can apply `drop_recreate` migration plans end-to-end: enumerating documents (via `FT.AGGREGATE` cursor with SCAN fallback), taking an optional BGSAVE safety snapshot, performing hash/JSON field renames, dropping the index definition, optionally renaming key prefixes with collision detection, recreating the index, waiting for reindexing, and running post-migration validation/benchmark reporting.

Introduces `migration/reliability.py` to make vector datatype migrations more robust, including idempotent "already quantized" detection, per-batch rollback buffering, and YAML checkpointing to resume long-running quantization after crashes (with explicit guardrails for same-width dtype conversions). Adds warnings/logging around SCAN pattern fallbacks, exports `MigrationExecutor` from `migration/__init__.py`, and adds large integration suites covering comprehensive migration operations and route-level scenarios.

Written by Cursor Bugbot for commit 42aa7bb. This will update automatically on new commits.