fix(sync): capture cloud seqs and unify push/pull cursor#339
Open
alexandervazquez98 wants to merge 1 commit intoGentleman-Programming:mainfrom
Open
fix(sync): capture cloud seqs and unify push/pull cursor#339alexandervazquez98 wants to merge 1 commit intoGentleman-Programming:mainfrom
alexandervazquez98 wants to merge 1 commit intoGentleman-Programming:mainfrom
Conversation
- Add sync_seq_mapping table to store local→cloud seq correlation - Add unified_cursor column to sync_state table - Push now captures BIGSERIAL cloud seqs and stores mapping - Pull now uses GetUnifiedCursor() instead of GetSyncState().UnifiedCursor - Add regression tests for push→ack→cursor flow - Closes Gentleman-Programming#238
There was a problem hiding this comment.
Pull request overview
This PR fixes cloud sync duplication by persisting the cloud-assigned mutation sequence IDs returned from PushMutations and introducing a unified cursor intended to operate in cloud sequence space for both push and pull.
Changes:
- Add
sync_seq_mapping(local→cloud seq correlation) andsync_state.unified_cursorto support unified cursoring in cloud seq space. - Update autosync push to capture accepted cloud seqs, persist mappings, and advance the unified cursor.
- Update autosync pull to use
GetUnifiedCursor()as the pull cursor source; add regression tests around mapping/cursor flow.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/store/store.go | Adds schema/migration for sync_seq_mapping + unified_cursor, plus Store APIs to store mappings and read/advance unified cursor. |
| internal/store/seq_mapping_test.go | New tests validating mapping storage and unified cursor advancement. |
| internal/cloud/autosync/manager.go | Captures cloud seqs from push results, stores mappings, advances unified cursor; switches pull to read from unified cursor. |
| internal/cloud/autosync/manager_test.go | Extends fakes and adds tests for seq mapping storage and unified-cursor-based pull. |
| cmd/engram/autosync_e2e_test.go | Updates E2E fake store to satisfy the expanded autosync LocalStore interface. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+533
to
+540
| // Store local→cloud seq mapping for every entry in the batch. | ||
| // This must be stored BEFORE AckSyncMutationSeqs so the correlation | ||
| // is persisted even if the process restarts mid-cycle. | ||
| for i, mut := range batch { | ||
| if result.AcceptedSeqs[i] > 0 { | ||
| if err := m.store.StoreSyncSeqMapping(m.cfg.TargetKey, mut.Seq, result.AcceptedSeqs[i]); err != nil { | ||
| return fmt.Errorf("store seq mapping local=%d cloud=%d: %w", mut.Seq, result.AcceptedSeqs[i], err) | ||
| } |
Comment on lines
+548
to
+552
| // Advance unified_cursor to the highest cloud_seq in all batches pushed this cycle. | ||
| // This ensures the next pull cycle starts from the correct cloud seq, not re-fetching | ||
| // mutations that were just pushed. | ||
| if err := m.store.AdvanceUnifiedCursor(m.cfg.TargetKey); err != nil { | ||
| return fmt.Errorf("advance unified cursor: %w", err) |
Comment on lines
+3555
to
+3559
| // GetUnifiedCursor returns the current unified_cursor value for a target. | ||
| func (s *Store) GetUnifiedCursor(targetKey string) (int64, error) { | ||
| targetKey = normalizeSyncTargetKey(targetKey) | ||
| var cursor int64 | ||
| err := s.db.QueryRow(`SELECT unified_cursor FROM sync_state WHERE target_key = ?`, targetKey).Scan(&cursor) |
| ReasonCode *string `json:"reason_code,omitempty"` | ||
| ReasonMessage *string `json:"reason_message,omitempty"` | ||
| LastError *string `json:"last_error,omitempty"` | ||
| UnifiedCursor int64 `json:"unified_cursor"` |
| PRIMARY KEY (target_key, local_seq) | ||
| ); | ||
| CREATE INDEX IF NOT EXISTS idx_sync_seq_mapping_cloud | ||
| ON sync_seq_mapping(target_key, cloud_seq); |
Comment on lines
+52
to
+81
| func TestStoreSyncSeqMapping_DuplicateLocalSeqAddsNewRow(t *testing.T) { | ||
| s := newTestStore(t) | ||
| if err := s.EnrollProject("proj-a"); err != nil { | ||
| t.Fatalf("enroll project: %v", err) | ||
| } | ||
|
|
||
| // First push: local seq 1 → cloud seq 101. | ||
| if err := s.StoreSyncSeqMapping(DefaultSyncTargetKey, 1, 101); err != nil { | ||
| t.Fatalf("StoreSyncSeqMapping first: %v", err) | ||
| } | ||
|
|
||
| // Second push (retry): local seq 1 → cloud seq 201 (new row, not replace). | ||
| if err := s.StoreSyncSeqMapping(DefaultSyncTargetKey, 1, 201); err != nil { | ||
| t.Fatalf("StoreSyncSeqMapping second: %v", err) | ||
| } | ||
|
|
||
| // Should have 2 rows — INSERT OR REPLACE still creates a new row when | ||
| // the primary key (target_key, local_seq) conflicts but the cloud_seq differs. | ||
| // Note: The schema uses PRIMARY KEY (target_key, local_seq) so INSERT OR REPLACE | ||
| // replaces. For duplicate local seq we need a new row. The spec says "new cloud seq | ||
| // is added as a separate mapping row" — this implies the schema should allow it. | ||
| // With PRIMARY KEY constraint, a second insert for same (target_key, local_seq) replaces. | ||
| // This test verifies the current behavior — if the intent is to allow multiple rows | ||
| // per local_seq, the schema would need a different PK (e.g., auto-increment id). | ||
| var count int | ||
| if err := s.db.QueryRow(`SELECT COUNT(*) FROM sync_seq_mapping WHERE target_key = ? AND local_seq = 1`, DefaultSyncTargetKey).Scan(&count); err != nil { | ||
| t.Fatalf("count mapping: %v", err) | ||
| } | ||
| // With PRIMARY KEY on (target_key, local_seq), INSERT OR REPLACE replaces the row. | ||
| // The last cloud_seq (201) wins. This is the current behavior. |
Comment on lines
+141
to
+145
| func (s *fakeLocalStore) StoreSyncSeqMapping(targetKey string, localSeq, cloudSeq int64) error { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
| s.seqMappings = append(s.seqMappings, [3]int64{0, localSeq, cloudSeq}) | ||
| _ = targetKey |
|
|
||
| func (s *autosyncFakeStore) AdvanceUnifiedCursor(_ string) error { return nil } | ||
|
|
||
| func (s *autosyncFakeStore) GetUnifiedCursor(_ string) (int64, error) { return 0, nil } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Changes
Testing
Verification
Linked Issue
Closes #238