4 changes: 4 additions & 0 deletions .gitignore
@@ -8,3 +8,7 @@ output
raw_block_data/*_raw_data.json
.coverage
site
.ipynb_checkpoints/
*.ipynb
processed_data
results
4 changes: 2 additions & 2 deletions README.md
@@ -58,8 +58,8 @@ analyzed. If only the timeframe is specified, all ledgers will be analyzed for
the given timeframe. If no arguments are given, all ledgers will be analyzed for
all months since January 2018.

Three files `nc.csv`, `gini.csv`, `entropy.csv` are also created in the `output` directory, containing the data from the
last execution of `run.py`.
Three files `nc.csv`, `gini.csv`, `entropy.csv` are also created in the `results` directory, containing the data from
the last execution of `run.py`.

## Contributing

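For orientation, a minimal sketch (not part of this change) of loading one of these result files; the exact column layout of the CSVs is an assumption here:

```python
# Illustrative only: load one of the aggregate result files written by run.py.
# The column layout of these CSVs is an assumption, not taken from this repository.
import csv
from pathlib import Path

with open(Path('results') / 'nc.csv') as f:
    rows = list(csv.reader(f))

header, data = rows[0], rows[1:]
print(header)                  # column names (e.g. ledger / date / value)
print(len(data), 'data rows')  # Nakamoto-coefficient entries from the last run
```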
24 changes: 10 additions & 14 deletions config.yaml
@@ -8,7 +8,9 @@ metrics:
hhi:
nakamoto_coefficient:
theil_index:
max_power_ratio:
concentration_ratio:
- 1
- 3
tau_index:
- 0.33
- 0.66
@@ -36,8 +38,8 @@ analyze_flags:
# The timeframe for which an analysis should be performed.
# Each date is a string of the form YYYY-MM-DD.
timeframe:
start_date: 2011-01-01
end_date: 2023-12-31
start_date: 2018-01-01
end_date: 2025-03-01

# The number of days to use for the estimation window, i.e. how many days of blocks to use for each data point.
# If left empty, then the entire time frame will be used (only valid when combined with empty frequency).
@@ -46,19 +48,13 @@ estimation_window: 30
# How frequently to sample the data, in days
# If left empty, then only one data point will be analyzed (snapshot instead of longitudinal analysis), but this is
# only valid when combined with an empty estimation_window.
frequency: 30 # todo maybe add hadrcoded values for day, week, month, year (in the code that parses this) + for the estimation window


input_directories: # Paths to directories that contain raw input data
- ./input

# Paths to directories of snapshot db files; either absolute or relative from run.py.
# The first path will be used to write newly created dbs and the output of runs
output_directories:
- ./output
frequency: 30

# A number that specifies how many windows to look back and forward when deciding whether an entity is active in a
# given time period, or 'all' to count all entities that have produced blocks in the entire observation period.
population_windows: 1

# Plot flags
plot_parameters:
plot: false
animated: true
animated: false
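For reference, a minimal sketch (not part of this change) of how the updated fields are read, mirroring the accessors in consensus_decentralization/helper.py:

```python
# Illustrative sketch: reading the updated config fields with PyYAML,
# the same way the helpers in consensus_decentralization/helper.py do.
from yaml import safe_load

with open('config.yaml') as f:
    config = safe_load(f)

estimation_window = config['estimation_window']      # e.g. 30 (days of blocks per data point)
frequency = config['frequency']                      # e.g. 30 (days between samples)
population_windows = config['population_windows']    # e.g. 1, or 'all'
clustering = config['analyze_flags']['clustering']   # whether pool clustering is applied

print(estimation_window, frequency, population_windows, clustering)
```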
10 changes: 5 additions & 5 deletions consensus_decentralization/aggregate.py
@@ -12,16 +12,16 @@ class Aggregator:
blocks they produced
"""

def __init__(self, project, io_dir):
def __init__(self, project, io_dir, mapped_data=None):
"""
:param project: str. Name of the project
:param io_dir: Path. Path to the project's output directory
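:param mapped_data: list of dictionaries with the mapped block data; if None, the data is read from the project's mapped data file on disk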
"""
self.project = project
self.data_to_aggregate = hlp.read_mapped_project_data(io_dir)
self.data_to_aggregate = hlp.read_mapped_project_data(io_dir) if mapped_data is None else mapped_data
self.data_start_date = hlp.get_timeframe_beginning(hlp.get_date_from_block(self.data_to_aggregate[0]))
self.data_end_date = hlp.get_timeframe_beginning(hlp.get_date_from_block(self.data_to_aggregate[-1]))
self.aggregated_data_dir = io_dir / 'blocks_per_entity'
self.aggregated_data_dir = io_dir / hlp.get_aggregated_data_dir_name(hlp.get_clustering_flag())
self.aggregated_data_dir.mkdir(parents=True, exist_ok=True)

self.monthly_data_breaking_points = [(self.data_start_date.strftime('%Y-%m'), 0)]
@@ -89,7 +89,7 @@ def divide_timeframe(timeframe, estimation_window, frequency):
return time_chunks


def aggregate(project, output_dir, timeframe, estimation_window, frequency, force_aggregate):
def aggregate(project, output_dir, timeframe, estimation_window, frequency, force_aggregate, mapped_data=None):
"""
Aggregates the results of the mapping process for the given project and timeframe. The results are saved in a csv
file in the project's output directory. Note that the output file is created (just with the headers) even if there
@@ -113,7 +113,7 @@ def aggregate(project, output_dir, timeframe, estimation_window, frequency, forc
raise ValueError('The estimation window is too large for the given timeframe')

project_io_dir = output_dir / project
aggregator = Aggregator(project, project_io_dir)
aggregator = Aggregator(project, project_io_dir, mapped_data=mapped_data)

filename = hlp.get_blocks_per_entity_filename(timeframe=timeframe, estimation_window=estimation_window, frequency=frequency)
output_file = aggregator.aggregated_data_dir / filename
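The new mapped_data parameter lets a caller hand the mapper's in-memory output straight to aggregation instead of re-reading the mapped data file from disk. A rough usage sketch (the surrounding variables are assumed to be defined by the caller):

```python
# Illustrative: reuse the mapper's in-memory output so mapped_data_*.json
# does not have to be re-read from disk. `mapping`, `project`, `output_dir`
# and `timeframe` are assumed to exist in the calling code.
mapped_data = mapping.perform_mapping()   # list of mapped block dictionaries

aggregate(
    project=project,
    output_dir=output_dir,
    timeframe=timeframe,
    estimation_window=30,
    frequency=30,
    force_aggregate=False,
    mapped_data=mapped_data,               # skips hlp.read_mapped_project_data
)
```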
16 changes: 11 additions & 5 deletions consensus_decentralization/analyze.py
@@ -6,20 +6,25 @@
from consensus_decentralization.metrics.entropy import compute_entropy, compute_entropy_percentage # noqa: F401
from consensus_decentralization.metrics.herfindahl_hirschman_index import compute_hhi # noqa: F401
from consensus_decentralization.metrics.theil_index import compute_theil_index # noqa: F401
from consensus_decentralization.metrics.max_power_ratio import compute_max_power_ratio # noqa: F401
from consensus_decentralization.metrics.concentration_ratio import compute_concentration_ratio # noqa: F401
from consensus_decentralization.metrics.tau_index import compute_tau_index # noqa: F401
from consensus_decentralization.metrics.total_entities import compute_total_entities # noqa: F401


def analyze(projects, aggregated_data_filename, output_dir):
def analyze(projects, aggregated_data_filename, input_dir, output_dir, population_windows):
"""
Calculates all available metrics for the given ledgers and timeframes. Outputs one file for each metric.
:param projects: list of strings that correspond to the ledgers whose data should be analyzed
:param aggregated_data_filename: string that corresponds to the name of the file that contains the aggregated data
:param input_dir: the directory where the aggregated data is located
:param output_dir: the directory to save the results in
:param population_windows: the number of windows to look backwards and forwards to determine the population of
active block producers for a given time period
:returns: a list with the names of all the metrics that were used

Using multiple projects and timeframes is necessary here to produce collective csv files.
"""

logging.info('Calculating metrics on aggregated data..')
metrics = hlp.get_metrics_config()
metric_params = []
@@ -30,6 +35,7 @@ def analyze(projects, aggregated_data_filename, output_dir):
else:
metric_params.append((key, key, None))
metric_names = [name for name, _, _ in metric_params]
clustering_flag = hlp.get_clustering_flag()

aggregate_output = {}

@@ -42,8 +48,9 @@
for column_index, project in enumerate(projects):
logging.info(f'Calculating {project} metrics')
aggregate_output[project] = {}
aggregated_data_dir = output_dir / project / 'blocks_per_entity'
dates, blocks_per_entity = hlp.get_blocks_per_entity_from_file(aggregated_data_dir / aggregated_data_filename)
aggregated_data_dir = input_dir / project / hlp.get_aggregated_data_dir_name(clustering_flag)
dates, blocks_per_entity = hlp.get_blocks_per_entity_from_file(aggregated_data_dir /
aggregated_data_filename, population_windows)
for date in dates:
aggregate_output[project][date] = {}

@@ -80,7 +87,6 @@
csv_writer = csv.writer(f)
csv_writer.writerows(csv_contents[metric])

clustering_flag = hlp.get_config_data()['analyze_flags']['clustering']
aggregate_csv_output = [['ledger', 'date', 'clustering'] + metric_names]
for project, timeframes in aggregate_output.items():
for date, results in timeframes.items():
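With the new signature, the aggregated-data location, the results directory and the population window are passed explicitly. A hedged sketch of a call (ledger names and parameter values are assumptions):

```python
# Illustrative call of the updated analyze() signature; the ledger list and
# parameter values below are assumptions, not taken from this change.
from pathlib import Path

import consensus_decentralization.helper as hlp
from consensus_decentralization.analyze import analyze

population_windows = hlp.get_population_windows()   # e.g. 1, from config.yaml
results_dir = hlp.get_results_dir(estimation_window=30, frequency=30,
                                  population_windows=population_windows)

used_metrics = analyze(
    projects=['bitcoin', 'cardano'],                    # hypothetical ledger list
    aggregated_data_filename='blocks_per_entity.csv',   # hypothetical; normally built by
                                                        # hlp.get_blocks_per_entity_filename(...)
    input_dir=Path('processed_data'),
    output_dir=results_dir,
    population_windows=population_windows,
)
print(used_metrics)  # names of the metrics that were computed
```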
77 changes: 73 additions & 4 deletions consensus_decentralization/helper.py
@@ -14,8 +14,9 @@

ROOT_DIR = pathlib.Path(__file__).resolve().parent.parent
RAW_DATA_DIR = ROOT_DIR / 'raw_block_data'
OUTPUT_DIR = ROOT_DIR / 'output'
INTERIM_DIR = ROOT_DIR / 'processed_data'
MAPPING_INFO_DIR = ROOT_DIR / 'mapping_information'
RESULTS_DIR = ROOT_DIR / 'results'

with open(ROOT_DIR / "config.yaml") as f:
config = safe_load(f)
@@ -190,11 +191,13 @@ def write_blocks_per_entity_to_file(output_dir, blocks_per_entity, dates, filena
csv_writer.writerow(entity_row)


def get_blocks_per_entity_from_file(filepath):
def get_blocks_per_entity_from_file(filepath, population_windows):
"""
Retrieves information about the number of blocks that each entity produced over some timeframe for some project.
:param filepath: the path to the file with the relevant information. It can be either an absolute or a relative
path in either a pathlib.PosixPath object or a string.
:param population_windows: int representing the number of windows to look back and forward when determining if an
entity is active during a certain time frame
:returns: a tuple of length 2 where the first item is a list of time chunks (strings) and the second item is a
dictionary with entities (keys) and a list of the number of blocks they produced during each time chunk (values)
"""
@@ -206,7 +209,17 @@ def get_blocks_per_entity_from_file(filepath):
for row in csv_reader:
entity = row[0]
for idx, item in enumerate(row[1:]):
if item != '0':
if item == '0':
if population_windows == 'all':
blocks_per_entity[entity][dates[idx]] = 0
else:
# If the entity hasn't produced any blocks in the current time chunk, we only consider it as
# active if it has produced at least one block in population_windows time chunks before or after
# (otherwise it's not considered part of the population for this time frame)
for i in range(max(0, idx - population_windows), min(len(row) - 1, idx + population_windows + 1)):
if row[i + 1] != '0':
blocks_per_entity[entity][dates[idx]] = 0
else:
blocks_per_entity[entity][dates[idx]] = int(item)
return dates, blocks_per_entity
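To make the population rule concrete, here is a small standalone illustration (not the library code itself) of which time chunks an entity is counted in when population_windows is 1:

```python
# Standalone illustration of the active-population rule (not library code).
# With population_windows = 1, a chunk with zero blocks only counts the entity
# as active (value 0) if it produced blocks in an adjacent chunk.
production = ['0', '5', '0', '0']   # blocks per time chunk for one entity
population_windows = 1

active_chunks = {}
for idx, item in enumerate(production):
    if item != '0':
        active_chunks[idx] = int(item)
    else:
        window = production[max(0, idx - population_windows): idx + population_windows + 1]
        if any(v != '0' for v in window):
            active_chunks[idx] = 0

print(active_chunks)  # {0: 0, 1: 5, 2: 0} -- chunk 3 is too far from any production
```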

@@ -294,7 +307,7 @@ def read_mapped_project_data(project_dir):
:param project_dir: pathlib.PosixPath object of the output directory corresponding to the project
:returns: a dictionary with the mapped data
"""
with open(project_dir / 'mapped_data.json') as f:
with open(project_dir / get_mapped_data_filename(get_clustering_flag())) as f:
data = json.load(f)
return data

@@ -309,6 +322,15 @@ def get_representative_dates(time_chunks):
return [str(chunk[0] + (chunk[1] - chunk[0]) // 2) for chunk in time_chunks]


def get_aggregated_data_dir_name(clustering_flag):
"""
Determines the name of the directory that will contain the aggregated data
:param clustering_flag: boolean that determines whether the data is clustered or not
:returns: str that corresponds to the name of the directory
"""
return 'blocks_per_entity_' + ('clustered' if clustering_flag else 'non_clustered')


def get_blocks_per_entity_filename(timeframe, estimation_window, frequency):
"""
Determines the filename of the csv file that contains the aggregated data
@@ -363,6 +385,21 @@ def get_estimation_window_and_frequency():
raise ValueError('"estimation_window" or "frequency" missing from config file')


def get_population_windows():
"""
Retrieves the number of windows to be used for estimating the population of block producers
:returns: int representing the number of windows to look back and forward when determining if an entity is active
during a certain time frame
:raises ValueError: if the population_windows field is missing from the config file
"""
try:
config = get_config_data()
population_windows = config['population_windows']
return population_windows
except KeyError:
raise ValueError('"population_windows" missing from config file')


def get_plot_flag():
"""
Gets the flag that determines whether generate plots for the output
@@ -395,3 +432,35 @@ def get_force_map_flag():
return config['execution_flags']['force_map']
except KeyError:
raise ValueError('Flag "force_map" missing from config file')


def get_clustering_flag():
"""
Gets the flag that determines whether to perform clustering
:returns: boolean
:raises ValueError: if the flag is not set in the config file
"""
config = get_config_data()
try:
return config['analyze_flags']['clustering']
except KeyError:
raise ValueError('Flag "clustering" missing from config file')


def get_results_dir(estimation_window, frequency, population_windows):
"""
Retrieves the path to the results directory for the specific config parameters
:returns: pathlib.PosixPath object
"""
results_dir_name = (f'{estimation_window}_day_window_with_{population_windows}_population_windows_sampled_every'
f'_{frequency}_days')
return RESULTS_DIR / results_dir_name


def get_mapped_data_filename(clustering_flag):
"""
Retrieves the filename of the mapped data file
:param clustering_flag: boolean that determines whether the data is clustered or not
:returns: str
"""
return 'mapped_data_' + ('clustered' if clustering_flag else 'non_clustered') + '.json'
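The new naming helpers keep clustered and non-clustered artefacts side by side; for reference, the names they produce (derived directly from the code above):

```python
# Derived from the helpers above: the directory and file names they produce.
from consensus_decentralization.helper import (
    get_aggregated_data_dir_name,
    get_mapped_data_filename,
    get_results_dir,
)

get_aggregated_data_dir_name(True)    # 'blocks_per_entity_clustered'
get_aggregated_data_dir_name(False)   # 'blocks_per_entity_non_clustered'
get_mapped_data_filename(False)       # 'mapped_data_non_clustered.json'
get_results_dir(30, 30, 1)
# RESULTS_DIR / '30_day_window_with_1_population_windows_sampled_every_30_days'
```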
12 changes: 6 additions & 6 deletions consensus_decentralization/mappings/default_mapping.py
@@ -1,4 +1,5 @@
import json

import consensus_decentralization.helper as hlp


Expand All @@ -9,8 +10,6 @@ class DefaultMapping:

:ivar project_name: the name of the project associated with a specific mapping instance
:ivar output_dir: the directory that includes the parsed data related to the project
:ivar mapped_data_dir: the directory to save the mapped data files in
:ivar multi_pool_dir: the directory to save the multi pool data files in
:ivar data_to_map: a list with the parsed data of the project (list of dictionaries with block information)
:ivar special_addresses: a set with the special addresses of the project (addresses that don't count in the
context of our analysis)
@@ -45,7 +44,7 @@ def perform_mapping(self):
project.
:returns: a list of dictionaries (mapped block data)
"""
clustering_flag = hlp.get_config_data()['analyze_flags']['clustering']
clustering_flag = hlp.get_clustering_flag()
for block in self.data_to_map:
if not clustering_flag:
entity = self.fallback_mapping(block)
@@ -83,7 +82,7 @@ def perform_mapping(self):
})

if len(self.mapped_data) > 0:
self.write_mapped_data()
self.write_mapped_data(clustering_flag)
self.write_multi_pool_files()

return self.mapped_data
@@ -187,11 +186,12 @@ def write_multi_pool_files(self):
with open(self.output_dir / 'multi_pool_blocks.csv', 'w') as f:
f.write('Block No,Timestamp,Entities\n' + '\n'.join(self.multi_pool_blocks))

def write_mapped_data(self):
def write_mapped_data(self, clustering_flag):
"""
Writes the mapped data into a file in a directory associated with the mapping instance. Specifically,
into a folder named after the project, inside the general output directory
:param clustering_flag: boolean, indicating whether clustering was used in the mapping process
"""
filename = 'mapped_data.json'
filename = hlp.get_mapped_data_filename(clustering_flag)
with open(self.output_dir / filename, 'w') as f:
json.dump(self.mapped_data, f, indent=4)
3 changes: 2 additions & 1 deletion consensus_decentralization/mappings/dummy_mapping.py
@@ -1,4 +1,5 @@
from consensus_decentralization.mappings.default_mapping import DefaultMapping
import consensus_decentralization.helper as hlp


class DummyMapping(DefaultMapping):
@@ -28,6 +29,6 @@ def perform_mapping(self):
})

if len(self.mapped_data) > 0:
self.write_mapped_data()
self.write_mapped_data(hlp.get_clustering_flag())

return self.mapped_data
9 changes: 9 additions & 0 deletions consensus_decentralization/metrics/concentration_ratio.py
@@ -0,0 +1,9 @@
def compute_concentration_ratio(block_distribution, topn):
"""
Calculates the n-concentration ratio of a distribution of blocks
:param block_distribution: a list of integers, each being the number of blocks that an entity has produced, sorted in descending order
:param topn: the number of top block producers to consider
:returns: float that represents the ratio of blocks produced by the top n block producers (0 if there weren't any)
"""
total_blocks = sum(block_distribution)
return sum(block_distribution[:topn]) / total_blocks if total_blocks else 0
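For example, with the topn values 1 and 3 registered in config.yaml above, the metric behaves as follows:

```python
# Worked example for the new metric, using the topn values 1 and 3 from config.yaml.
from consensus_decentralization.metrics.concentration_ratio import compute_concentration_ratio

blocks = [50, 30, 20]                    # block counts, sorted in descending order

compute_concentration_ratio(blocks, 1)   # 0.5 -> share of the single largest producer
compute_concentration_ratio(blocks, 3)   # 1.0 -> share of the top three producers
compute_concentration_ratio([], 1)       # 0   -> no blocks produced at all
```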
8 changes: 0 additions & 8 deletions consensus_decentralization/metrics/max_power_ratio.py

This file was deleted.

3 changes: 2 additions & 1 deletion consensus_decentralization/parsers/dummy_parser.py
@@ -24,11 +24,12 @@ def parse(self):
directory associated with the parser instance (specifically in <general output directory>/<project_name>)
"""
data = self.read_and_sort_data()

for block in data:
if 'identifiers' not in block.keys():
block['identifiers'] = None
else:
block['identifiers'] = self.parse_identifiers(block['identifiers'])
if 'reward_addresses' not in block.keys():
block['reward_addresses'] = None
return data
yield block
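Since parse() now yields blocks one at a time instead of returning the whole list, downstream code that needs a list has to materialise the generator. A small sketch of the difference (class name and constructor arguments are assumptions):

```python
# Illustrative: parse() is now a generator, so consume it lazily or collect it.
# The class name and constructor arguments below are assumptions.
parser = DummyParser(project, input_dir)

for block in parser.parse():      # streams parsed blocks one at a time
    handle(block)                 # hypothetical downstream step

blocks = list(parser.parse())     # or materialise everything, as before
```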