6 changes: 6 additions & 0 deletions config.yaml
@@ -58,3 +58,9 @@ population_windows: 1
plot_parameters:
plot: false
animated: false

# List of paths specifying where to look for raw block data, relative to the root directory of the repository.
# The first item in the list is the directory that newly fetched data is written to when using the
# `collect_block_data` script; it is also the directory where the tests expect the sample data to be found.
input_directories:
- raw_block_data
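
For illustration, a setup that keeps freshly collected data separate from an archival dump might list two directories; only the first is written to by `collect_block_data` (the second path is hypothetical):

```yaml
input_directories:
  - raw_block_data        # written by collect_block_data; sample data for tests lives here
  - archived_block_data   # hypothetical read-only archive, searched second
```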
10 changes: 9 additions & 1 deletion consensus_decentralization/helper.py
@@ -13,7 +13,6 @@
from yaml import safe_load

ROOT_DIR = pathlib.Path(__file__).resolve().parent.parent
RAW_DATA_DIR = ROOT_DIR / 'raw_block_data'
INTERIM_DIR = ROOT_DIR / 'processed_data'
MAPPING_INFO_DIR = ROOT_DIR / 'mapping_information'
RESULTS_DIR = ROOT_DIR / 'results'
@@ -464,3 +463,12 @@ def get_mapped_data_filename(clustering_flag):
:returns: str
"""
return 'mapped_data_' + ('clustered' if clustering_flag else 'non_clustered') + '.json'


def get_input_directories():
"""
Reads the config file and retrieves the directories in which to look for raw block data
:returns: a list of Path objects corresponding to directories that may contain the raw block data
"""
config = get_config_data()
return [ROOT_DIR / input_dir for input_dir in config['input_directories']]
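
A minimal usage sketch, assuming the config shown above: the helper returns absolute `Path` objects in config order, so callers that need a write target (such as the `collect_block_data` script) can simply take the first entry.

```python
from consensus_decentralization.helper import get_input_directories

input_dirs = get_input_directories()  # e.g. [ROOT_DIR / 'raw_block_data', ...]
raw_data_dir = input_dirs[0]  # first entry: where newly fetched data gets written
raw_data_dir.mkdir(exist_ok=True)  # ensure it exists, mirroring the collection script
```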
10 changes: 5 additions & 5 deletions consensus_decentralization/parse.py
@@ -16,13 +16,13 @@
}


def parse(project, input_dir):
def parse(ledger, input_dirs):
"""
Parses raw data
:param project: string that corresponds to the ledger whose data should be parsed
:param input_dir: path to the directory of the raw block data
:param ledger: string that corresponds to the ledger whose data should be parsed
:param input_dirs: list of paths that point to the directories that contain raw block data
:returns: list of dictionaries (the parsed data of the ledger)
"""
logging.info(f'Parsing {project} data..')
parser = ledger_parser[project](project_name=project, input_dir=input_dir)
logging.info(f'Parsing {ledger} data..')
parser = ledger_parser[ledger](ledger=ledger, input_dirs=input_dirs)
return parser.parse()
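
A sketch of the new call-site pattern (mirroring what `run.py` does further down; `'bitcoin'` is assumed to be a configured ledger):

```python
import consensus_decentralization.helper as hlp
from consensus_decentralization.parse import parse

# Search every configured input directory for bitcoin_raw_data.json and parse it.
parsed_data = parse(ledger='bitcoin', input_dirs=hlp.get_input_directories())
```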
34 changes: 26 additions & 8 deletions consensus_decentralization/parsers/default_parser.py
@@ -9,12 +9,13 @@ class DefaultParser:
The default parser, used for Bitcoin, Litecoin, Zcash and others. Any project that requires different parsing
must use a parser class that inherits from this one.

:ivar project_name: the name of the project associated with a specific parser instance
:ivar ledger: the name of the ledger associated with a specific parser instance
:ivar input_dirs: the directories where the raw block data are stored
"""

def __init__(self, project_name, input_dir):
self.project_name = project_name
self.input_dir = input_dir
def __init__(self, ledger, input_dirs):
self.ledger = ledger
self.input_dirs = input_dirs

@staticmethod
def parse_identifiers(block_identifiers):
@@ -26,13 +27,30 @@ def parse_identifiers(block_identifiers):
"""
return str(codecs.decode(block_identifiers, 'hex'))

def get_input_file(self):
"""
Determines the file that contains the raw data for the ledger. The file is expected to be named
<ledger>_raw_data.json and to be located in one of the input directories; if it is present in more than one, the first match (in config order) is used.
:returns: a Path object that corresponds to the file containing the raw data
:raises FileNotFoundError: if the file does not exist in any of the input directories
"""
filename = f'{self.ledger}_raw_data.json'
for input_dir in self.input_dirs:
filepath = input_dir / filename
if filepath.is_file():
return filepath
raise FileNotFoundError(f'File {filename} not found in any of the input directories. Skipping '
f'{self.ledger}..')

def read_and_sort_data(self):
"""
Reads the "raw" block data associated with the project
:returns: a list of dictionaries (block data) sorted by timestamp
"""
filename = f'{self.project_name}_raw_data.json'
filepath = self.input_dir / filename
filepath = self.get_input_file()
with open(filepath) as f:
contents = f.read()
data = [json.loads(item) for item in contents.strip().split('\n')]
@@ -48,8 +66,8 @@ def parse(self):
data = self.read_and_sort_data()

for block in data:
block['reward_addresses'] = ','.join(sorted([tx['addresses'][0] for tx in block['outputs']
if (tx['addresses'] and int(tx['value']) > MIN_TX_VALUE)]))
block['reward_addresses'] = ','.join(sorted([tx['addresses'][0] for tx in block['outputs'] if
(tx['addresses'] and int(tx['value']) > MIN_TX_VALUE)]))
del block['outputs']
block['identifiers'] = self.parse_identifiers(block['identifiers'])
return data
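
A quick sketch of the lookup behaviour with hypothetical directories: `get_input_file` probes the directories in order and returns the first hit, so a file in an earlier directory shadows a same-named file later in the list.

```python
import pathlib
from consensus_decentralization.parsers.default_parser import DefaultParser

# Hypothetical directories; the first one containing bitcoin_raw_data.json wins.
dirs = [pathlib.Path('raw_block_data'), pathlib.Path('archived_block_data')]
parser = DefaultParser(ledger='bitcoin', input_dirs=dirs)
try:
    filepath = parser.get_input_file()
except FileNotFoundError:
    filepath = None  # run.py catches this and skips the ledger
```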
4 changes: 2 additions & 2 deletions consensus_decentralization/parsers/dummy_parser.py
@@ -6,8 +6,8 @@ class DummyParser(DefaultParser):
Dummy parser that only sorts the raw data. Used when the data are already in the required format.
"""

def __init__(self, project_name, input_dir):
super().__init__(project_name, input_dir)
def __init__(self, ledger, input_dirs):
super().__init__(ledger, input_dirs)

@staticmethod
def parse_identifiers(block_identifiers):
10 changes: 6 additions & 4 deletions consensus_decentralization/parsers/ethereum_parser.py
@@ -7,8 +7,8 @@ class EthereumParser(DummyParser):
Parser for Ethereum. Inherits from DummyParser class.
"""

def __init__(self, project_name, input_dir):
super().__init__(project_name, input_dir)
def __init__(self, ledger, input_dirs):
super().__init__(ledger, input_dirs)

@staticmethod
def parse_identifiers(block_identifiers):
@@ -29,8 +29,10 @@ def read_and_sort_data(self):
Note that the current version does not sort the data (because it is too memory-intensive) but assumes that the
data are already sorted (which is generally the case given the suggested queries).
"""
filename = f'{self.project_name}_raw_data.json'
filepath = self.input_dir / filename
filepath = self.get_input_file()

def generate_data():
with open(filepath) as f:
23 changes: 11 additions & 12 deletions data_collection_scripts/collect_block_data.py
@@ -16,13 +16,10 @@
from yaml import safe_load
from datetime import datetime

from consensus_decentralization.helper import ROOT_DIR, RAW_DATA_DIR
from consensus_decentralization.helper import ROOT_DIR, get_input_directories


def collect_data(ledgers, from_block, to_date):
if not RAW_DATA_DIR.is_dir():
RAW_DATA_DIR.mkdir()

def collect_data(raw_data_dir, ledgers, from_block, to_date):
data_collection_dir = ROOT_DIR / "data_collection_scripts"

with open(data_collection_dir / "queries.yaml") as f:
@@ -31,7 +28,7 @@ def collect_data(ledgers, from_block, to_date):
client = bq.Client.from_service_account_json(json_credentials_path=data_collection_dir / "google-service-account-key.json")

for ledger in ledgers:
file = RAW_DATA_DIR / f'{ledger}_raw_data.json'
file = raw_data_dir / f'{ledger}_raw_data.json'
logging.info(f"Querying {ledger} from block {from_block[ledger]} until {to_date}..")

query = (queries[ledger]).replace("{{block_number}}", str(from_block[ledger]) if from_block[ledger] else "-1").replace("{{timestamp}}", to_date)
@@ -56,14 +53,13 @@ def collect_data(ledgers, from_block, to_date):
logging.info(f'Done writing {ledger} data to file.\n')


def get_last_block_collected(ledger):
def get_last_block_collected(file):
"""
Get the last block collected for a ledger. This is useful for knowing where to start collecting data from.
Assumes that the data is stored in a json lines file, ordered in increasing block number.
:param ledger: the ledger to get the last block collected for
:returns: the number of the last block collected for the specified ledger
:param file: path to the raw data file of the ledger whose last collected block is requested
:returns: the number of the last block collected in the file, or None if the file does not exist
"""
file = RAW_DATA_DIR / f'{ledger}_raw_data.json'
if not file.is_file():
return None
with open(file) as f:
@@ -95,5 +91,8 @@ def get_last_block_collected(ledger):
)

args = parser.parse_args()
from_block = {ledger: get_last_block_collected(ledger) for ledger in args.ledgers}
collect_data(ledgers=args.ledgers, from_block=from_block, to_date=args.to_date)
raw_data_dir = get_input_directories()[0]
if not raw_data_dir.is_dir():
raw_data_dir.mkdir()
from_block = {ledger: get_last_block_collected(file=raw_data_dir / f'{ledger}_raw_data.json') for ledger in args.ledgers}
collect_data(raw_data_dir=raw_data_dir, ledgers=args.ledgers, from_block=from_block, to_date=args.to_date)
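
The body of `get_last_block_collected` is collapsed in this view. One plausible implementation of the documented contract (a JSON-lines file ordered by increasing block number, so the answer sits in the last line) is sketched below; the `number` field name is an assumption, not confirmed by the visible diff:

```python
import json

def get_last_block_collected_sketch(file):
    # Hedged sketch, not the repository's code: return the block number of the
    # last JSON line in the file, or None if the file does not exist.
    if not file.is_file():
        return None
    last_line = None
    with open(file) as f:
        for line in f:  # stream line by line to avoid loading the whole file
            if line.strip():
                last_line = line
    return json.loads(last_line)['number'] if last_line else None
```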
55 changes: 31 additions & 24 deletions run.py
@@ -13,7 +13,8 @@ def process_data(force_map, ledger_dir, ledger, output_dir):
clustering_flag = hlp.get_clustering_flag()
mapped_data_file = ledger_dir / hlp.get_mapped_data_filename(clustering_flag)
if force_map or not mapped_data_file.is_file():
parsed_data = parse(ledger, input_dir=hlp.RAW_DATA_DIR)
raw_data_dirs = hlp.get_input_directories()
parsed_data = parse(ledger=ledger, input_dirs=raw_data_dirs)
return apply_mapping(ledger, parsed_data=parsed_data, output_dir=output_dir)
return None

@@ -36,11 +37,16 @@ def main(ledgers, timeframe, estimation_window, frequency, population_windows, i

force_map = hlp.get_force_map_flag()

for ledger in ledgers:
for ledger in list(ledgers):
ledger_dir = interim_dir / ledger
ledger_dir.mkdir(parents=True, exist_ok=True) # create ledger output directory if it doesn't already exist

mapped_data = process_data(force_map, ledger_dir, ledger, interim_dir)
try:
mapped_data = process_data(force_map, ledger_dir, ledger, interim_dir)
except FileNotFoundError as e:
logging.error(repr(e))
ledgers.remove(ledger)
continue

aggregate(
ledger,
Expand All @@ -52,30 +58,31 @@ def main(ledgers, timeframe, estimation_window, frequency, population_windows, i
mapped_data=mapped_data
)

aggregated_data_filename = hlp.get_blocks_per_entity_filename(timeframe, estimation_window, frequency)
metrics_dir = results_dir / 'metrics'
metrics_dir.mkdir(parents=True, exist_ok=True)

used_metrics = analyze(
projects=ledgers,
aggregated_data_filename=aggregated_data_filename,
population_windows=population_windows,
input_dir=interim_dir,
output_dir=metrics_dir
)

if hlp.get_plot_flag():
figures_dir = results_dir / 'figures'
figures_dir.mkdir(parents=True, exist_ok=True)
plot(
ledgers=ledgers,
metrics=used_metrics,
if ledgers:
aggregated_data_filename = hlp.get_blocks_per_entity_filename(timeframe, estimation_window, frequency)
metrics_dir = results_dir / 'metrics'
metrics_dir.mkdir(parents=True, exist_ok=True)

used_metrics = analyze(
projects=ledgers,
aggregated_data_filename=aggregated_data_filename,
animated=hlp.get_plot_config_data()['animated'],
metrics_dir=metrics_dir,
figures_dir=figures_dir
population_windows=population_windows,
input_dir=interim_dir,
output_dir=metrics_dir
)

if hlp.get_plot_flag():
figures_dir = results_dir / 'figures'
figures_dir.mkdir(parents=True, exist_ok=True)
plot(
ledgers=ledgers,
metrics=used_metrics,
aggregated_data_filename=aggregated_data_filename,
animated=hlp.get_plot_config_data()['animated'],
metrics_dir=metrics_dir,
figures_dir=figures_dir
)


if __name__ == '__main__':
ledgers = hlp.get_ledgers()
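
One detail worth noting in the loop above: iterating over `list(ledgers)` (a snapshot) is what makes `ledgers.remove(ledger)` safe, since mutating a list while iterating over it directly skips elements. A minimal illustration:

```python
ledgers = ['bitcoin', 'missing_ledger', 'cardano']
for ledger in list(ledgers):    # iterate over a copy
    if ledger == 'missing_ledger':
        ledgers.remove(ledger)  # safe: the iteration is over the copy
print(ledgers)  # ['bitcoin', 'cardano']
```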
26 changes: 13 additions & 13 deletions tests/test_mappings.py
@@ -12,7 +12,7 @@
from consensus_decentralization.mappings.ethereum_mapping import EthereumMapping
from consensus_decentralization.mappings.cardano_mapping import CardanoMapping
from consensus_decentralization.mappings.tezos_mapping import TezosMapping
from consensus_decentralization.helper import RAW_DATA_DIR, INTERIM_DIR, get_clustering_flag
from consensus_decentralization.helper import INTERIM_DIR, get_clustering_flag, get_input_directories


@pytest.fixture
@@ -31,7 +31,7 @@ def setup_and_cleanup():
ledger_parser['sample_cardano'] = DummyParser
ledger_mapping['sample_tezos'] = TezosMapping
ledger_parser['sample_tezos'] = DummyParser
test_raw_data_dir = RAW_DATA_DIR
test_raw_data_dirs = get_input_directories()
test_output_dir = INTERIM_DIR / "test_output"
# Create the output directory for each project (as this is typically done in the run.py script before parsing or
# mapping takes place)
@@ -41,7 +41,7 @@ def setup_and_cleanup():
mapping_info_dir = pathlib.Path(__file__).resolve().parent.parent / 'mapping_information'
# Mock return value of get_clustering_flag
get_clustering_flag.return_value = True
yield mapping_info_dir, test_raw_data_dir, test_output_dir
yield mapping_info_dir, test_raw_data_dirs, test_output_dir
# Clean up
shutil.rmtree(test_output_dir)

Expand Down Expand Up @@ -95,24 +95,24 @@ def prep_sample_tezos_mapping_info():


def test_map(setup_and_cleanup, prep_sample_bitcoin_mapping_info):
mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup

parsed_data = parse(project='sample_bitcoin', input_dir=test_raw_data_dir)
parsed_data = parse(ledger='sample_bitcoin', input_dirs=test_raw_data_dirs)
apply_mapping(project='sample_bitcoin', parsed_data=parsed_data, output_dir=test_output_dir)

mapped_data_file = test_output_dir / 'sample_bitcoin/mapped_data_clustered.json'
assert mapped_data_file.is_file()


def test_bitcoin_mapping(setup_and_cleanup, prep_sample_bitcoin_mapping_info):
mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup
with open(mapping_info_dir / 'addresses/sample_bitcoin.json') as f:
pool_addresses = json.load(f)
pool_addresses['0000000000000000000000000000000000000000'] = {'name': 'TEST2', 'source': ''}
with open(mapping_info_dir / 'addresses/sample_bitcoin.json', 'w') as f:
f.write(json.dumps(pool_addresses))

parsed_data = parse(project='sample_bitcoin', input_dir=test_raw_data_dir)
parsed_data = parse(ledger='sample_bitcoin', input_dirs=test_raw_data_dirs)
apply_mapping(project='sample_bitcoin', parsed_data=parsed_data, output_dir=test_output_dir)

expected_block_creators = {
@@ -136,15 +136,15 @@


def test_ethereum_mapping(setup_and_cleanup, prep_sample_ethereum_mapping_info):
mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup

with open(mapping_info_dir / 'addresses/sample_ethereum.json') as f:
addresses = json.load(f)
addresses['0xe9b54a47e3f401d37798fc4e22f14b78475c2afc'] = {'name': 'TEST2', 'source': ''}
with open(mapping_info_dir / 'addresses/sample_ethereum.json', 'w') as f:
f.write(json.dumps(addresses))

parsed_data = parse(project='sample_ethereum', input_dir=test_raw_data_dir)
parsed_data = parse(ledger='sample_ethereum', input_dirs=test_raw_data_dirs)
apply_mapping(project='sample_ethereum', parsed_data=parsed_data, output_dir=test_output_dir)

expected_block_creators = {
@@ -169,9 +169,9 @@


def test_cardano_mapping(setup_and_cleanup, prep_sample_cardano_mapping_info):
mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup

parsed_data = parse(project='sample_cardano', input_dir=test_raw_data_dir)
parsed_data = parse(ledger='sample_cardano', input_dirs=test_raw_data_dirs)
apply_mapping(project='sample_cardano', parsed_data=parsed_data, output_dir=test_output_dir)

expected_block_creators = {
@@ -193,9 +193,9 @@ def test_cardano_mapping(setup_and_cleanup, prep_sample_cardano_mapping_info):


def test_tezos_mapping(setup_and_cleanup, prep_sample_tezos_mapping_info):
mapping_info_dir, test_raw_data_dir, test_output_dir = setup_and_cleanup
mapping_info_dir, test_raw_data_dirs, test_output_dir = setup_and_cleanup

parsed_data = parse(project='sample_tezos', input_dir=test_raw_data_dir)
parsed_data = parse(ledger='sample_tezos', input_dirs=test_raw_data_dirs)
apply_mapping(project='sample_tezos', parsed_data=parsed_data, output_dir=test_output_dir)

expected_block_creators = {