Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 132 additions & 75 deletions experiments/forking_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
getLogger
)
from math import floor
from os import environ
from os import (
environ,
mkdir
)
from os.path import (
dirname,
exists as path_exists,
Expand All @@ -51,6 +54,8 @@
from tempfile import mkdtemp
from time import time as _time
from typing import (
Dict,
Iterable,
List,
Optional,
Set,
Expand Down Expand Up @@ -132,7 +137,7 @@ def __init__(

# Required to interact with the network & the nodes
self.loop = loop
self.nodes: List[TestNode] = []
self.nodes: Dict[int, TestNode] = {}
self.nodes_hub: Optional[NodesHub] = None
self.proposer_node_ids: List[int] = []
self.validator_node_ids: List[int] = []
Expand All @@ -145,11 +150,18 @@ def run(self) -> bool:
self.setup_chain()
self.setup_nodes()

try:
if self.num_validator_nodes > 0:
if self.num_validator_nodes > 0:
try:
self.autofinalization_workaround()
except BaseException as e:
self.logger.critical(
'Workaround execution failure', exc_info=e
)
return False
try:
self.start_nodes()
except (OSError, AssertionError):
except (OSError, AssertionError) as e:
self.logger.critical('Unable to start nodes', exc_info=e)
return False # Early shutdown

self.nodes_hub = NodesHub(
Expand All @@ -163,14 +175,19 @@ def run(self) -> bool:
self.nodes_hub.sync_start_proxies()
self.nodes_hub.sync_connect_nodes_graph(self.graph_edges)

# Notice that the validators have already loaded their wallets
self.logger.info('Importing wallets')
for idx, proposer_id in enumerate(self.proposer_node_ids):
if idx > 0:
self.nodes[proposer_id].createwallet(f'n{proposer_id}')
for node_id, node in self.nodes.items():
node.createwallet(f'n{node_id}')
tmp_wallet = node.get_wallet_rpc(f'n{node_id}')

tmp_wallet = self.nodes[proposer_id].get_wallet_rpc(f'n{proposer_id}')
tmp_wallet.importwallet(normpath(self.tmp_dir + f'/n{proposer_id}.wallet'))
if self.num_validator_nodes > 0:
tmp_wallet.importwallet(
normpath(self.tmp_dir + f'/n{node_id}.wallet')
)
else:
tmp_wallet.importmasterkey(
regtest_mnemonics[node_id]['mnemonics']
)

self.loop.run_until_complete(self.trigger_simulation_stop())
return True
Expand All @@ -183,10 +200,20 @@ def autofinalization_workaround(self):
self.logger.info('Running auto-finalization workaround')

lucky_proposer_id = self.proposer_node_ids[0]
validators = [self.nodes[i] for i in self.validator_node_ids]
lucky_node_ids = [lucky_proposer_id] + self.validator_node_ids

self.start_node(lucky_proposer_id)
self.start_nodes(validators)
# We'll start nodes isolated from the experiment's network, and reload
# their wallets later once the experiment starts after the workaround.
if not path_exists(self.tmp_dir + '/workaround'):
mkdir(self.tmp_dir + '/workaround')
for node_id in lucky_node_ids:
initialize_datadir(self.tmp_dir + '/workaround', node_id)

workaround_nodes = self.build_nodes_instances(
base_dir=self.tmp_dir + '/workaround',
node_ids=lucky_node_ids
)
self.start_nodes(workaround_nodes)

# Although we don't need to collect data during this initialization
# phase, we'll connect the nodes through a NodesHub instance to ensure
Expand All @@ -195,48 +222,63 @@ def autofinalization_workaround(self):
tmp_hub = NodesHub(
loop=self.loop,
latency_policy=StaticLatencyPolicy(0),
nodes=self.nodes,
nodes=workaround_nodes,
network_stats_collector=NullNetworkStatsCollector()
)
lucky_node_ids = [lucky_proposer_id] + self.validator_node_ids
tmp_hub.sync_start_proxies(lucky_node_ids)
dense_graph = create_simple_dense_graph(node_ids=lucky_node_ids)
tmp_hub.sync_connect_nodes_graph(dense_graph)

# We have to load some money into the nodes
lucky_proposer = self.nodes[lucky_proposer_id]
lucky_proposer = workaround_nodes[lucky_proposer_id]
for proposer_id in self.proposer_node_ids:
lucky_proposer.createwallet(f'n{proposer_id}')
tmp_wallet = lucky_proposer.get_wallet_rpc(f'n{proposer_id}')
tmp_wallet.importmasterkey(
regtest_mnemonics[proposer_id]['mnemonics']
)
for validator_id in self.validator_node_ids:
self.nodes[validator_id].createwallet(f'n{validator_id}')
tmp_wallet = self.nodes[validator_id].get_wallet_rpc(f'n{validator_id}')
workaround_nodes[validator_id].createwallet(f'n{validator_id}')
tmp_wallet = workaround_nodes[validator_id].get_wallet_rpc(f'n{validator_id}')
tmp_wallet.importmasterkey(
regtest_mnemonics[validator_id]['mnemonics']
)

self.loop.run_until_complete(self.ensure_autofinalization_is_off())
self.logger.info('Imported mnemonics into workaround nodes')

# Unloading the wallets that don't belong to the lucky proposer
self.loop.run_until_complete(self.ensure_autofinalization_is_off(
workaround_nodes
))

# Dumping wallets to be loaded later
for proposer_id in self.proposer_node_ids:
# The wallet file is created in the autofinalization_workaround method
tmp_wallet = lucky_proposer.get_wallet_rpc(f'n{proposer_id}')
tmp_wallet.dumpwallet(normpath(self.tmp_dir + f'/n{proposer_id}.wallet'))
if proposer_id != lucky_proposer_id:
lucky_proposer.unloadwallet(f'n{proposer_id}')
lucky_proposer.unloadwallet(f'n{proposer_id}')
for validator_id in self.validator_node_ids:
tmp_wallet = workaround_nodes[validator_id].get_wallet_rpc(f'n{validator_id}')
tmp_wallet.dumpwallet(normpath(self.tmp_dir + f'/n{validator_id}.wallet'))

self.logger.info('Dumped workaround wallets to be reused later')

# We close all temporary connections & shut down nodes
tmp_hub.close()
self.stop_nodes(workaround_nodes)

tmp_hub.close() # We close all temporary connections
# Cleaning workaround stuff
rmtree(self.tmp_dir + '/workaround')

# We recover the original topology for the full network
# self.num_nodes, self.graph_edges = tmp_num_nodes, tmp_graph_edges
self.logger.info('Finished auto-finalization workaround')

async def ensure_autofinalization_is_off(self):
async def ensure_autofinalization_is_off(
self,
workaround_nodes: Dict[int, TestNode]
):
for validator_id in self.validator_node_ids:
validator = self.nodes[validator_id]
validator = workaround_nodes[validator_id]
tmp_wallet = validator.get_wallet_rpc(f'n{validator_id}')
tmp_wallet.deposit(
tmp_wallet.getnewaddress('', 'legacy'),
Expand All @@ -250,7 +292,7 @@ async def ensure_autofinalization_is_off(self):
# We have to wait at least for one epoch :( .
await asyncio_sleep(1 + self.block_time_seconds * 50)

lucky_proposer = self.nodes[self.proposer_node_ids[0]]
lucky_proposer = workaround_nodes[self.proposer_node_ids[0]]
is_autofinalization_off = False

while not is_autofinalization_off:
Expand All @@ -266,6 +308,8 @@ def safe_run(self, close_loop=True) -> bool:
successful_run = False
try:
successful_run = self.run()
except BaseException as e:
self.logger.critical('The sub-experiment failed', exc_info=e)
finally:
self.logger.info('Releasing resources')
if self.nodes_hub is not None:
Expand Down Expand Up @@ -300,29 +344,13 @@ def cleanup_directories(self):
if self.tmp_dir != '' and path_exists(self.tmp_dir):
self.logger.info('Cleaning temporary directories')
rmtree(self.tmp_dir)
# TODO: Remove wallet.* files too

def setup_chain(self):
self.logger.info('Preparing "empty" chain')
for i in range(self.num_nodes):
initialize_datadir(self.tmp_dir, i)

def setup_nodes(self):
if len(self.nodes) > 0:
self.logger.info('Skipping nodes setup')
return

self.logger.info('Creating node wrappers')

all_node_ids = set(range(self.num_nodes))
self.proposer_node_ids = sample(
all_node_ids, self.num_proposer_nodes
)
self.validator_node_ids = sample(
all_node_ids.difference(self.proposer_node_ids),
self.num_validator_nodes
)

def get_node_args(self, node_id: int) -> List[str]:
# Some values are copied from test_framework.util.initialize_datadir, so
# they are redundant, but it's easier to see what's going on by having
# all of them together.
Expand All @@ -342,8 +370,8 @@ def setup_nodes(self):
'-debugexclude=leveldb',
'-mocktime=0',

f'-stakesplitthreshold={100 * UNIT}',
f'-stakecombinemaximum={100 * UNIT}',
f'-stakesplitthreshold={50 * UNIT}',
f'-stakecombinemaximum={50 * UNIT}',
f'''-customchainparams={json_dumps({
"block_time_seconds": self.block_time_seconds,
"block_stake_timestamp_interval_seconds": self.block_stake_timestamp_interval_seconds,
Expand All @@ -354,36 +382,61 @@ def setup_nodes(self):
for mnemonic in regtest_mnemonics
]
}
}, separators=(",",":"))}'''
}, separators=(",", ":"))}'''
]
relay_args = ['-proposing=0'] + node_args
proposer_args = ['-proposing=1'] + node_args
validator_args = ['-proposing=0', '-validating=1'] + node_args

if node_id in self.proposer_node_ids:
_node_args = proposer_args
elif node_id in self.validator_node_ids:
_node_args = validator_args
else:
_node_args = relay_args
return [
f'-bind=127.0.0.1:{NodesHub.get_p2p_node_port(node_id)}',
f'-rpcbind=127.0.0.1:{NodesHub.get_rpc_node_port(node_id)}',
f'''-stats-log-output-file={
self.nodes_stats_directory.joinpath(f"stats_{node_id}.csv")
}''',
f'-uacomment=simpatch{node_id}'
] + _node_args

def setup_nodes(self):
if len(self.nodes) > 0:
self.logger.info('Skipping nodes setup')
return

self.logger.info('Creating node wrappers')

all_node_ids = set(range(self.num_nodes))
self.proposer_node_ids = sample(
all_node_ids, self.num_proposer_nodes
)
self.validator_node_ids = sample(
all_node_ids.difference(self.proposer_node_ids),
self.num_validator_nodes
)

if not self.nodes_stats_directory.exists():
self.nodes_stats_directory.mkdir()

def get_node_args(node_id: int) -> List[str]:
if node_id in self.proposer_node_ids:
_node_args = proposer_args
elif node_id in self.validator_node_ids:
_node_args = validator_args
else:
_node_args = relay_args
return [
f'-bind=127.0.0.1:{NodesHub.get_p2p_node_port(node_id)}',
f'-rpcbind=127.0.0.1:{NodesHub.get_rpc_node_port(node_id)}',
f'''-stats-log-output-file={
self.nodes_stats_directory.joinpath(f"stats_{node_id}.csv")
}''',
f'-uacomment=simpatch{node_id}'
] + _node_args

self.nodes = [
TestNode(
self.nodes = self.build_nodes_instances(
base_dir=self.tmp_dir,
node_ids=range(self.num_nodes)
)

def build_nodes_instances(
self,
base_dir: str,
node_ids: Iterable[int]
) -> Dict[int, TestNode]:
return {
i: TestNode(
i=i,
datadir=f'{self.tmp_dir}/node{i}',
extra_args=get_node_args(i),
datadir=f'{base_dir}/node{i}',
extra_args=self.get_node_args(i),
rpchost=None,
timewait=60,
unit_e=environ['UNIT_E'],
Expand All @@ -392,8 +445,8 @@ def get_node_args(node_id: int) -> List[str]:
coverage_dir=None,
use_cli=False
)
for i in range(self.num_nodes)
]
for i in node_ids
}

def start_node(self, i: int):
node = self.nodes[i]
Expand All @@ -404,20 +457,20 @@ def start_node(self, i: int):
self.stop_nodes()
raise

def start_nodes(self, nodes: Optional[List[TestNode]] = None):
def start_nodes(self, nodes: Optional[Dict[int, TestNode]] = None):
self.logger.info('Starting nodes')

if nodes is None:
nodes = self.nodes

for node_id, node in enumerate(nodes):
for node_id, node in nodes.items():
try:
if not node.running:
node.start()
except OSError as e:
self.logger.critical(f'Node {node_id} failed to start', e)
raise
for node_id, node in enumerate(nodes):
for node_id, node in nodes.items():
try:
node.wait_for_rpc_connection()
except AssertionError as e:
Expand All @@ -429,14 +482,18 @@ def start_nodes(self, nodes: Optional[List[TestNode]] = None):

self.logger.info('Started nodes')

def stop_nodes(self):
def stop_nodes(self, nodes: Optional[Dict[int, TestNode]] = None):
self.logger.info('Stopping nodes')
for node in self.nodes:

if nodes is None:
nodes = self.nodes

for node in nodes.values():
try:
node.stop_node()
except AssertionError:
continue
for node in self.nodes:
for node in nodes.values():
node.wait_until_stopped()

def define_network_topology(self):
Expand Down
Loading