Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;

import org.apache.flink.shaded.guava33.com.google.common.primitives.UnsignedBytes;

Expand Down Expand Up @@ -369,10 +370,32 @@ private byte[] serializeElement(@Nonnull E element) {
private E deserializeElement(@Nonnull byte[] bytes) {
try {
final int numPrefixBytes = groupPrefixBytes.length;
inputView.setBuffer(bytes, numPrefixBytes, bytes.length - numPrefixBytes);
final int payloadLength = bytes.length - numPrefixBytes;
if (payloadLength <= 0) {
throw new IOException(
String.format(
"No payload bytes to deserialize: total bytes=%d, prefix bytes=%d",
bytes.length, numPrefixBytes));
}
inputView.setBuffer(bytes, numPrefixBytes, payloadLength);
return byteOrderProducingSerializer.deserialize(inputView);
} catch (IOException e) {
throw new FlinkRuntimeException("Error while deserializing the element.", e);
final int numPrefixBytes = groupPrefixBytes.length;
final int payloadLength = bytes.length - numPrefixBytes;
final String hexDump =
StringUtils.byteToHexString(bytes, 0, Math.min(bytes.length, 128));
throw new FlinkRuntimeException(
String.format(
"Error while deserializing the element. "
+ "Total bytes=%d, prefix bytes=%d, payload bytes=%d, "
+ "hex dump (first 128 bytes)=[%s]. "
+ "This may indicate that data from a different state type "
+ "was incorrectly written to this column family during restore.",
bytes.length,
numPrefixBytes,
payloadLength,
hexDump),
e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Encapsulates the process of restoring a RocksDB instance from a full snapshot. */
public class RocksDBFullRestoreOperation<K> implements RocksDBRestoreOperation {
Expand Down Expand Up @@ -132,7 +133,7 @@ private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
}

try (ThrowingIterator<KeyGroup> keyGroups = savepointRestoreResult.getRestoredKeyGroups()) {
restoreKVStateData(keyGroups, columnFamilyHandles);
restoreKVStateData(keyGroups, columnFamilyHandles, restoredMetaInfos);
}
}

Expand All @@ -141,7 +142,9 @@ private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
* handle.
*/
private void restoreKVStateData(
ThrowingIterator<KeyGroup> keyGroups, Map<Integer, ColumnFamilyHandle> columnFamilies)
ThrowingIterator<KeyGroup> keyGroups,
Map<Integer, ColumnFamilyHandle> columnFamilies,
List<StateMetaInfoSnapshot> restoredMetaInfos)
throws IOException, RocksDBException, StateMigrationException {
// for all key-groups in the current state handle...
try (RocksDBWriteBatchWrapper writeBatchWrapper =
Expand All @@ -160,6 +163,29 @@ private void restoreKVStateData(
if (kvStateId != oldKvStateId) {
oldKvStateId = kvStateId;
handle = columnFamilies.get(kvStateId);
if (handle == null) {
String registeredStates =
restoredMetaInfos.stream()
.map(
meta ->
meta.getName()
+ "("
+ meta
.getBackendStateType()
+ ")")
.collect(Collectors.joining(", "));
throw new IllegalStateException(
String.format(
"Could not find column family handle for state id %d "
+ "during full snapshot restore. Total registered "
+ "states: %d [%s]. This indicates data corruption "
+ "in the checkpoint stream -- a kvStateId references "
+ "a state that was not registered in the snapshot "
+ "metadata.",
kvStateId,
restoredMetaInfos.size(),
registeredStates));
}
}
writeBatchWrapper.put(handle, groupEntry.getKey(), groupEntry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Encapsulates the process of restoring a RocksDB instance from a full snapshot. */
public class RocksDBHeapTimersFullRestoreOperation<K> implements RocksDBRestoreOperation {
Expand Down Expand Up @@ -175,7 +176,7 @@ private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
}

try (ThrowingIterator<KeyGroup> keyGroups = savepointRestoreResult.getRestoredKeyGroups()) {
restoreKVStateData(keyGroups, columnFamilyHandles, restoredPQStates);
restoreKVStateData(keyGroups, columnFamilyHandles, restoredPQStates, restoredMetaInfos);
}
}

Expand All @@ -186,7 +187,8 @@ private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
private void restoreKVStateData(
ThrowingIterator<KeyGroup> keyGroups,
Map<Integer, ColumnFamilyHandle> columnFamilies,
Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates)
Map<Integer, HeapPriorityQueueSnapshotRestoreWrapper<?>> restoredPQStates,
List<StateMetaInfoSnapshot> restoredMetaInfos)
throws IOException, RocksDBException, StateMigrationException {
// for all key-groups in the current state handle...
try (RocksDBWriteBatchWrapper writeBatchWrapper =
Expand Down Expand Up @@ -214,7 +216,26 @@ private void restoreKVStateData(
writeBatchWrapper.put(
handle, groupEntry.getKey(), groupEntry.getValue());
} else {
throw new IllegalStateException("Unknown state id: " + kvStateId);
String registeredStates =
restoredMetaInfos.stream()
.map(
meta ->
meta.getName()
+ "("
+ meta.getBackendStateType()
+ ")")
.collect(Collectors.joining(", "));
throw new IllegalStateException(
String.format(
"Could not find column family handle for state id %d "
+ "during full snapshot restore. Total registered "
+ "states: %d [%s]. This indicates data corruption "
+ "in the checkpoint stream -- a kvStateId references "
+ "a state that was not registered in the snapshot "
+ "metadata.",
kvStateId,
restoredMetaInfos.size(),
registeredStates));
}
}
}
Expand Down
Loading