Skip to content

Commit 9b73698

Browse files
geruh and chinmay-bhat authored
feat: Add support for rolling back to timestamp (#2879)
# Rationale for this change This PR adds the ability to roll back a table to an ancestral snapshot given a timestamp. Some of this work was also done in #758, and this is an in-progress PR to be merged after #2871 & #2878. It is standalone from the other changes, but it makes use of the helpers in the other PRs. Additionally, it adds some more tests. ## Are these changes tested? Yes ## Are there any user-facing changes? New API for meta --------- Co-authored-by: Chinmay Bhat <12948588+chinmay-bhat@users.noreply.github.com>
1 parent fe0a237 commit 9b73698

File tree

4 files changed

+182
-0
lines changed

4 files changed

+182
-0
lines changed

pyiceberg/table/snapshots.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,3 +472,24 @@ def ancestors_between(from_snapshot: Snapshot | None, to_snapshot: Snapshot, tab
472472
break
473473
else:
474474
yield from ancestors_of(to_snapshot, table_metadata)
475+
476+
477+
def latest_ancestor_before_timestamp(table_metadata: TableMetadata, timestamp_ms: int) -> Snapshot | None:
    """Return the newest ancestor of the current snapshot whose timestamp is strictly before a cutoff.

    Only the snapshots yielded by ``ancestors_of`` for the table's current
    snapshot are considered.

    Args:
        table_metadata: The table metadata for a table.
        timestamp_ms: Exclusive upper bound; a snapshot qualifies only if its
            timestamp is strictly smaller than this value.

    Returns:
        The qualifying ancestor with the greatest timestamp, or None if no
        ancestor qualifies.
    """
    # Ancestors with a non-positive timestamp are never selected.
    candidates = (
        ancestor
        for ancestor in ancestors_of(table_metadata.current_snapshot(), table_metadata)
        if 0 < ancestor.timestamp_ms < timestamp_ms
    )

    # max() returns the first of several equally-late candidates, and the
    # default covers the case where nothing qualified.
    return max(candidates, key=lambda snapshot: snapshot.timestamp_ms, default=None)

pyiceberg/table/update/snapshot.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
SnapshotSummaryCollector,
6666
Summary,
6767
ancestors_of,
68+
latest_ancestor_before_timestamp,
6869
update_snapshot_summaries,
6970
)
7071
from pyiceberg.table.update import (
@@ -1008,6 +1009,26 @@ def rollback_to_snapshot(self, snapshot_id: int) -> ManageSnapshots:
10081009

10091010
return self.set_current_snapshot(snapshot_id=snapshot_id)
10101011

1012+
def rollback_to_timestamp(self, timestamp_ms: int) -> ManageSnapshots:
    """Roll the table back to the newest ancestor snapshot older than a timestamp.

    Looks up the latest ancestor snapshot whose timestamp is before
    ``timestamp_ms`` and makes it the current snapshot.

    Args:
        timestamp_ms: Cutoff in milliseconds; the table is rolled back to the
            latest snapshot before this timestamp.

    Returns:
        This for method chaining

    Raises:
        ValueError: If no valid snapshot exists older than the given timestamp.
    """
    metadata = self._transaction.table_metadata
    target = latest_ancestor_before_timestamp(metadata, timestamp_ms)

    # Guard clause: without a qualifying ancestor there is nothing to roll back to.
    if target is None:
        raise ValueError(f"Cannot roll back, no valid snapshot older than: {timestamp_ms}")

    return self.set_current_snapshot(snapshot_id=target.snapshot_id)
1031+
10111032
def _is_current_ancestor(self, snapshot_id: int) -> bool:
10121033
return snapshot_id in self._current_ancestors()
10131034

tests/integration/test_snapshot_operations.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,3 +268,66 @@ def test_rollback_to_snapshot_unknown_id(table_with_snapshots: Table) -> None:
268268

269269
with pytest.raises(ValueError, match="Cannot roll back to unknown snapshot id"):
270270
table_with_snapshots.manage_snapshots().rollback_to_snapshot(snapshot_id=invalid_snapshot_id).commit()
271+
272+
273+
@pytest.mark.integration
def test_rollback_to_timestamp_no_valid_snapshot(table_with_snapshots: Table) -> None:
    """Rolling back to the first history entry's own timestamp raises ValueError."""
    log_entries = table_with_snapshots.history()
    assert len(log_entries) >= 1

    # The cutoff equals the first entry's timestamp, so the rollback is expected to fail.
    first_entry_ms = log_entries[0].timestamp_ms

    with pytest.raises(ValueError, match="Cannot roll back, no valid snapshot older than"):
        table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=first_entry_ms).commit()
282+
283+
284+
@pytest.mark.integration
def test_rollback_to_timestamp(table_with_snapshots: Table) -> None:
    """Rolling back to the current snapshot's timestamp lands on its parent."""
    head = table_with_snapshots.current_snapshot()
    assert head is not None
    assert head.parent_snapshot_id is not None

    expected_snapshot_id = head.parent_snapshot_id

    # Using the head's own timestamp as the cutoff is expected to exclude the head itself.
    table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=head.timestamp_ms).commit()

    rolled_back = table_with_snapshots.current_snapshot()
    assert rolled_back is not None
    assert rolled_back.snapshot_id == expected_snapshot_id
297+
298+
299+
@pytest.mark.integration
def test_rollback_to_timestamp_current_snapshot(table_with_snapshots: Table) -> None:
    """A cutoff later than the current snapshot leaves the current snapshot in place."""
    head = table_with_snapshots.current_snapshot()
    assert head is not None

    # 100 ms past the head's timestamp, so the head itself is older than the cutoff.
    cutoff_ms = head.timestamp_ms + 100
    table_with_snapshots.manage_snapshots().rollback_to_timestamp(timestamp_ms=cutoff_ms).commit()

    after_rollback = table_with_snapshots.current_snapshot()
    assert after_rollback is not None
    assert after_rollback.snapshot_id == head.snapshot_id
310+
311+
312+
@pytest.mark.integration
def test_rollback_to_timestamp_chained_with_tag(table_with_snapshots: Table) -> None:
    """Tag creation and a timestamp rollback can be chained into a single commit."""
    head = table_with_snapshots.current_snapshot()
    assert head is not None
    assert head.parent_snapshot_id is not None

    expected_snapshot_id = head.parent_snapshot_id
    tag_name = "my-tag"

    # Tag the pre-rollback head, then roll back, all within one commit.
    manager = table_with_snapshots.manage_snapshots()
    manager = manager.create_tag(snapshot_id=head.snapshot_id, tag_name=tag_name)
    manager = manager.rollback_to_timestamp(timestamp_ms=head.timestamp_ms)
    manager.commit()

    after_rollback = table_with_snapshots.current_snapshot()
    assert after_rollback is not None
    assert after_rollback.snapshot_id == expected_snapshot_id

    # The tag must still reference the snapshot that was current before the rollback.
    expected_ref = SnapshotRef(snapshot_id=head.snapshot_id, snapshot_ref_type="tag")
    assert table_with_snapshots.metadata.refs[tag_name] == expected_ref

tests/table/test_snapshots.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
Summary,
3131
ancestors_between,
3232
ancestors_of,
33+
latest_ancestor_before_timestamp,
3334
update_snapshot_summaries,
3435
)
3536
from pyiceberg.transforms import IdentityTransform
@@ -456,3 +457,79 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None:
456457
)
457458
== 2000
458459
)
460+
461+
462+
def test_latest_ancestor_before_timestamp() -> None:
    """Check the lookup against a linear chain of four snapshots at 1000, 2000, 3000 and 4000 ms."""
    from pyiceberg.table.metadata import TableMetadataV2

    def snapshot_entry(snapshot_id: int) -> dict:
        # Snapshot N has timestamp N * 1000 and sequence number N; every
        # snapshot except the first points at its predecessor as parent.
        entry: dict = {"snapshot-id": snapshot_id}
        if snapshot_id > 1:
            entry["parent-snapshot-id"] = snapshot_id - 1
        entry["timestamp-ms"] = snapshot_id * 1000
        entry["sequence-number"] = snapshot_id
        entry["summary"] = {"operation": "append"}
        entry["manifest-list"] = f"s3://a/{snapshot_id}.avro"
        return entry

    # Create metadata with 4 snapshots at ordered timestamps
    raw_metadata = {
        "format-version": 2,
        "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
        "location": "s3://bucket/test/location",
        "last-sequence-number": 4,
        "last-updated-ms": 1602638573590,
        "last-column-id": 1,
        "current-schema-id": 0,
        "schemas": [{"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": True, "type": "long"}]}],
        "default-spec-id": 0,
        "partition-specs": [{"spec-id": 0, "fields": []}],
        "last-partition-id": 999,
        "default-sort-order-id": 0,
        "sort-orders": [{"order-id": 0, "fields": []}],
        "current-snapshot-id": 4,
        "snapshots": [snapshot_entry(snapshot_id) for snapshot_id in (1, 2, 3, 4)],
    }
    metadata = TableMetadataV2(**raw_metadata)

    # (cutoff in ms, expected snapshot id or None when nothing is older than the cutoff)
    cases = [
        (3500, 3),
        (2500, 2),
        (5000, 4),
        (3000, 2),  # cutoff equal to a snapshot's timestamp excludes that snapshot
        (1000, None),
    ]

    for cutoff_ms, expected_id in cases:
        found = latest_ancestor_before_timestamp(metadata, cutoff_ms)
        if expected_id is None:
            assert found is None
        else:
            assert found is not None
            assert found.snapshot_id == expected_id

0 commit comments

Comments
 (0)