-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat] PIP-454: Metadata Store Migration Framework (implementation) #25219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
| /** | ||
| * Start the migration process. | ||
| * | ||
| * @throws Exception if migration fails | ||
| */ | ||
| public void startMigration() throws Exception { | ||
| log.info("=== Starting Migration ==="); | ||
| log.info("Source: {} (current)", sourceStore.getClass().getSimpleName()); | ||
| log.info("Target: {}", targetUrl); | ||
|
|
||
| try { | ||
| // 1. Create migration flag | ||
| setInitialMigrationPhase(); | ||
|
|
||
| // 2. Wait for participants to prepare | ||
| waitForPreparation(); | ||
|
|
||
| // 3. Copy persistent data | ||
| updatePhase(MigrationPhase.COPYING); | ||
| copyPersistentData(); | ||
|
|
||
| // 4. Set state to completed | ||
| updatePhase(MigrationPhase.COMPLETED); | ||
|
|
||
| log.info("=== Migration Complete ==="); | ||
| } catch (Exception e) { | ||
| log.error("Migration failed", e); | ||
| updatePhase(MigrationPhase.FAILED); | ||
| throw e; | ||
| } | ||
| } | ||
|
|
||
| private void setInitialMigrationPhase() throws MetadataStoreException { | ||
| try { | ||
| sourceStore.put(MigrationState.MIGRATION_FLAG_PATH, | ||
| ObjectMapperFactory.getMapper().writer() | ||
| .writeValueAsBytes(new MigrationState(MigrationPhase.PREPARATION, targetUrl)), | ||
| Optional.of(-1L)).get(); | ||
| } catch (Exception e) { | ||
| throw new MetadataStoreException(e); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a solution that prevents starting a new migration unless there's no previous migration? I guess retrying would be allowed so the previous state could be failed. Is there a way to do a CAS update to ensure that only a single node becomes the coordinator?
| private void readCurrentState() throws MetadataStoreException { | ||
| try { | ||
| // Read the current state | ||
| var initialState = migrationStateCache.get(MigrationState.MIGRATION_FLAG_PATH).get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is read only at startup time, I guess it could by pass the cache.
In addition, it might be necessary to read after using sync on Zookeeper to ensure that it's the most recent state.
I guess it would be a small chance that this could cause actual issues.
| readCurrentState(); | ||
| registerAsParticipant(); | ||
|
|
||
| // Watch for migration events | ||
| watchForMigrationEvents(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a chance that we lose an event between reading the current state and before the listener is registered?
Motivation
Implementation of PIP-454
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: