137 changes: 137 additions & 0 deletions cardano_node_tests/block_production_graph.py
@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""Generate a bar chart of total blocks per backend from an SQLite database.
The script retrieves the latest run_id from the runs table, aggregates the total
blocks per backend from the blocks table, and generates a bar chart saved as an image file.
"""

import argparse
import contextlib
import pathlib as pl
import sqlite3
import sys

import matplotlib.container as mcontainer
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns


def get_latest_run_id(conn: sqlite3.Connection) -> str:
"""Return the latest run_id from the runs table.
Assumes "latest" means last inserted row (highest rowid).
"""
cur = conn.cursor()
cur.execute("SELECT run_id FROM runs ORDER BY rowid DESC LIMIT 1;")
row = cur.fetchone()
if row is None:
err = "No runs found in 'runs' table."
raise RuntimeError(err)
return str(row[0])


def get_blocks_per_backend(conn: sqlite3.Connection, *, run_id: str) -> list[tuple[str, int]]:
"""Return a list of (backend, total_blocks) for the given run_id.
Aggregates num_blocks across all epochs and pools.
"""
cur = conn.cursor()
cur.execute(
"""
SELECT backend, SUM(num_blocks) AS total_blocks
FROM blocks
WHERE run_id = ?
GROUP BY backend
ORDER BY total_blocks DESC;
""",
(run_id,),
)
rows = cur.fetchall()
if not rows:
err = f"No block data found in 'blocks' table for run_id={run_id}."
raise RuntimeError(err)
return rows


def plot_backend_blocks(
backend_data: list[tuple[str, int]], *, run_name: str, output_path: pl.Path
) -> None:
"""Plot a bar chart of total blocks per backend."""
backends = [b for b, _ in backend_data]
totals = [t for _, t in backend_data]
df = pd.DataFrame({"backend": backends, "total_blocks": totals})

sns.set_theme(style="whitegrid")

plt.figure(figsize=(8, 5))
ax = sns.barplot(data=df, x="backend", y="total_blocks")

ax.set_xlabel("Backend")
ax.set_ylabel("Total blocks over run")
ax.set_title(f"Total blocks per backend in run {run_name}")

# Annotate bars with values (type-narrow containers to BarContainer)
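    # (Axes.bar_label is available in matplotlib >= 3.4.)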
for c in ax.containers:
if isinstance(c, mcontainer.BarContainer):
ax.bar_label(c)

plt.tight_layout()
plt.savefig(output_path, dpi=150)
plt.close()


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Generate a bar chart of total blocks per backend for the latest run "
"from an SQLite database."
)
)
parser.add_argument(
"-d",
"--dbpath",
required=True,
help="Path to the SQLite database file.",
)
parser.add_argument(
"-n",
"--name",
required=True,
help="Name of the run (for labeling purposes).",
)
parser.add_argument(
"-o",
"--output",
required=True,
help="Output image filename.",
)
return parser.parse_args()


def main() -> int:
args = parse_args()
dbpath = pl.Path(args.dbpath)
output_path = pl.Path(args.output)

if not dbpath.exists():
print(f"Error: database file '{args.dbpath}' does not exist.", file=sys.stderr)
return 1

try:
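        # contextlib.closing() guarantees conn.close(); entering `conn` as a
        # context manager wraps the body in a transaction (commit on success,
        # rollback on exception) without closing it.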
with contextlib.closing(sqlite3.connect(dbpath)) as conn, conn:
run_id = get_latest_run_id(conn)
backend_data = get_blocks_per_backend(conn, run_id=run_id)
plot_backend_blocks(
backend_data=backend_data, run_name=args.name, output_path=output_path
)
except (sqlite3.Error, RuntimeError) as e:
print(f"Error: {e}", file=sys.stderr)
return 1

print(f"Saved graph to {args.output}")
return 0


if __name__ == "__main__":
sys.exit(main())
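
For reference, a minimal sketch of exercising the script end to end. It builds a
throwaway database matching the schema created in test_blocks.py below
(runs(run_id, backend) and blocks(run_id, epoch_no, pool_id, pool_idx, backend,
num_blocks)); the path, run_id, and pool ids are hypothetical:

import sqlite3

conn = sqlite3.connect("/tmp/block_production.db")  # hypothetical path
conn.execute("CREATE TABLE IF NOT EXISTS runs(run_id, backend)")
conn.execute(
    "CREATE TABLE IF NOT EXISTS"
    " blocks(run_id, epoch_no, pool_id, pool_idx, backend, num_blocks)"
)
# One run, two pools on different backends, two epochs of data each
conn.execute("INSERT INTO runs VALUES (?, ?)", ("run01", "mixed"))
conn.executemany(
    "INSERT INTO blocks VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("run01", 1, "pool_a", 1, "mem", 7),
        ("run01", 1, "pool_b", 2, "lmdb", 5),
        ("run01", 2, "pool_a", 1, "mem", 6),
        ("run01", 2, "pool_b", 2, "lmdb", 8),
    ],
)
conn.commit()
conn.close()

The chart can then be generated with:

python3 cardano_node_tests/block_production_graph.py -d /tmp/block_production.db -n run01 -o blocks.png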
46 changes: 25 additions & 21 deletions cardano_node_tests/tests/test_blocks.py
@@ -263,12 +263,20 @@ def payment_addrs(
)
return addrs

+    @pytest.fixture
+    def block_production_db(self) -> tp.Generator[sqlite3.Connection, None, None]:
+        """Open block production db."""
+        conn = sqlite3.connect(configuration.BLOCK_PRODUCTION_DB)
+        yield conn
+        conn.close()
+
@allure.link(helpers.get_vcs_link())
-    def test_block_production(  # noqa: C901
+    def test_block_production(
self,
cluster_manager: cluster_management.ClusterManager,
cluster: clusterlib.ClusterLib,
payment_addrs: list[clusterlib.AddressRecord],
+        block_production_db: sqlite3.Connection,
):
"""Record number of blocks produced by each pool over multiple epochs.

@@ -282,12 +290,9 @@ def test_block_production(
temp_template = common.get_test_id(cluster)
rand = clusterlib.get_rand_str(5)
num_epochs = int(os.environ.get("BLOCK_PRODUCTION_EPOCHS") or 50)

topology = "p2p"
if configuration.MIXED_P2P:
topology = "mixed"
elif configuration.ENABLE_LEGACY:
topology = "legacy"
mixed_backends = (
configuration.MIXED_UTXO_BACKENDS.split() if configuration.MIXED_UTXO_BACKENDS else []
)

pool_mapping = {}
for idx, pn in enumerate(cluster_management.Resources.ALL_POOLS, start=1):
@@ -301,28 +306,29 @@ def test_block_production(
delegation.delegate_stake_addr(
cluster_obj=cluster,
addrs_data=cluster_manager.cache.addrs_data,
-                temp_template=temp_template,
+                temp_template=f"{temp_template}_pool_{idx}",
pool_id=pool_id,
)

# Create sqlite db
-        conn = sqlite3.connect(configuration.BLOCK_PRODUCTION_DB)
+        conn = block_production_db
cur = conn.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS runs(run_id, topology)")
cur.execute("CREATE TABLE IF NOT EXISTS runs(run_id, backend)")
cur.execute(
"CREATE TABLE IF NOT EXISTS"
" blocks(run_id, epoch_no, pool_id, pool_idx, topology, num_blocks)"
" blocks(run_id, epoch_no, pool_id, pool_idx, backend, num_blocks)"
)
-        cur.execute("INSERT INTO runs VALUES (?, ?)", (rand, topology))
+        cur.execute(
+            "INSERT INTO runs VALUES (?, ?)",
+            (rand, "mixed" if mixed_backends else configuration.UTXO_BACKEND),
+        )
conn.commit()
cur.close()

-        def _get_pool_topology(pool_idx: int) -> str:
-            if topology == "legacy":
-                return "legacy"
-            if topology == "mixed":
-                return "p2p" if pool_idx % 2 == 0 else "legacy"
-            return "p2p"
+        def _get_pool_utxo_backend(pool_idx: int) -> str:
+            if mixed_backends:
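+                # Backends are assigned round-robin by pool index: pool 1 gets
+                # the first configured backend, pool 2 the second, and so on.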
+                return mixed_backends[(pool_idx - 1) % len(mixed_backends)]
+            return configuration.UTXO_BACKEND

def _save_state(curr_epoch: int) -> None:
ledger_state = clusterlib_utils.get_ledger_state(cluster_obj=cluster)
@@ -345,7 +351,7 @@ def _save_state(curr_epoch: int) -> None:
curr_epoch - 1,
pool_rec["pool_id"],
pool_idx,
-                        _get_pool_topology(pool_idx),
+                        _get_pool_utxo_backend(pool_idx),
num_blocks,
),
)
@@ -387,8 +393,6 @@ def _save_state(curr_epoch: int) -> None:
# Save also data for the last epoch
_save_state(cluster.g_query.get_epoch())

-        conn.close()
-

@pytest.mark.skipif(configuration.ENABLE_LEGACY, reason="runs only on P2P enabled clusters")
class TestDynamicBlockProd: