-
Notifications
You must be signed in to change notification settings - Fork 126
[WIP] withdraw: persist tx before state transitions #1105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -238,20 +238,11 @@ func (m *Manager) recoverWithdrawals(ctx context.Context) error { | |
| return err | ||
| } | ||
|
|
||
| // Group the deposits by their finalized withdrawal transaction. | ||
| depositsByWithdrawalTx := make(map[chainhash.Hash][]*deposit.Deposit) | ||
| hash2tx := make(map[chainhash.Hash]*wire.MsgTx) | ||
| for _, d := range withdrawingDeposits { | ||
| withdrawalTx := d.FinalizedWithdrawalTx | ||
| if withdrawalTx == nil { | ||
| continue | ||
| } | ||
| txid := withdrawalTx.TxHash() | ||
| hash2tx[txid] = withdrawalTx | ||
|
|
||
| depositsByWithdrawalTx[txid] = append( | ||
| depositsByWithdrawalTx[txid], d, | ||
| ) | ||
| depositsByWithdrawalTx, hash2tx, err := m.groupWithdrawingDepositsByTx( | ||
| ctx, withdrawingDeposits, | ||
| ) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Publishing a transaction can take a while in neutrino mode, so | ||
|
|
@@ -303,6 +294,98 @@ func (m *Manager) recoverWithdrawals(ctx context.Context) error { | |
| return nil | ||
| } | ||
|
|
||
| // groupWithdrawingDepositsByTx clusters withdrawing deposits by their finalized | ||
| // withdrawal transaction hash. | ||
| func (m *Manager) groupWithdrawingDepositsByTx(ctx context.Context, | ||
| withdrawingDeposits []*deposit.Deposit) ( | ||
| map[chainhash.Hash][]*deposit.Deposit, map[chainhash.Hash]*wire.MsgTx, | ||
| error) { | ||
|
|
||
| depositsByWithdrawalTx := make(map[chainhash.Hash][]*deposit.Deposit) | ||
| hash2tx := make(map[chainhash.Hash]*wire.MsgTx) | ||
|
|
||
| // Build an index of all known finalized withdrawal transactions. | ||
| for _, d := range withdrawingDeposits { | ||
| if d.FinalizedWithdrawalTx == nil { | ||
| continue | ||
| } | ||
|
|
||
| txid := d.FinalizedWithdrawalTx.TxHash() | ||
| hash2tx[txid] = d.FinalizedWithdrawalTx | ||
| } | ||
|
|
||
| // If exactly one tx hash is present, we can recover missing tx pointers | ||
| // from that single cluster. | ||
| var fallbackTx *wire.MsgTx | ||
| if len(hash2tx) == 1 { | ||
| for _, tx := range hash2tx { | ||
| fallbackTx = tx | ||
| } | ||
| } | ||
|
|
||
| for _, d := range withdrawingDeposits { | ||
| withdrawalTx := d.FinalizedWithdrawalTx | ||
| if withdrawalTx == nil { | ||
| if fallbackTx == nil { | ||
| log.Warnf("Skipping withdrawing deposit %v "+ | ||
| "during recovery: missing finalized "+ | ||
| "withdrawal tx", d.OutPoint) | ||
|
|
||
| continue | ||
| } | ||
|
|
||
| // Persist the recovered tx pointer so future restarts | ||
| // don't depend on in-memory fallback recovery. | ||
| d.Lock() | ||
| d.FinalizedWithdrawalTx = fallbackTx | ||
| d.Unlock() | ||
|
|
||
| err := m.cfg.DepositManager.UpdateDeposit(ctx, d) | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("unable to "+ | ||
| "persist recovered finalized "+ | ||
| "withdrawal tx for deposit %v: %w", | ||
| d.OutPoint, err) | ||
| } | ||
|
|
||
| log.Warnf("Recovered missing finalized withdrawal tx "+ | ||
| "for deposit %v", d.OutPoint) | ||
|
|
||
| withdrawalTx = fallbackTx | ||
| } | ||
|
|
||
| txid := withdrawalTx.TxHash() | ||
| hash2tx[txid] = withdrawalTx | ||
| depositsByWithdrawalTx[txid] = append( | ||
| depositsByWithdrawalTx[txid], d, | ||
| ) | ||
| } | ||
|
|
||
| return depositsByWithdrawalTx, hash2tx, nil | ||
| } | ||
|
|
||
| // persistFinalizedWithdrawalTx updates the selected deposits with the finalized | ||
| // withdrawal tx and persists the change before state transitions. | ||
| func (m *Manager) persistFinalizedWithdrawalTx(ctx context.Context, | ||
| deposits []*deposit.Deposit, finalizedTx *wire.MsgTx) error { | ||
|
|
||
| for _, d := range deposits { | ||
| d.Lock() | ||
| d.FinalizedWithdrawalTx = finalizedTx | ||
| d.Unlock() | ||
| } | ||
|
|
||
| for _, d := range deposits { | ||
| err := m.cfg.DepositManager.UpdateDeposit(ctx, d) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to update deposit %v: %w", | ||
| d.OutPoint, err) | ||
| } | ||
| } | ||
|
Comment on lines
+372
to
+384
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The function for _, d := range deposits {
d.Lock()
d.FinalizedWithdrawalTx = finalizedTx
d.Unlock()
err := m.cfg.DepositManager.UpdateDeposit(ctx, d)
if err != nil {
return fmt.Errorf("failed to update deposit %v: %w",
d.OutPoint, err)
}
} |
||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // WithdrawDeposits starts a deposits withdrawal flow. If the amount is set to 0 | ||
| // the full amount of the selected deposits will be withdrawn. | ||
| func (m *Manager) WithdrawDeposits(ctx context.Context, | ||
|
|
@@ -478,14 +561,11 @@ func (m *Manager) WithdrawDeposits(ctx context.Context, | |
| m.mu.Unlock() | ||
| } | ||
|
|
||
| // Attach the finalized withdrawal tx to the deposits. After a client | ||
| // restart we can use this address as an indicator to republish the | ||
| // withdrawal tx and continue the withdrawal. | ||
| // Deposits with the same withdrawal tx are part of the same withdrawal. | ||
| for _, d := range deposits { | ||
| d.Lock() | ||
| d.FinalizedWithdrawalTx = finalizedTx | ||
| d.Unlock() | ||
| // Persist the finalized withdrawal tx before state transitions so that | ||
| // a restart can recover the full withdrawal cluster. | ||
| err = m.persistFinalizedWithdrawalTx(ctx, deposits, finalizedTx) | ||
| if err != nil { | ||
| return "", "", err | ||
| } | ||
|
|
||
| // Add the new withdrawal tx to the finalized withdrawals to republish | ||
|
|
@@ -504,15 +584,6 @@ func (m *Manager) WithdrawDeposits(ctx context.Context, | |
| err) | ||
| } | ||
|
|
||
| // Update the deposits in the database. | ||
| for _, d := range deposits { | ||
| err = m.cfg.DepositManager.UpdateDeposit(ctx, d) | ||
| if err != nil { | ||
| return "", "", fmt.Errorf("failed to update "+ | ||
| "deposit %w", err) | ||
| } | ||
| } | ||
|
|
||
| return finalizedTx.TxID(), withdrawalAddress.String(), nil | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation iterates over
withdrawingDepositstwice. This can be made more efficient and arguably clearer by partitioning the deposits into those with and without a finalized transaction in a single pass. This avoids redundant work and simplifies the logic.