73 changes: 73 additions & 0 deletions cardano/README.md
@@ -0,0 +1,73 @@
# Cardano Relay Analysis Pipeline

Automated pipeline to analyze the geographic distribution of Cardano relay nodes using Blockfrost, DNS resolution, geolocation APIs, and visual analytics.

## Overview

This pipeline extracts IP addresses from Cardano relay data, queries geolocation APIs, and generates visual analytics showing the geographic distribution of nodes by country, organization, and ASN.

## Requirements

It is recommended to use a dedicated Python virtual environment for this project. Create a `.venv` virtual environment in the project directory before installing dependencies:

```powershell
python -m venv .venv
.\.venv\Scripts\Activate.ps1
pip install -r requirements.txt
```

This ensures that all dependencies are installed in an isolated environment. The `run.py` script expects the `.venv` environment to be present.

You must set your Blockfrost API key in the `BLOCKFROST_API_KEY` environment variable before running the pipeline.

**Windows (PowerShell):**
```powershell
$env:BLOCKFROST_API_KEY = "your_blockfrost_api_key_here"
```
**Linux/macOS (bash):**
```bash
export BLOCKFROST_API_KEY="your_blockfrost_api_key_here"
```

If the environment variable is not set, the pipeline will display an error and exit.

## Quick Start

### Run the Complete Pipeline

```powershell
python run.py
```

This executes all four steps in sequence (a sketch of a possible `run.py` follows the list):
1. Collect relay node data
2. Collect geolocation data
3. Parse data into CSVs
4. Generate plots
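
`run.py` itself is not shown in this diff. A minimal sketch of what such a driver script might look like, assuming each step is a standalone script invoked with the `.venv` interpreter (paths follow the Windows layout from the setup instructions above):

```python
# run.py - hypothetical sketch; the actual script may differ.
import subprocess
import sys
from pathlib import Path

# Assumes the Windows .venv layout from the setup instructions;
# on Linux/macOS the interpreter lives at .venv/bin/python instead.
VENV_PYTHON = Path(__file__).parent / ".venv" / "Scripts" / "python.exe"

STEPS = ["collect.py", "collect_geodata.py", "parse.py", "plot.py"]

for step in STEPS:
    print(f"Running {step}...")
    result = subprocess.run([str(VENV_PYTHON), step])
    if result.returncode != 0:
        sys.exit(f"{step} failed with exit code {result.returncode}")
```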

## Files

### Pipeline Scripts
- **`collect.py`** - Collects relay node data using Blockfrost
- **`collect_geodata.py`** - Queries geolocation APIs (ip-api.com, ipapi.is) for IP metadata
- **`parse.py`** - Parses geodata and creates CSV files for analysis
- **`plot.py`** - Generates pie charts showing geographic distribution
- **`run.py`** - Master script that runs all steps in sequence

## Output Files

All outputs are saved in the `output/` directory, except `blockfrost_pools_relays.json`, which is written to the main directory:

### JSON Data
- `blockfrost_pools_relays.json` - Raw relay and pool data collected from Blockfrost, used as the initial input for further processing.
- `geodata/cardano.json` - Geolocation metadata for each IP

### CSV Files
- `countries_cardano.csv` - Node distribution by country
- `organizations_cardano.csv` - Node distribution by hosting organization
- `asn_cardano.csv` - Node distribution by Autonomous System Number

### PNG Files
- `countries_cardano.png` - Pie chart of nodes by country
- `organizations_cardano.png` - Pie chart of nodes by organization
- `asn_cardano.png` - Pie chart of nodes by ASN
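
`parse.py` and `plot.py` are not included in this diff. As an illustration of the plotting step, here is a minimal sketch assuming pandas/matplotlib and a two-column CSV (`country`, `count`); the real column names and styling may differ:

```python
# Hypothetical sketch of the plotting step; column names are assumptions.
import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv("output/countries_cardano.csv")

# Group everything outside the top 10 into "Other" to keep the chart readable.
top = df.nlargest(10, "count")
other = df["count"].sum() - top["count"].sum()
labels = list(top["country"]) + (["Other"] if other > 0 else [])
sizes = list(top["count"]) + ([other] if other > 0 else [])

plt.pie(sizes, labels=labels, autopct="%1.1f%%")
plt.title("Cardano relay nodes by country")
plt.savefig("output/countries_cardano.png", bbox_inches="tight")
```
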
63 changes: 63 additions & 0 deletions cardano/collect.py
@@ -0,0 +1,63 @@
"""
Collects Cardano relay node data using the Blockfrost API.
The Blockfrost API key must be set in the BLOCKFROST_API_KEY environment variable.
"""
import os
import json
import requests
import time
import logging

BLOCKFROST_API_KEY = os.environ.get("BLOCKFROST_API_KEY")
if not BLOCKFROST_API_KEY:
raise RuntimeError("BLOCKFROST_API_KEY environment variable not set. Please set your Blockfrost API key in the system environment.")
BASE_URL = "https://cardano-mainnet.blockfrost.io/api/v0"
HEADERS = {"project_id": BLOCKFROST_API_KEY}

logging.basicConfig(format='[%(asctime)s] %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p', level=logging.INFO)

def get_all_pools():
"""Fetch all pool IDs from Blockfrost."""
pools = []
page = 1
while True:
url = f"{BASE_URL}/pools?page={page}&count=100"
resp = requests.get(url, headers=HEADERS)
if resp.status_code != 200:
logging.error(f"Failed to fetch pools: {resp.status_code} {resp.text}")
break
data = resp.json()
if not data:
break
pools.extend(data)
logging.info(f"Fetched {len(pools)} pools so far (page {page})")
page += 1
time.sleep(0.2) # Respect Blockfrost rate limits
return pools

def get_pool_relays(pool_id):
"""Fetch relay nodes for a given pool ID."""
url = f"{BASE_URL}/pools/{pool_id}/relays"
resp = requests.get(url, headers=HEADERS)
if resp.status_code != 200:
logging.warning(f"Failed to fetch relays for pool {pool_id}: {resp.status_code} {resp.text}")
return []
return resp.json()

def main():
pools = get_all_pools()
logging.info(f"Total pools fetched: {len(pools)}")
all_relays = {}
for i, pool_id in enumerate(pools, 1):
relays = get_pool_relays(pool_id)
all_relays[pool_id] = relays
if i % 50 == 0:
logging.info(f"Processed {i}/{len(pools)} pools...")
time.sleep(0.2) # Respect Blockfrost rate limits
    # Save results
with open("blockfrost_pools_relays.json", "w") as f:
json.dump(all_relays, f, indent=2)
logging.info("Saved all pool relays to blockfrost_pools_relays.json")

if __name__ == "__main__":
main()
147 changes: 147 additions & 0 deletions cardano/collect_geodata.py
@@ -0,0 +1,147 @@
"""
Collect geodata for Cardano relay nodes.
"""
import json
import os
import time
import logging
from pathlib import Path
import network_decentralization.helper as hlp

logging.basicConfig(format='[%(asctime)s] %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p', level=logging.INFO)


def load_cardano_nodes():
"""Load extracted Cardano nodes from JSON."""
nodes_file = Path(__file__).parent / 'output' / 'cardano_extracted_nodes.json'

if not nodes_file.exists():
logging.error(f'Cardano nodes file not found: {nodes_file}')
logging.info('Please run extract_ips.py first!')
return []

with open(nodes_file, 'r') as f:
data = json.load(f)

logging.info(f"Loaded {data['total_count']} Cardano nodes from {data['source']}")
return data['reachable_nodes']


def load_blockfrost_relays():
"""Load relays from blockfrost_pools_relays.json."""
relays_file = Path(__file__).parent / 'blockfrost_pools_relays.json'
if not relays_file.exists():
logging.error(f'Blockfrost relays file not found: {relays_file}')
return {}
with open(relays_file, 'r') as f:
return json.load(f)

def load_dns_resolved():
"""Load DNS resolved database as a list of dicts."""
dns_file = Path(__file__).parent / 'output' / 'dns_resolved.json'
if not dns_file.exists():
logging.error(f'DNS resolved file not found: {dns_file}')
return []
with open(dns_file, 'r') as f:
return json.load(f)

def collect_cardano_geodata():
"""Collect geodata for Cardano relay nodes."""
ledger = 'cardano'
logging.info(f'{ledger} - Collecting geodata')

# Setup output paths
output_dir = Path(__file__).parent / 'output'
geodata_dir = output_dir / 'geodata'
geodata_dir.mkdir(parents=True, exist_ok=True)

filename = geodata_dir / f'{ledger}.json'

# Load existing geodata if available
try:
with open(filename) as f:
geodata = json.load(f)
logging.info(f'Loaded existing geodata with {len(geodata)} entries')
except (FileNotFoundError, json.decoder.JSONDecodeError):
geodata = {}
logging.info('Starting fresh geodata collection')

# Load relays and DNS resolved DB
pool_relays = load_blockfrost_relays()
dns_db = load_dns_resolved()
# Build a lookup for (pool_id, dns_name, port) -> ip_address
dns_lookup = {(entry['pool_id'], entry['dns_name'], entry['port']): entry['ip_address'] for entry in dns_db}
# Build list of all relay IPs to process (unique per IP)
relay_targets = set()
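    # Relay entries follow Blockfrost's /pools/{pool_id}/relays schema,
    # e.g. {"ipv4": "203.0.113.7", "ipv6": null, "dns": "relay.example.com",
    #       "dns_srv": null, "port": 3001}.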
for pool_id, relays in pool_relays.items():
for relay in relays:
ip = relay.get('ipv4')
if ip:
relay_targets.add(ip)
else:
dns_name = relay.get('dns')
port = relay.get('port', 3001)
if dns_name:
ip = dns_lookup.get((pool_id, dns_name, port))
if ip:
relay_targets.add(ip)
    relay_targets = list(relay_targets)  # the set has already deduplicated IPs
if not relay_targets:
logging.error('No relay IPs to process!')
return
logging.info(f'{ledger} - Processing {len(relay_targets)} relay IPs')
processed = 0
skipped = 0
new_entries = 0
for node_ip in relay_targets:
        if node_ip.endswith('.onion'):  # skip Tor hidden services (not geolocatable)
skipped += 1
continue
if node_ip in geodata:
skipped += 1
processed += 1
if processed % 100 == 0:
logging.info(f'Progress: {processed}/{len(relay_targets)} ({100*processed/len(relay_targets):.1f}%) - {new_entries} new, {skipped} skipped')
continue
if node_ip == 'Unresolved':
# Add to 'Unresolved' category in geodata
if 'Unresolved' not in geodata:
geodata['Unresolved'] = []
geodata['Unresolved'].append(node_ip)
skipped += 1
processed += 1
continue
try:
geodata[node_ip] = hlp.get_ip_geodata(node_ip)
new_entries += 1
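            # Checkpoint the full geodata file after every new entry so that
            # progress survives interruptions; the 1.5 s API delay below
            # dominates runtime, so the extra I/O is cheap.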
with open(filename, 'w') as f:
json.dump(geodata, f, indent=4)
logging.debug(f'{ledger} - Collected geodata for {node_ip}')
processed += 1
if processed % 10 == 0:
logging.info(f'Progress: {processed}/{len(relay_targets)} ({100*processed/len(relay_targets):.1f}%) - {new_entries} new, {skipped} skipped')
            time.sleep(1.5)  # ~40 requests/min, under ip-api.com's 45/min free-tier limit
except Exception as e:
logging.error(f'Error processing {node_ip}: {e}')
processed += 1
continue
logging.info(f'{ledger} - Complete! Processed {processed}/{len(relay_targets)} relay IPs')
logging.info(f'{ledger} - New entries: {new_entries}, Skipped: {skipped}')
logging.info(f'{ledger} - Total geodata entries: {len(geodata)}')


def main():
start_time = time.time()

collect_cardano_geodata()

elapsed = time.time() - start_time
hours = int(elapsed / 3600)
mins = int((elapsed - hours*3600) / 60)
secs = int(elapsed - mins*60 - hours*3600)

print(f'\nTotal time: {hours:02}h {mins:02}m {secs:02}s')


if __name__ == '__main__':
main()
31 changes: 31 additions & 0 deletions cardano/network_decentralization/helper.py
@@ -0,0 +1,31 @@
import requests
import time
import logging

logging.basicConfig(format='[%(asctime)s] %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p', level=logging.INFO)


def get_ip_geodata(ip_addr):
"""
Retrieves geolocation and organization data for a given IP address using ip-api.com and ipapi.is. Handles rate limiting and retries until successful.
:param ip_addr: The IP address to query
:returns: A dictionary with geodata fields
"""
data = None
while not data:
r = requests.get(f'http://ip-api.com/json/{ip_addr}') # Max 45 HTTP requests per minute
try:
data = r.json()
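            # ip-api.com reports failures as {"status": "fail", ...} with no
            # 'org'/'country' fields, which routes such responses into the
            # ipapi.is fallback below.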
if not data.get('org') and data.get('as'):
                data['org'] = data['as'][data['as'].find(' ')+1:]  # "AS15169 Google LLC" -> "Google LLC"
if not data.get('org') or not data.get('country'):
new_r = requests.get(f'https://api.ipapi.is/?q={ip_addr}') # Max 1000 HTTP requests per day
data = new_r.json()
if 'error' in data:
data = None
except requests.exceptions.JSONDecodeError:
pass
if data is None:
logging.error('Geodata rate limited, sleeping for 2 min...')
time.sleep(120)
return data
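
# Example usage (illustrative; field names follow ip-api.com's JSON schema):
#   get_ip_geodata('8.8.8.8')['country']  # -> 'United States'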