Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -209,68 +210,81 @@ public void testKvSnapshotLease() throws Exception {

assertThat(zkClient.getKvSnapshotLeasesList()).isEmpty();

// test register kv snapshot lease for snapshot 0.
Map<TableBucket, Long> consumeBuckets = new HashMap<>();
// test register kv snapshot lease for the first round of snapshots.
Map<TableBucket, Long> consumeBuckets1 = new HashMap<>();
KvSnapshots kvSnapshots = admin.getLatestKvSnapshots(tablePath).get();
for (int bucketId : kvSnapshots.getBucketIds()) {
TableBucket tableBucket = new TableBucket(kvSnapshots.getTableId(), bucketId);
consumeBuckets.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong());
consumeBuckets1.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong());
}

KvSnapshotLease kvSnapshotLease1 =
admin.createKvSnapshotLease(kvSnapshotLeaseId1, Duration.ofDays(1).toMillis());
kvSnapshotLease1.acquireSnapshots(consumeBuckets).get();
kvSnapshotLease1.acquireSnapshots(consumeBuckets1).get();
checkKvSnapshotLeaseEquals(
metadataManager, kvSnapshotLeaseId1, tableId, new Long[] {0L, 0L, 0L});
metadataManager,
kvSnapshotLeaseId1,
tableId,
buildExpectedBucketSnapshots(consumeBuckets1));

expectedRowByBuckets = putRows(tableId, tablePath, 10);
// wait snapshot2 finish
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet());

// test register kv snapshot lease for snapshot 1.
consumeBuckets = new HashMap<>();
// test register kv snapshot lease for the second round of snapshots.
Map<TableBucket, Long> consumeBuckets2 = new HashMap<>();
kvSnapshots = admin.getLatestKvSnapshots(tablePath).get();
for (int bucketId : kvSnapshots.getBucketIds()) {
TableBucket tableBucket = new TableBucket(kvSnapshots.getTableId(), bucketId);
consumeBuckets.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong());
consumeBuckets2.put(tableBucket, kvSnapshots.getSnapshotId(bucketId).getAsLong());
}

KvSnapshotLease kvSnapshotLease2 =
admin.createKvSnapshotLease(kvSnapshotLeaseId2, Duration.ofDays(1).toMillis());
kvSnapshotLease2.acquireSnapshots(consumeBuckets).get();
kvSnapshotLease2.acquireSnapshots(consumeBuckets2).get();
checkKvSnapshotLeaseEquals(
metadataManager, kvSnapshotLeaseId2, tableId, new Long[] {1L, 1L, 1L});
// check even snapshot1 is generated, snapshot0 also retained as lease exists.
metadataManager,
kvSnapshotLeaseId2,
tableId,
buildExpectedBucketSnapshots(consumeBuckets2));
// check even the second round snapshot is generated, the first round snapshot is also
// retained as lease exists.
for (TableBucket tb : expectedRowByBuckets.keySet()) {
assertThat(zkClient.getTableBucketSnapshot(tb, 0L).isPresent()).isTrue();
assertThat(zkClient.getTableBucketSnapshot(tb, 1L).isPresent()).isTrue();
long firstSnapshotId = consumeBuckets1.get(tb);
long secondSnapshotId = consumeBuckets2.get(tb);
assertThat(zkClient.getTableBucketSnapshot(tb, firstSnapshotId).isPresent()).isTrue();
assertThat(zkClient.getTableBucketSnapshot(tb, secondSnapshotId).isPresent()).isTrue();
}

expectedRowByBuckets = putRows(tableId, tablePath, 10);
// wait snapshot3 finish
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet());

// release lease1.
// release lease1 for bucket 0.
kvSnapshotLease1.releaseSnapshots(Collections.singleton(new TableBucket(tableId, 0))).get();
Long[] expectedAfterReleaseBucket0 = buildExpectedBucketSnapshots(consumeBuckets1);
expectedAfterReleaseBucket0[0] = -1L;
checkKvSnapshotLeaseEquals(
metadataManager, kvSnapshotLeaseId1, tableId, new Long[] {-1L, 0L, 0L});
metadataManager, kvSnapshotLeaseId1, tableId, expectedAfterReleaseBucket0);

// release lease2.
kvSnapshotLease2.releaseSnapshots(consumeBuckets.keySet()).get();
kvSnapshotLease2.releaseSnapshots(consumeBuckets2.keySet()).get();
assertThat(zkClient.getKvSnapshotLeasesList()).doesNotContain(kvSnapshotLeaseId2);

// release all kv snapshot lease of lease1
kvSnapshotLease1.dropLease().get();
assertThat(zkClient.getKvSnapshotLeasesList()).isEmpty();

expectedRowByBuckets = putRows(tableId, tablePath, 10);
// wait snapshot2 finish
// wait next snapshot finish
FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshots(expectedRowByBuckets.keySet());
// as all leases are dropped, and new snapshot is generated, all old snapshot are
// as all leases are dropped, and new snapshot is generated, all old snapshots are
// cleared.
for (TableBucket tb : expectedRowByBuckets.keySet()) {
assertThat(zkClient.getTableBucketSnapshot(tb, 0L).isPresent()).isFalse();
assertThat(zkClient.getTableBucketSnapshot(tb, 1L).isPresent()).isFalse();
long firstSnapshotId = consumeBuckets1.get(tb);
long secondSnapshotId = consumeBuckets2.get(tb);
assertThat(zkClient.getTableBucketSnapshot(tb, firstSnapshotId).isPresent()).isFalse();
assertThat(zkClient.getTableBucketSnapshot(tb, secondSnapshotId).isPresent()).isFalse();
}

// drop no exist lease, no exception.
Expand Down Expand Up @@ -355,6 +369,19 @@ private static int getBucketId(InternalRow row) {
return function.bucketing(key, DEFAULT_BUCKET_NUM);
}

/**
* Build the expected bucket snapshots array from the consumeBuckets map. The array index is the
* bucket id, and the value is the snapshot id. Buckets not present in the map are set to -1L.
*/
private Long[] buildExpectedBucketSnapshots(Map<TableBucket, Long> consumeBuckets) {
Long[] expected = new Long[DEFAULT_BUCKET_NUM];
Arrays.fill(expected, -1L);
for (Map.Entry<TableBucket, Long> entry : consumeBuckets.entrySet()) {
expected[entry.getKey().getBucket()] = entry.getValue();
}
return expected;
}

private void checkKvSnapshotLeaseEquals(
KvSnapshotLeaseMetadataManager metadataManager,
String leaseId,
Expand Down