Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@ LLM_API_KEY=your-llm-api-key
# LLM_BASE_URL=https://api.venice.ai/api/v1 # auto-set for known providers
# LLM_MODEL=grok-41-fast # auto-set for known providers

# Dune (hourly large-transfer monitor)
DUNE_API_KEY=your-dune-api-key
DUNE_LARGE_TRANSFERS_QUERY_ID=1234567
# DUNE_LARGE_TRANSFER_THRESHOLD=5000000

# Global settings
LOG_LEVEL=INFO # DEBUG, INFO, WARNING, ERROR (DEBUG skips Telegram sends)
REQUEST_TIMEOUT=30
RETRY_COUNT=3
BACKOFF_FACTOR=1.0
BACKOFF_FACTOR=1.0
2 changes: 2 additions & 0 deletions .github/workflows/_run-monitoring.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ env:
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
LLM_MODEL: ${{ secrets.LLM_MODEL }}
LLM_PROVIDER: ${{ secrets.LLM_PROVIDER }}
DUNE_API_KEY: ${{ secrets.DUNE_API_KEY }}
DUNE_LARGE_TRANSFERS_QUERY_ID: ${{ vars.DUNE_LARGE_TRANSFERS_QUERY_ID }}

jobs:
run:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/hourly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ jobs:
# silo/ur_sniff.py
usdai/main.py
usdai/large_mints.py
stables/dune_large_transfers.py
yearn/alert_large_flows.py
maple/main.py
timelock/timelock_alerts.py
Expand Down
201 changes: 201 additions & 0 deletions stables/dune_large_transfers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""Hourly large-transfer monitor backed by a Dune query result.

Expected Dune query output columns:
- block_time
- blockchain
- tx_hash
- from
- to
- contract_address
- symbol
- amount
- amount_usd

Rows are sorted defensively by ``block_time`` descending before deduping and
alerting.
"""

from __future__ import annotations

import os
from typing import Any

from dune_client.client import DuneClient
from dune_client.query import QueryBase

from utils.alert import Alert, AlertSeverity, send_alert
from utils.cache import cache_filename, get_last_value_for_key_from_file, write_last_value_to_file
from utils.config import Config
from utils.logging import get_logger

logger = get_logger("stables.dune_large_transfers")
PROTOCOL = "stables"

# Cache key recording the newest row already alerted on (dedupes across runs).
CACHE_KEY_LAST_TX = "stables_dune_large_transfers_last_tx"
# Upper bound on transfer lines embedded in a single per-protocol alert.
MAX_ROWS_PER_PROTOCOL_ALERT = 10
# Default USD notional at/above which a transfer counts as "large";
# overridable via the DUNE_LARGE_TRANSFER_THRESHOLD env var.
DEFAULT_LARGE_TRANSFER_THRESHOLD = 5_000_000.0

# Route each token to its owning protocol channel.
# Keyed by (blockchain, lowercase contract address) -> (symbol, protocol).
TOKEN_ROUTE: dict[tuple[str, str], tuple[str, str]] = {
    ("ethereum", "0xcccc62962d17b8914c62d74ffb843d73b2a3cccc"): ("cUSD", "cap"),
    ("ethereum", "0x48f9e38f3070ad8945dfeae3fa70987722e3d89c"): ("iUSD", "infinifi"),
    ("arbitrum", "0x0a1a1a107e45b7ced86833863f482bc5f4ed82ef"): ("USDai", "usdai"),
}

# Block-explorer transaction URL prefixes, keyed by lowercase chain name.
CHAIN_TX_EXPLORER: dict[str, str] = {
    "ethereum": "https://etherscan.io/tx/",
    "arbitrum": "https://arbiscan.io/tx/",
    "optimism": "https://optimistic.etherscan.io/tx/",
    "base": "https://basescan.org/tx/",
    "polygon": "https://polygonscan.com/tx/",
}


def _as_str(value: Any) -> str:
if value is None:
return ""
return str(value)


def _tx_link(blockchain: str, tx_hash: str) -> str:
    """Build a block-explorer URL for *tx_hash*; fall back to the bare hash.

    Unknown chains (no entry in ``CHAIN_TX_EXPLORER``) return the hash as-is.
    """
    base = CHAIN_TX_EXPLORER.get(blockchain.lower())
    return f"{base}{tx_hash}" if base else tx_hash


def _row_key(row: dict[str, Any]) -> str:
    """Stable identity for a result row: tx hash + contract (+ log index).

    Hash and contract are lowercased so the cached key is case-insensitive;
    the log index is only appended when the query exposes that column.
    """
    key_parts = [
        _as_str(row.get("tx_hash")).lower(),
        _as_str(row.get("contract_address")).lower(),
    ]
    log_index = _as_str(row.get("log_index"))
    if log_index:
        key_parts.append(log_index)
    return "|".join(key_parts)


def _route_for_row(row: dict[str, Any]) -> tuple[str, str] | None:
    """Resolve a row's (symbol, protocol) route, or ``None`` when unrouted."""
    lookup = (
        _as_str(row.get("blockchain")).lower(),
        _as_str(row.get("contract_address")).lower(),
    )
    return TOKEN_ROUTE.get(lookup)


def _build_row_line(row: dict[str, Any]) -> str:
    """Render one transfer row as a single bullet line for the alert body."""
    chain = _as_str(row.get("blockchain"))
    symbol = _as_str(row.get("symbol")) or "unknown"
    link = _tx_link(chain, _as_str(row.get("tx_hash")))
    amount = row.get("amount")
    amount_usd = row.get("amount_usd")
    return (
        f"- {symbol} on {chain}: "
        f"amount={amount}, amount_usd={amount_usd}, tx={link}"
    )


def _group_rows_by_protocol(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Bucket routed rows by destination protocol; unrouted rows are dropped."""
    buckets: dict[str, list[dict[str, Any]]] = {}
    for row in rows:
        route = _route_for_row(row)
        if route is not None:
            buckets.setdefault(route[1], []).append(row)
    return buckets


def _to_float(value: Any) -> float:
try:
return float(value)
except (TypeError, ValueError):
return 0.0


def _is_large_transfer(row: dict[str, Any], threshold: float) -> bool:
    """True when the row's USD notional meets or exceeds *threshold*.

    A missing or malformed ``amount_usd`` coerces to 0.0 and so never passes
    a positive threshold.
    """
    return _to_float(row.get("amount_usd")) >= threshold


def _sort_rows_newest_first(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return *rows* ordered by ``block_time`` descending.

    NOTE(review): sorts the timestamp lexicographically, which matches
    chronological order only for a fixed-width format such as
    ``YYYY-MM-DD HH:MM:SS.mmm UTC`` — confirm the query always emits that.
    """
    def block_time_of(row: dict[str, Any]) -> str:
        return _as_str(row.get("block_time"))

    return sorted(rows, key=block_time_of, reverse=True)


def _new_rows_since_last_seen(rows: list[dict[str, Any]], last_key: str) -> list[dict[str, Any]]:
    """Return the prefix of *rows* that precedes the previously-alerted row.

    With no recorded ``last_key`` every row is new. If the recorded row is
    absent from *rows* (e.g. it aged out of the query window), the whole list
    is returned again.
    """
    if not last_key:
        return rows

    prefix: list[dict[str, Any]] = []
    for row in rows:
        if _row_key(row) == last_key:
            return prefix
        prefix.append(row)
    return prefix


def main() -> None:
    """Fetch the Dune large-transfer query and alert on new routed rows.

    Skips with a warning when Dune credentials/config are missing, sends a
    MEDIUM alert if the Dune query itself fails, and otherwise emits one HIGH
    alert per routed protocol for rows not yet recorded in the cache.
    """
    api_key = os.getenv("DUNE_API_KEY")
    query_id = Config.get_env_int("DUNE_LARGE_TRANSFERS_QUERY_ID", 0)
    threshold = Config.get_env_float("DUNE_LARGE_TRANSFER_THRESHOLD", DEFAULT_LARGE_TRANSFER_THRESHOLD)

    # Missing configuration means "monitor disabled", not an error.
    if not api_key:
        logger.warning("DUNE_API_KEY is not set; skipping Dune large transfer monitor")
        return
    if query_id <= 0:
        logger.warning("DUNE_LARGE_TRANSFERS_QUERY_ID is not set; skipping Dune large transfer monitor")
        return

    try:
        dune = DuneClient(api_key)
        # run_query executes the saved query; ping_frequency is the poll
        # interval (seconds) while waiting for the execution to complete.
        result = dune.run_query(QueryBase(query_id=query_id, name="stables_large_transfers"), ping_frequency=2)
        rows = list(result.result.rows) if result and result.result and result.result.rows else []
    except Exception as exc:
        # A failed query is itself alert-worthy so the monitor cannot go
        # silently dark.
        logger.error("Failed to fetch Dune large-transfer query result: %s", exc)
        send_alert(
            Alert(
                AlertSeverity.MEDIUM,
                f"Dune large-transfer monitor failed while querying Dune: {exc}",
                PROTOCOL,
            ),
            plain_text=True,
        )
        return

    if not rows:
        logger.info("No large transfers returned by Dune query_id=%s", query_id)
        return

    # Keep only routed rows at/above the USD threshold, newest first, so the
    # prefix-based dedup below sees a stable ordering.
    alert_rows = [
        row
        for row in _sort_rows_newest_first(rows)
        if _is_large_transfer(row, threshold) and _route_for_row(row) is not None
    ]
    if not alert_rows:
        logger.info("No routed rows matched large-transfer threshold >= %s", threshold)
        return

    # Drop every row from the previously-alerted one onward (cached key).
    last_key = _as_str(get_last_value_for_key_from_file(cache_filename, CACHE_KEY_LAST_TX))
    new_alert_rows = _new_rows_since_last_seen(alert_rows, last_key)
    if not new_alert_rows:
        logger.info("No new large-transfer rows since last run")
        return

    grouped = _group_rows_by_protocol(new_alert_rows)
    # NOTE(review): total_rows counts new rows across *all* protocols, so each
    # per-protocol message reports the global match count — confirm intended.
    total_rows = len(new_alert_rows)
    for protocol, protocol_rows in grouped.items():
        route = _route_for_row(protocol_rows[0])
        if route is None:
            continue
        first_symbol, _ = route
        # Cap the message size; rows beyond the cap are counted but omitted.
        included_rows = protocol_rows[:MAX_ROWS_PER_PROTOCOL_ALERT]
        lines = [_build_row_line(row) for row in included_rows]
        message = (
            f"*Dune Large Transfer Alert ({first_symbol}/{protocol})*\n\n"
            f"Query ID: {query_id}\n"
            f"Matched rows: {total_rows}\n"
            f"Included in this alert: {len(included_rows)}\n\n" + "\n".join(lines)
        )
        send_alert(Alert(AlertSeverity.HIGH, message, protocol), plain_text=True)

    # Record the newest routed row so subsequent runs treat everything at or
    # after it as already seen. (alert_rows[0] == new_alert_rows[0] here.)
    write_last_value_to_file(cache_filename, CACHE_KEY_LAST_TX, _row_key(alert_rows[0]))


if __name__ == "__main__":
main()
63 changes: 63 additions & 0 deletions tests/test_dune_large_transfers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Tests for the Dune-backed large transfer monitor."""

from stables import dune_large_transfers as monitor


def _row(**overrides):
row = {
"block_time": "2026-05-12 14:30:00.000 UTC",
"blockchain": "ethereum",
"tx_hash": "0xABC",
"contract_address": "0xCCCC62962d17b8914c62D74FfB843d73B2a3cCCc",
"symbol": "cUSD",
"amount": "5000000",
"amount_usd": "5000000",
}
row.update(overrides)
return row


def test_row_key_excludes_colon_timestamp_from_cache_value():
    """The cache key must not embed the colon-laden block_time string."""
    cache_key = monitor._row_key(_row(block_time="2026-05-12 14:30:00.000 UTC"))

    assert ":" not in cache_key
    assert cache_key == "0xabc|0xcccc62962d17b8914c62d74ffb843d73b2a3cccc"


def test_row_key_uses_log_index_when_present():
    """A log_index column, when supplied, becomes the final key segment."""
    assert monitor._row_key(_row(log_index=17)).endswith("|17")


def test_route_for_row_returns_protocol_for_known_token():
    """A mixed-case known contract address still resolves to its route."""
    route = monitor._route_for_row(_row())

    assert route == ("cUSD", "cap")


def test_route_for_row_skips_unknown_token_instead_of_stables_fallback():
    """Unrecognised tokens yield no route rather than a catch-all channel."""
    unknown = _row(contract_address="0x0000000000000000000000000000000000000000")

    assert monitor._route_for_row(unknown) is None


def test_is_large_transfer_requires_positive_usd_amount():
    """A zero USD notional never clears the threshold, whatever `amount` says."""
    zero_usd = _row(amount="5000000", amount_usd="0")

    assert monitor._is_large_transfer(zero_usd, 5_000_000) is False


def test_new_rows_since_last_seen_only_returns_new_prefix():
    """Only rows strictly newer than the cached key are returned for alerting."""
    newest = _row(tx_hash="0xnew", block_time="2026-05-12 15:00:00.000 UTC")
    previous = _row(tx_hash="0xold", block_time="2026-05-12 14:00:00.000 UTC")
    older = _row(tx_hash="0xolder", block_time="2026-05-12 13:00:00.000 UTC")

    result = monitor._new_rows_since_last_seen(
        [newest, previous, older], monitor._row_key(previous)
    )

    assert result == [newest]


def test_sort_rows_newest_first_defends_dedup_order():
    """Rows arriving oldest-first are reordered newest-first before deduping."""
    newest = _row(tx_hash="0xnew", block_time="2026-05-12 15:00:00.000 UTC")
    older = _row(tx_hash="0xold", block_time="2026-05-12 14:00:00.000 UTC")

    assert monitor._sort_rows_newest_first([older, newest]) == [newest, older]