-
Notifications
You must be signed in to change notification settings - Fork 1k
PHOENIX-7751 : [SyncTable Tool] Feature to validate table data using PhoenixSyncTable tool b/w source and target cluster #2379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
3c54c86
c97f7e0
53e9a3b
6b75fec
6f40ab4
7328f93
fd46404
58ef6a9
6f226f6
1ccf4b6
e75c6c1
a5060ab
cffd2e6
2ef30e6
dd18dae
326e792
b7127cc
f588291
f81aa56
d60104f
359f345
1bcd693
7904c50
b9dfd3c
6c50f95
b8c00e4
d54f970
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -199,4 +199,23 @@ public static long getMaxLookbackInMillis(Configuration conf) { | |
|
|
||
| /** Exposed for testing */ | ||
| public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server"; | ||
|
|
||
| /** | ||
| * PhoenixSyncTableTool scan attributes for server-side chunk formation and checksum | ||
| */ | ||
| public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation"; | ||
| public static final String SYNC_TABLE_CHUNK_SIZE_BYTES = "_SyncTableChunkSizeBytes"; | ||
| public static final String SYNC_TABLE_CONTINUED_DIGEST_STATE = "_SyncTableContinuedDigestState"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add JavaDoc on all 3 constants individually with a description of what they the attribute is and what type of value it would contain? |
||
|
|
||
| /** | ||
| * PhoenixSyncTableTool chunk metadata cell qualifiers. These define the wire protocol between | ||
| * PhoenixSyncTableRegionScanner (server-side coprocessor) and PhoenixSyncTableMapper (client-side | ||
| * mapper). The coprocessor returns chunk metadata as HBase cells with these qualifiers, and the | ||
| * mapper parses them to extract chunk information. | ||
| */ | ||
| public static final byte[] SYNC_TABLE_START_KEY_QUALIFIER = Bytes.toBytes("START_KEY"); | ||
| public static final byte[] SYNC_TABLE_HASH_QUALIFIER = Bytes.toBytes("HASH"); | ||
| public static final byte[] SYNC_TABLE_ROW_COUNT_QUALIFIER = Bytes.toBytes("ROW_COUNT"); | ||
| public static final byte[] SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER = | ||
| Bytes.toBytes("IS_PARTIAL_CHUNK"); | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,100 @@ | ||||||||||||||||||||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||||||||||||||||||||
| * Licensed to the Apache Software Foundation (ASF) under one | ||||||||||||||||||||||||||||||||||||||||||||
| * or more contributor license agreements. See the NOTICE file | ||||||||||||||||||||||||||||||||||||||||||||
| * distributed with this work for additional information | ||||||||||||||||||||||||||||||||||||||||||||
| * regarding copyright ownership. The ASF licenses this file | ||||||||||||||||||||||||||||||||||||||||||||
| * to you under the Apache License, Version 2.0 (the | ||||||||||||||||||||||||||||||||||||||||||||
| * "License"); you may not use this file except in compliance | ||||||||||||||||||||||||||||||||||||||||||||
| * with the License. You may obtain a copy of the License at | ||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||||||||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||||||||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||
| package org.apache.phoenix.util; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| import java.io.ByteArrayInputStream; | ||||||||||||||||||||||||||||||||||||||||||||
| import java.io.DataInputStream; | ||||||||||||||||||||||||||||||||||||||||||||
| import java.io.IOException; | ||||||||||||||||||||||||||||||||||||||||||||
| import java.nio.ByteBuffer; | ||||||||||||||||||||||||||||||||||||||||||||
| import org.apache.hadoop.hbase.util.Bytes; | ||||||||||||||||||||||||||||||||||||||||||||
| import org.bouncycastle.crypto.digests.SHA256Digest; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||
| * Utility class for SHA-256 digest state serialization and deserialization. We are not using jdk | ||||||||||||||||||||||||||||||||||||||||||||
| * bundled SHA, since their digest can't be serialized/deserialized which is needed for | ||||||||||||||||||||||||||||||||||||||||||||
| * PhoenixSyncTableTool for cross-region hash continuation. | ||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||
| public class SHA256DigestUtil { | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||
| * Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up | ||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you point me to the documentation on the size being ~96 bytes?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't verify this, but DeepWiki says it can be up to 309 bytes: https://deepwiki.com/search/is-there-an-upper-limit-to-the_7872e61f-4f3f-462e-b4e9-cb6cbed47bd8?mode=fast
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||||||||||||||||
| * to 128 bytes as buffer. | ||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||
| public static final int MAX_SHA256_DIGEST_STATE_SIZE = 128; | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||
| * Encodes a SHA256Digest state to a byte array with length prefix for validation. Format: [4-byte | ||||||||||||||||||||||||||||||||||||||||||||
| * integer length][encoded digest state bytes] | ||||||||||||||||||||||||||||||||||||||||||||
| * @param digest The digest whose state should be encoded | ||||||||||||||||||||||||||||||||||||||||||||
| * @return Byte array containing integer length prefix + encoded state | ||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||
| public static byte[] encodeDigestState(SHA256Digest digest) { | ||||||||||||||||||||||||||||||||||||||||||||
| byte[] encoded = digest.getEncodedState(); | ||||||||||||||||||||||||||||||||||||||||||||
| ByteBuffer buffer = ByteBuffer.allocate(Bytes.SIZEOF_INT + encoded.length); | ||||||||||||||||||||||||||||||||||||||||||||
| buffer.putInt(encoded.length); | ||||||||||||||||||||||||||||||||||||||||||||
| buffer.put(encoded); | ||||||||||||||||||||||||||||||||||||||||||||
| return buffer.array(); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+46
to
+52
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since MAX_SHA256_DIGEST_STATE_SIZE is capped at 128 bytes , using a 4-byte integer and ByteBuffer for the length prefix is slightly over-engineered. We can optimize this by using a single byte for the length and Bytes.add() for concatenation. This would allow us to remove the ByteBuffer, ByteArrayInputStream, and DataInputStream dependencies in these utility methods.
Suggested change
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW, can you tell me why we need to encode the length into it? You are using
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is written as generic Util and not limited to SyncTable usage. |
||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||
| * Decodes a SHA256Digest state from a byte array. | ||||||||||||||||||||||||||||||||||||||||||||
| * @param encodedState Byte array containing 4-byte integer length prefix + encoded state | ||||||||||||||||||||||||||||||||||||||||||||
| * @return SHA256Digest restored to the saved state | ||||||||||||||||||||||||||||||||||||||||||||
| * @throws IOException if state is invalid, corrupted | ||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||
| public static SHA256Digest decodeDigestState(byte[] encodedState) throws IOException { | ||||||||||||||||||||||||||||||||||||||||||||
| if (encodedState == null) { | ||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException("Invalid encoded digest state: encodedState is null"); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| DataInputStream dis = new DataInputStream(new ByteArrayInputStream(encodedState)); | ||||||||||||||||||||||||||||||||||||||||||||
| int stateLength = dis.readInt(); | ||||||||||||||||||||||||||||||||||||||||||||
| // Prevent malicious large allocations | ||||||||||||||||||||||||||||||||||||||||||||
| if (stateLength > MAX_SHA256_DIGEST_STATE_SIZE) { | ||||||||||||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException( | ||||||||||||||||||||||||||||||||||||||||||||
| String.format("Invalid SHA256 state length: %d, expected <= %d", stateLength, | ||||||||||||||||||||||||||||||||||||||||||||
| MAX_SHA256_DIGEST_STATE_SIZE)); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| byte[] state = new byte[stateLength]; | ||||||||||||||||||||||||||||||||||||||||||||
| dis.readFully(state); | ||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+65
to
+75
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Following my suggestion in encode, this will simply become:
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||
| return new SHA256Digest(state); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||
| * Decodes a digest state and finalizes it to produce the SHA-256 checksum. | ||||||||||||||||||||||||||||||||||||||||||||
| * @param encodedState Serialized digest state (format: [4-byte length][state bytes]) | ||||||||||||||||||||||||||||||||||||||||||||
| * @return 32-byte SHA-256 hash | ||||||||||||||||||||||||||||||||||||||||||||
| * @throws IOException if state decoding fails | ||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||
| public static byte[] finalizeDigestToChecksum(byte[] encodedState) throws IOException { | ||||||||||||||||||||||||||||||||||||||||||||
| SHA256Digest digest = decodeDigestState(encodedState); | ||||||||||||||||||||||||||||||||||||||||||||
| return finalizeDigestToChecksum(digest); | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||
| * Finalizes a SHA256Digest to produce the final checksum. | ||||||||||||||||||||||||||||||||||||||||||||
| * @param digest The digest to finalize | ||||||||||||||||||||||||||||||||||||||||||||
| * @return 32-byte SHA-256 hash | ||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||
| public static byte[] finalizeDigestToChecksum(SHA256Digest digest) { | ||||||||||||||||||||||||||||||||||||||||||||
| byte[] hash = new byte[digest.getDigestSize()]; | ||||||||||||||||||||||||||||||||||||||||||||
| digest.doFinal(hash, 0); | ||||||||||||||||||||||||||||||||||||||||||||
| return hash; | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1207,6 +1207,10 @@ public static boolean isIndexRebuild(Scan scan) { | |
| return scan.getAttribute((BaseScannerRegionObserverConstants.REBUILD_INDEXES)) != null; | ||
| } | ||
|
|
||
| public static boolean isSyncTableChunkFormation(Scan scan) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean isSyncTableChunkFormationEnabled? |
||
| return scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION) != null; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see you are explicitly setting to |
||
| } | ||
|
|
||
| public static int getClientVersion(Scan scan) { | ||
| int clientVersion = UNKNOWN_CLIENT_VERSION; | ||
| byte[] clientVersionBytes = | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should all of these instead be named SYNC_TOOL ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have used SyncTableTool for user facing class/config. For others, I have used SyncTable, are you recommending to move all Classes and config to SyncTool instead of SyncTable i.e PhoenixSyncTableRegionScanner -> PhoenixSyncToolRegionScanner ?
I felt SyncTable is more self explainable compared to SyncTool, we can also change it to SyncTableTool at all places ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Its okay. Not a big deal. We can stick with the same naming convention.