
Commit 605ebc9

Add input paths to config file (#166)
1 parent b4f9483 commit 605ebc9


10 files changed, +120 -80 lines changed


config.yaml

Lines changed: 6 additions & 0 deletions

@@ -58,3 +58,9 @@ population_windows: 1
 plot_parameters:
   plot: false
   animated: false
+
+# List of paths that specify where to look for raw block data. Relative to the root directory of the repository.
+# The first item in the list is the directory that is used to write newly fetched data when using the
+# `collect_block_data` script and is also the directory where tests expect the sample data to be found.
+input_directories:
+  - raw_block_data
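
More than one directory can be listed here; per the comment above, the first entry is where the `collect_block_data` script writes newly fetched data (and where tests look for sample data), while later entries are only searched when reading. An illustrative sketch, in which the second path is hypothetical:

    input_directories:
      - raw_block_data        # first entry: write target and default read location
      - archived_block_data   # hypothetical extra directory, searched for reads only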

consensus_decentralization/helper.py

Lines changed: 9 additions & 1 deletion

@@ -13,7 +13,6 @@
 from yaml import safe_load

 ROOT_DIR = pathlib.Path(__file__).resolve().parent.parent
-RAW_DATA_DIR = ROOT_DIR / 'raw_block_data'
 INTERIM_DIR = ROOT_DIR / 'processed_data'
 MAPPING_INFO_DIR = ROOT_DIR / 'mapping_information'
 RESULTS_DIR = ROOT_DIR / 'results'

@@ -464,3 +463,12 @@ def get_mapped_data_filename(clustering_flag):
     :returns: str
     """
     return 'mapped_data_' + ('clustered' if clustering_flag else 'non_clustered') + '.json'
+
+
+def get_input_directories():
+    """
+    Reads the config file and retrieves the directories to look for raw block data
+    :returns: a list of directories that may contain the raw block data
+    """
+    config = get_config_data()
+    return [ROOT_DIR / input_dir for input_dir in config['input_directories']]
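
For reference, a minimal sketch of what the new helper evaluates to, assuming a config like the one above (the checkout path and second directory are hypothetical stand-ins):

    import pathlib

    # Hypothetical stand-ins: in helper.py, ROOT_DIR is derived from __file__
    # and the dict below is what get_config_data() would return.
    ROOT_DIR = pathlib.Path('/home/user/consensus-decentralization')
    config = {'input_directories': ['raw_block_data', 'archived_block_data']}

    input_dirs = [ROOT_DIR / d for d in config['input_directories']]
    # [PosixPath('/home/user/consensus-decentralization/raw_block_data'),
    #  PosixPath('/home/user/consensus-decentralization/archived_block_data')]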

consensus_decentralization/parse.py

Lines changed: 5 additions & 5 deletions

@@ -16,13 +16,13 @@
 }


-def parse(project, input_dir):
+def parse(ledger, input_dirs):
     """
     Parses raw data
-    :param project: string that corresponds to the ledger whose data should be parsed
-    :param input_dir: path to the directory of the raw block data
+    :param ledger: string that corresponds to the ledger whose data should be parsed
+    :param input_dirs: list of paths that point to the directories that contain raw block data
     :returns: list of dictionaries (the parsed data of the project)
     """
-    logging.info(f'Parsing {project} data..')
-    parser = ledger_parser[project](project_name=project, input_dir=input_dir)
+    logging.info(f'Parsing {ledger} data..')
+    parser = ledger_parser[ledger](ledger=ledger, input_dirs=input_dirs)
     return parser.parse()
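
With the new signature, callers pass the ledger name and the full list of candidate directories, as run.py does further down. A minimal usage sketch:

    import consensus_decentralization.helper as hlp
    from consensus_decentralization.parse import parse

    parsed_data = parse(ledger='bitcoin', input_dirs=hlp.get_input_directories())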

consensus_decentralization/parsers/default_parser.py

Lines changed: 26 additions & 8 deletions

@@ -9,12 +9,13 @@ class DefaultParser:
     The default parser, used for Bitcoin, Litecoin, Zcash and others. Any project that requires different parsing
     must use a parser class that inherits from this one.

-    :ivar project_name: the name of the project associated with a specific parser instance
+    :ivar ledger: the name of the ledger associated with a specific parser instance
+    :ivar input_dirs: the directories where the raw block data are stored
     """

-    def __init__(self, project_name, input_dir):
-        self.project_name = project_name
-        self.input_dir = input_dir
+    def __init__(self, ledger, input_dirs):
+        self.ledger = ledger
+        self.input_dirs = input_dirs

     @staticmethod
     def parse_identifiers(block_identifiers):

@@ -26,13 +27,30 @@ def parse_identifiers(block_identifiers):
         """
         return str(codecs.decode(block_identifiers, 'hex'))

+    def get_input_file(self):
+        """
+        Determines the file that contains the raw data for the project. The file is expected to be named
+        <ledger>_raw_data.json and to be located in (exactly) one of the input directories.
+        :returns: a Path object that corresponds to the file containing the raw data
+        :raises FileNotFoundError: if the file does not exist in any of the input directories
+        """
+        filename = f'{self.ledger}_raw_data.json'
+        for input_dir in self.input_dirs:
+            filepath = input_dir / filename
+            if filepath.is_file():
+                return filepath
+        raise FileNotFoundError(f'File {self.ledger}_raw_data.json not found in the input directories. Skipping '
+                                f'{self.ledger}..')
+
     def read_and_sort_data(self):
         """
         Reads the "raw" block data associated with the project
         :returns: a list of dictionaries (block data) sorted by timestamp
         """
-        filename = f'{self.project_name}_raw_data.json'
-        filepath = self.input_dir / filename
+        try:
+            filepath = self.get_input_file()
+        except FileNotFoundError as e:
+            raise e
         with open(filepath) as f:
             contents = f.read()
         data = [json.loads(item) for item in contents.strip().split('\n')]

@@ -48,8 +66,8 @@ def parse(self):
         data = self.read_and_sort_data()

         for block in data:
-            block['reward_addresses'] = ','.join(sorted([tx['addresses'][0] for tx in block['outputs']
-                                                         if (tx['addresses'] and int(tx['value']) > MIN_TX_VALUE)]))
+            block['reward_addresses'] = ','.join(sorted([tx['addresses'][0] for tx in block['outputs'] if
+                                                         (tx['addresses'] and int(tx['value']) > MIN_TX_VALUE)]))
             del block['outputs']
             block['identifiers'] = self.parse_identifiers(block['identifiers'])
         return data
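
The lookup in `get_input_file` returns the first directory that contains `<ledger>_raw_data.json`, so earlier entries in `input_directories` shadow later ones. A self-contained sketch of the same first-match behaviour (directory and ledger names are illustrative):

    import pathlib
    import tempfile

    def find_raw_data_file(ledger, input_dirs):
        # Mirrors DefaultParser.get_input_file: the first match wins.
        filename = f'{ledger}_raw_data.json'
        for input_dir in input_dirs:
            filepath = input_dir / filename
            if filepath.is_file():
                return filepath
        raise FileNotFoundError(f'File {filename} not found in the input directories.')

    # Tiny demonstration with throwaway directories.
    with tempfile.TemporaryDirectory() as tmp:
        first = pathlib.Path(tmp) / 'raw_block_data'
        second = pathlib.Path(tmp) / 'archived_block_data'
        for d in (first, second):
            d.mkdir()
        (second / 'bitcoin_raw_data.json').write_text('{}')
        # Only the second directory holds the file, so that path is returned.
        print(find_raw_data_file('bitcoin', [first, second]))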

consensus_decentralization/parsers/dummy_parser.py

Lines changed: 2 additions & 2 deletions

@@ -6,8 +6,8 @@ class DummyParser(DefaultParser):
     Dummy parser that only sorts the raw data. Used when the data are already in the required format.
     """

-    def __init__(self, project_name, input_dir):
-        super().__init__(project_name, input_dir)
+    def __init__(self, ledger, input_dirs):
+        super().__init__(ledger, input_dirs)

     @staticmethod
     def parse_identifiers(block_identifiers):

consensus_decentralization/parsers/ethereum_parser.py

Lines changed: 6 additions & 4 deletions

@@ -7,8 +7,8 @@ class EthereumParser(DummyParser):
     Parser for Ethereum. Inherits from DummyParser class.
     """

-    def __init__(self, project_name, input_dir):
-        super().__init__(project_name, input_dir)
+    def __init__(self, ledger, input_dirs):
+        super().__init__(ledger, input_dirs)

     @staticmethod
     def parse_identifiers(block_identifiers):

@@ -29,8 +29,10 @@ def read_and_sort_data(self):
         Note that the current version does not sort the data (because it is too memory-intensive) but assumes that the
         data are already sorted (which is generally the case given the suggested queries).
         """
-        filename = f'{self.project_name}_raw_data.json'
-        filepath = self.input_dir / filename
+        try:
+            filepath = self.get_input_file()
+        except FileNotFoundError as e:
+            raise e

         def generate_data():
             with open(filepath) as f:

data_collection_scripts/collect_block_data.py

Lines changed: 11 additions & 12 deletions

@@ -16,13 +16,10 @@
 from yaml import safe_load
 from datetime import datetime

-from consensus_decentralization.helper import ROOT_DIR, RAW_DATA_DIR
+from consensus_decentralization.helper import ROOT_DIR


-def collect_data(ledgers, from_block, to_date):
-    if not RAW_DATA_DIR.is_dir():
-        RAW_DATA_DIR.mkdir()
-
+def collect_data(raw_data_dir, ledgers, from_block, to_date):
     data_collection_dir = ROOT_DIR / "data_collection_scripts"

     with open(data_collection_dir / "queries.yaml") as f:

@@ -31,7 +28,7 @@ def collect_data(ledgers, from_block, to_date):
     client = bq.Client.from_service_account_json(json_credentials_path=data_collection_dir / "google-service-account-key.json")

     for ledger in ledgers:
-        file = RAW_DATA_DIR / f'{ledger}_raw_data.json'
+        file = raw_data_dir / f'{ledger}_raw_data.json'
         logging.info(f"Querying {ledger} from block {from_block[ledger]} until {to_date}..")

         query = (queries[ledger]).replace("{{block_number}}", str(from_block[ledger]) if from_block[ledger] else "-1").replace("{{timestamp}}", to_date)

@@ -56,14 +53,13 @@ def collect_data(ledgers, from_block, to_date):
         logging.info(f'Done writing {ledger} data to file.\n')


-def get_last_block_collected(ledger):
+def get_last_block_collected(file):
     """
     Get the last block collected for a ledger. This is useful for knowing where to start collecting data from.
     Assumes that the data is stored in a json lines file, ordered in increasing block number.
-    :param ledger: the ledger to get the last block collected for
-    :returns: the number of the last block collected for the specified ledger
+    :param file: the file that corresponds to the ledger to get the last block collected for
+    :returns: the number of the last ledger block collected in the file
     """
-    file = RAW_DATA_DIR / f'{ledger}_raw_data.json'
     if not file.is_file():
         return None
     with open(file) as f:

@@ -95,5 +91,8 @@ def get_last_block_collected(ledger):
     )

     args = parser.parse_args()
-    from_block = {ledger: get_last_block_collected(ledger) for ledger in args.ledgers}
-    collect_data(ledgers=args.ledgers, from_block=from_block, to_date=args.to_date)
+    raw_data_dir = hlp.get_input_directories()[0]
+    if not raw_data_dir.is_dir():
+        raw_data_dir.mkdir()
+    from_block = {ledger: get_last_block_collected(file=raw_data_dir / f'{ledger}_raw_data.json') for ledger in args.ledgers}
+    collect_data(raw_data_dir=raw_data_dir, ledgers=args.ledgers, from_block=from_block, to_date=args.to_date)
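
The hunk only shows the guard at the top of `get_last_block_collected`; judging by its docstring (a JSON-lines file ordered by increasing block number), a minimal implementation might look like the following sketch, where the `number` field name is an assumption, not something the diff confirms:

    import json

    def last_block_number(file):
        # Return the block number of the last record in a JSON-lines file,
        # or None if the file does not exist yet.
        if not file.is_file():
            return None
        last_line = None
        with open(file) as f:
            for line in f:
                if line.strip():
                    last_line = line
        # 'number' is an assumed field name for the block number.
        return json.loads(last_line)['number'] if last_line else None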

run.py

Lines changed: 31 additions & 24 deletions

@@ -13,7 +13,8 @@ def process_data(force_map, ledger_dir, ledger, output_dir):
     clustering_flag = hlp.get_clustering_flag()
     mapped_data_file = ledger_dir / hlp.get_mapped_data_filename(clustering_flag)
     if force_map or not mapped_data_file.is_file():
-        parsed_data = parse(ledger, input_dir=hlp.RAW_DATA_DIR)
+        raw_data_dirs = hlp.get_input_directories()
+        parsed_data = parse(ledger=ledger, input_dirs=raw_data_dirs)
         return apply_mapping(ledger, parsed_data=parsed_data, output_dir=output_dir)
     return None

@@ -36,11 +37,16 @@ def main(ledgers, timeframe, estimation_window, frequency, population_windows, i

     force_map = hlp.get_force_map_flag()

-    for ledger in ledgers:
+    for ledger in list(ledgers):
         ledger_dir = interim_dir / ledger
         ledger_dir.mkdir(parents=True, exist_ok=True)  # create ledger output directory if it doesn't already exist

-        mapped_data = process_data(force_map, ledger_dir, ledger, interim_dir)
+        try:
+            mapped_data = process_data(force_map, ledger_dir, ledger, interim_dir)
+        except FileNotFoundError as e:
+            logging.error(repr(e))
+            ledgers.remove(ledger)
+            continue

         aggregate(
             ledger,

@@ -52,30 +58,31 @@ def main(ledgers, timeframe, estimation_window, frequency, population_windows, i
             mapped_data=mapped_data
         )

-    aggregated_data_filename = hlp.get_blocks_per_entity_filename(timeframe, estimation_window, frequency)
-    metrics_dir = results_dir / 'metrics'
-    metrics_dir.mkdir(parents=True, exist_ok=True)
-
-    used_metrics = analyze(
-        projects=ledgers,
-        aggregated_data_filename=aggregated_data_filename,
-        population_windows=population_windows,
-        input_dir=interim_dir,
-        output_dir=metrics_dir
-    )
-
-    if hlp.get_plot_flag():
-        figures_dir = results_dir / 'figures'
-        figures_dir.mkdir(parents=True, exist_ok=True)
-        plot(
-            ledgers=ledgers,
-            metrics=used_metrics,
+    if ledgers:
+        aggregated_data_filename = hlp.get_blocks_per_entity_filename(timeframe, estimation_window, frequency)
+        metrics_dir = results_dir / 'metrics'
+        metrics_dir.mkdir(parents=True, exist_ok=True)
+
+        used_metrics = analyze(
+            projects=ledgers,
             aggregated_data_filename=aggregated_data_filename,
-            animated=hlp.get_plot_config_data()['animated'],
-            metrics_dir=metrics_dir,
-            figures_dir=figures_dir
+            population_windows=population_windows,
+            input_dir=interim_dir,
+            output_dir=metrics_dir
         )

+        if hlp.get_plot_flag():
+            figures_dir = results_dir / 'figures'
+            figures_dir.mkdir(parents=True, exist_ok=True)
+            plot(
+                ledgers=ledgers,
+                metrics=used_metrics,
+                aggregated_data_filename=aggregated_data_filename,
+                animated=hlp.get_plot_config_data()['animated'],
+                metrics_dir=metrics_dir,
+                figures_dir=figures_dir
+            )
+

 if __name__ == '__main__':
     ledgers = hlp.get_ledgers()
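
Note that the loop iterates over a copy (`list(ledgers)`) precisely so that `ledgers.remove(ledger)` is safe mid-iteration, and the analysis and plotting stages now run only if at least one ledger survives. A stripped-down sketch of the same pattern (names are illustrative):

    import logging

    def process(ledger):
        # Stand-in for process_data: raises when the raw data file is missing.
        if ledger == 'missing_ledger':
            raise FileNotFoundError(f'File {ledger}_raw_data.json not found in the input directories.')
        return f'{ledger} data'

    ledgers = ['bitcoin', 'missing_ledger']
    for ledger in list(ledgers):        # iterate over a copy...
        try:
            data = process(ledger)
        except FileNotFoundError as e:
            logging.error(repr(e))
            ledgers.remove(ledger)      # ...so removing from the original list is safe
            continue

    if ledgers:                         # downstream stages only run for survivors
        print('analyzing', ledgers)     # analyzing ['bitcoin']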

tests/test_mappings.py

Lines changed: 13 additions & 13 deletions

@@ -12,7 +12,7 @@
 from consensus_decentralization.mappings.ethereum_mapping import EthereumMapping
 from consensus_decentralization.mappings.cardano_mapping import CardanoMapping
 from consensus_decentralization.mappings.tezos_mapping import TezosMapping
-from consensus_decentralization.helper import RAW_DATA_DIR, INTERIM_DIR, get_clustering_flag
+from consensus_decentralization.helper import INTERIM_DIR, get_clustering_flag, get_input_directories


 @pytest.fixture

@@ -31,7 +31,7 @@ def setup_and_cleanup():
     ledger_parser['sample_cardano'] = DummyParser
     ledger_mapping['sample_tezos'] = TezosMapping
     ledger_parser['sample_tezos'] = DummyParser
-    test_raw_data_dir = RAW_DATA_DIR
+    test_raw_data_dirs = get_input_directories()
     test_output_dir = INTERIM_DIR / "test_output"
     # Create the output directory for each project (as this is typically done in the run.py script before parsing or
     # mapping takes place)

@@ -41,7 +41,7 @@ def setup_and_cleanup():
     mapping_info_dir = pathlib.Path(__file__).resolve().parent.parent / 'mapping_information'
     # Mock return value of get_clustering_flag
     get_clustering_flag.return_value = True
-    yield mapping_info_dir, test_raw_data_dir, test_output_dir
+    yield mapping_info_dir, test_raw_data_dirs, test_output_dir
     # Clean up
     shutil.rmtree(test_output_dir)

@@ -95,24 +95,24 @@ def prep_sample_tezos_mapping_info():


 def test_map(setup_and_cleanup, prep_sample_bitcoin_mapping_info):
-    mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
+    mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup

-    parsed_data = parse(project='sample_bitcoin', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_bitcoin', input_dirs=test_raw_data_dirs)
     apply_mapping(project='sample_bitcoin', parsed_data=parsed_data, output_dir=test_output_dir)

     mapped_data_file = test_output_dir / 'sample_bitcoin/mapped_data_clustered.json'
     assert mapped_data_file.is_file()


 def test_bitcoin_mapping(setup_and_cleanup, prep_sample_bitcoin_mapping_info):
-    mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
+    mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup
     with open(mapping_info_dir / 'addresses/sample_bitcoin.json') as f:
         pool_addresses = json.load(f)
     pool_addresses['0000000000000000000000000000000000000000'] = {'name': 'TEST2', 'source': ''}
     with open(mapping_info_dir / 'addresses/sample_bitcoin.json', 'w') as f:
         f.write(json.dumps(pool_addresses))

-    parsed_data = parse(project='sample_bitcoin', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_bitcoin', input_dirs=test_raw_data_dirs)
     apply_mapping(project='sample_bitcoin', parsed_data=parsed_data, output_dir=test_output_dir)

     expected_block_creators = {

@@ -136,15 +136,15 @@ def test_bitcoin_mapping(setup_and_cleanup, prep_sample_bitcoin_mapping_info):


 def test_ethereum_mapping(setup_and_cleanup, prep_sample_ethereum_mapping_info):
-    mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
+    mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup

     with open(mapping_info_dir / 'addresses/sample_ethereum.json') as f:
         addresses = json.load(f)
     addresses['0xe9b54a47e3f401d37798fc4e22f14b78475c2afc'] = {'name': 'TEST2', 'source': ''}
     with open(mapping_info_dir / 'addresses/sample_ethereum.json', 'w') as f:
         f.write(json.dumps(addresses))

-    parsed_data = parse(project='sample_ethereum', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_ethereum', input_dirs=test_raw_data_dirs)
     apply_mapping(project='sample_ethereum', parsed_data=parsed_data, output_dir=test_output_dir)

     expected_block_creators = {

@@ -169,9 +169,9 @@ def test_ethereum_mapping(setup_and_cleanup, prep_sample_ethereum_mapping_info):


 def test_cardano_mapping(setup_and_cleanup, prep_sample_cardano_mapping_info):
-    mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
+    mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup

-    parsed_data = parse(project='sample_cardano', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_cardano', input_dirs=test_raw_data_dirs)
     apply_mapping(project='sample_cardano', parsed_data=parsed_data, output_dir=test_output_dir)

     expected_block_creators = {

@@ -193,9 +193,9 @@ def test_cardano_mapping(setup_and_cleanup, prep_sample_cardano_mapping_info):


 def test_tezos_mapping(setup_and_cleanup, prep_sample_tezos_mapping_info):
-    mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
+    mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup

-    parsed_data = parse(project='sample_tezos', input_dir=test_raw_data_dir)
+    parsed_data = parse(ledger='sample_tezos', input_dirs=test_raw_data_dirs)
     apply_mapping(project='sample_tezos', parsed_data=parsed_data, output_dir=test_output_dir)

     expected_block_creators = {
