Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/commands/node_manager_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
LOG_PLAIN,
settings,
)
from src.node_manager.execution import create_operator_validators_scanner
from src.node_manager.execution import scan_node_manager_validators_events
from src.node_manager.startup_check import startup_checks
from src.node_manager.tasks import NodeManagerTask, StateSyncTask
from src.validators.database import (
Expand All @@ -35,6 +35,7 @@
VaultValidatorCrud,
)
from src.validators.keystores.load import load_keystore
from src.validators.tasks import load_genesis_validators

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -206,14 +207,19 @@ async def _start(
VaultValidatorCrud().setup()
CheckpointCrud().setup()

keystore = await load_keystore()
validators_scanner = create_operator_validators_scanner(operator_address)
# load network validators from ipfs dump
await load_genesis_validators()

keystore = await load_keystore()
# start operator tasks
chain_state = await get_chain_finalized_head()

logger.info('Syncing validator events...')
await validators_scanner.process_new_events(chain_state.block_number)
await scan_node_manager_validators_events(
operator_address=operator_address,
block_number=chain_state.block_number,
is_startup=True,
)

logger.info('Updating oracles cache...')
await update_oracles_cache()
Expand All @@ -227,7 +233,6 @@ async def _start(
NodeManagerTask(
operator_address=operator_address,
keystore=keystore,
validators_scanner=validators_scanner,
).run(interrupt_handler),
StateSyncTask(operator_address).run(interrupt_handler),
)
Expand Down
30 changes: 23 additions & 7 deletions src/node_manager/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from src.config.settings import settings
from src.node_manager.typings import OperatorStateUpdateParams
from src.validators.database import CheckpointCrud, VaultValidatorCrud
from src.validators.execution import (
NetworkValidatorsProcessor,
NetworkValidatorsStartupProcessor,
)
from src.validators.typings import VaultValidator

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -49,15 +53,27 @@ async def process_events(self, events: list[EventData], to_block: BlockNumber) -

if validators:
VaultValidatorCrud().save_vault_validators(validators)
CheckpointCrud().update_validators_checkpoint(to_block)
CheckpointCrud().update_validators_checkpoint(block_number=to_block)


def create_operator_validators_scanner(
operator_address: ChecksumAddress,
) -> EventScanner:
"""Create a reusable EventScanner for NodesManager ValidatorsRegistered events."""
processor = OperatorValidatorsProcessor(operator_address)
return EventScanner(processor, argument_filters={'operator': operator_address})
async def scan_node_manager_validators_events(
operator_address: ChecksumAddress, block_number: BlockNumber, is_startup: bool
) -> None:
"""Scans new vault and network validators for the given block number."""
network_validators_processor: NetworkValidatorsStartupProcessor | NetworkValidatorsProcessor
if is_startup:
network_validators_processor = NetworkValidatorsStartupProcessor()
else:
network_validators_processor = NetworkValidatorsProcessor()

network_validators_scanner = EventScanner(network_validators_processor)
await network_validators_scanner.process_new_events(block_number)

operator_validators_processor = OperatorValidatorsProcessor(operator_address)
operator_validators_scanner = EventScanner(
operator_validators_processor, argument_filters={'operator': operator_address}
)
await operator_validators_scanner.process_new_events(block_number)


def _parse_public_keys(public_keys_bytes: bytes) -> list[HexStr]:
Expand Down
28 changes: 20 additions & 8 deletions src/node_manager/oracles.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from sw_utils.common import urljoin
from sw_utils.typings import ProtocolConfig
from web3 import Web3
from web3.types import Gwei, Wei
from web3.types import Wei

from src.common.contracts import validators_registry_contract
from src.common.exceptions import (
Expand Down Expand Up @@ -241,7 +241,7 @@ async def create_approval_request(


async def poll_funding_approval(
validator_fundings: dict[HexStr, Gwei],
validators: Sequence[Validator],
operator_address: ChecksumAddress,
protocol_config: ProtocolConfig,
) -> list[HexStr]:
Expand All @@ -259,7 +259,7 @@ async def poll_funding_approval(
if request is None or deadline is None or deadline <= current_timestamp:
deadline = current_timestamp + protocol_config.signature_validity_period
request = create_funding_request(
validator_fundings=validator_fundings,
validators=validators,
operator_address=operator_address,
deadline=deadline,
)
Expand All @@ -276,19 +276,31 @@ async def poll_funding_approval(


def create_funding_request(
validator_fundings: dict[HexStr, Gwei],
validators: Sequence[Validator],
operator_address: ChecksumAddress,
deadline: int,
) -> NodeManagerFundingRequest:
"""Build a NodesManager funding request for validator top-ups."""
return NodeManagerFundingRequest(

request = NodeManagerFundingRequest(
operator_address=operator_address,
public_keys=list(validator_fundings.keys()),
amounts=[int(amount) for amount in validator_fundings.values()],
public_keys=[],
amounts=[],
deposit_signatures=[],
deadline=deadline,
validators_manager_signature=_sign_deadline(deadline),
)

for validator in validators:
if validator.deposit_signature is None:
raise ValueError('Deposit signature is required for validator')

request.public_keys.append(validator.public_key)
request.deposit_signatures.append(validator.deposit_signature)
request.amounts.append(validator.amount)
Comment thread
cyc60 marked this conversation as resolved.

return request


# Generic oracle request helpers

Expand Down Expand Up @@ -500,5 +512,5 @@ async def _send_request(

def _sign_deadline(deadline: int) -> HexStr:
"""EIP-191 personal_sign of the deadline timestamp."""
message = encode_defunct(text=str(deadline))
message = encode_defunct(primitive=deadline.to_bytes(32, byteorder='big'))
Comment thread
cyc60 marked this conversation as resolved.
return HexStr(wallet.sign_message(message).signature.hex())
11 changes: 1 addition & 10 deletions src/node_manager/register_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from sw_utils.typings import Bytes32
from web3 import Web3
from web3.exceptions import ContractLogicError
from web3.types import Gwei

from src.common.clients import execution_client
from src.common.contracts import nodes_manager_contract, validators_registry_contract
Expand Down Expand Up @@ -93,17 +92,9 @@ async def register_validators(
async def fund_validators(
operator_address: ChecksumAddress,
signatures: list[HexStr],
validator_fundings: dict[HexStr, Gwei],
validators: Sequence[Validator],
) -> HexStr | None:
"""Submit fundValidators transaction to NodesManager contract."""
validators = [
Validator(
public_key=public_key,
amount=amount,
deposit_signature=HexStr(Web3.to_hex(bytes(96))),
)
for public_key, amount in validator_fundings.items()
]
tx_validators = [
Web3.to_bytes(tx_validator)
for tx_validator in encode_tx_validator_list(validators=validators)
Expand Down
34 changes: 22 additions & 12 deletions src/node_manager/tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging

from eth_typing import ChecksumAddress
from sw_utils import EventScanner, InterruptHandler
from sw_utils import InterruptHandler
from sw_utils.typings import ProtocolConfig
from web3 import Web3
from web3.types import Gwei, Wei
Expand All @@ -16,6 +16,7 @@
from src.config.settings import settings
from src.node_manager.execution import (
fetch_operator_state_from_ipfs,
scan_node_manager_validators_events,
submit_state_sync_transaction,
)
from src.node_manager.oracles import (
Expand All @@ -27,7 +28,10 @@
from src.validators.consensus import fetch_compounding_validators_balances
from src.validators.keystores.base import BaseKeystore
from src.validators.tasks import get_deposits_amounts, get_funding_amounts
from src.validators.utils import get_validators_for_registration
from src.validators.utils import (
get_validators_for_funding,
get_validators_for_registration,
)

logger = logging.getLogger(__name__)

Expand All @@ -39,15 +43,17 @@ def __init__(
self,
operator_address: ChecksumAddress,
keystore: BaseKeystore,
validators_scanner: EventScanner,
) -> None:
self.operator_address = operator_address
self.keystore = keystore
self.validators_scanner = validators_scanner

async def process_block(self, interrupt_handler: InterruptHandler) -> None:
chain_head = await get_chain_finalized_head()
await self.validators_scanner.process_new_events(chain_head.block_number)
await scan_node_manager_validators_events(
operator_address=self.operator_address,
block_number=chain_head.block_number,
is_startup=False,
)

if not await check_gas_price(high_priority=True):
logger.debug('Gas price too high, skipping validators registration')
Expand Down Expand Up @@ -143,30 +149,34 @@ async def _process_funding(
if not validator_fundings:
return amount

validators = await get_validators_for_funding(self.keystore, validator_fundings)
if not validators:
logger.warning('No available validators keystores for funding')
return amount

funded_total = Gwei(0)
batch_limit = protocol_config.validators_approval_batch_limit

# Process in batches
funding_items = list(validator_fundings.items())
for i in range(0, len(funding_items), batch_limit):
batch = dict(funding_items[i : i + batch_limit])
for i in range(0, len(validators), batch_limit):
batch = validators[i : i + batch_limit]

signatures = await poll_funding_approval(
validator_fundings=batch,
validators=batch,
operator_address=operator_address,
protocol_config=protocol_config,
)

tx_hash = await fund_validators(
operator_address=self.operator_address,
signatures=signatures,
validator_fundings=batch,
validators=batch,
)

if tx_hash:
batch_total = sum(batch.values())
batch_total = sum(v.amount for v in batch)
funded_total = Gwei(funded_total + batch_total)
pub_keys = ', '.join(batch.keys())
pub_keys = ', '.join([v.public_key for v in batch])
logger.info('Funded community vault validators %s: tx=%s', pub_keys, tx_hash)
else:
logger.warning('Community vault funding batch failed, stopping funding')
Expand Down
1 change: 1 addition & 0 deletions src/node_manager/tests/test_oracles_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def _make_funding_request() -> NodeManagerFundingRequest:
operator_address=faker.eth_address(),
public_keys=[faker.validator_public_key()],
amounts=[ether_to_gwei(32)],
deposit_signatures=[faker.validator_signature()],
deadline=1000,
validators_manager_signature=faker.account_signature(),
)
Expand Down
22 changes: 13 additions & 9 deletions src/node_manager/tests/test_register_validators.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from eth_typing import ChecksumAddress, HexStr
from eth_typing import ChecksumAddress
from sw_utils.tests.factories import faker
from web3 import Web3
from web3.exceptions import ContractLogicError
from web3.types import Gwei

from src.common.tests.utils import ether_to_gwei
from src.node_manager.register_validators import fund_validators, register_validators
from src.node_manager.typings import NodeManagerRegistrationOraclesApproval
from src.validators.typings import Validator

MODULE = 'src.node_manager.register_validators'

Expand Down Expand Up @@ -199,7 +199,7 @@ async def test_success(
result = await fund_validators(
operator_address=OPERATOR_ADDR,
signatures=[faker.account_signature()],
validator_fundings=_make_validator_fundings(),
validators=_make_validators(),
)
assert result is not None

Expand All @@ -216,7 +216,7 @@ async def test_transaction_error_returns_none(
result = await fund_validators(
operator_address=OPERATOR_ADDR,
signatures=[faker.account_signature()],
validator_fundings=_make_validator_fundings(),
validators=_make_validators(),
)
assert result is None

Expand All @@ -239,15 +239,19 @@ async def test_failed_tx_receipt_returns_none(
result = await fund_validators(
operator_address=OPERATOR_ADDR,
signatures=[faker.account_signature()],
validator_fundings=_make_validator_fundings(),
validators=_make_validators(),
)
assert result is None


def _make_validator_fundings() -> dict[HexStr, Gwei]:
return {
faker.validator_public_key(): ether_to_gwei(32),
}
def _make_validators() -> list[Validator]:
return [
Validator(
public_key=faker.validator_public_key(),
amount=ether_to_gwei(32),
deposit_signature=faker.validator_signature(),
)
]


def _make_approval() -> NodeManagerRegistrationOraclesApproval:
Expand Down
Loading
Loading