-
Notifications
You must be signed in to change notification settings - Fork 40
Expand file tree
/
Copy pathstreaming_websocket.py
More file actions
87 lines (69 loc) · 3.12 KB
/
streaming_websocket.py
File metadata and controls
87 lines (69 loc) · 3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import asyncio
from pytonapi.streaming import (
AccountStateNotification,
ActionsNotification,
Finality,
JettonsNotification,
TonapiWebSocket,
TransactionsNotification,
)
from pytonapi.types import Network
# TONAPI key — get one at https://tonconsole.com/
API_KEY = "YOUR_API_KEY"
# Target network — MAINNET or TESTNET
NETWORK = Network.MAINNET
# Account addresses to track — any form (raw, bounceable, non-bounceable)
ACCOUNT = "UQBAjaOyi2wGWlk-EDkSabqqnF-MrrwMadnwqrurKpkla4QB"
ANOTHER_ACCOUNT = "EQDtFpEwcFAEcRe5mLVh2N6C0x-_hJEM7W61_JLnSF74p4q2"
async def main() -> None:
client = TonapiWebSocket(API_KEY, NETWORK)
# Register handlers — same decorator API as SSE
# WebSocket additionally supports dynamic subscription changes (see below)
@client.on_transactions(min_finality=Finality.FINALIZED)
async def handle_transactions(event: TransactionsNotification) -> None:
for tx in event.transactions:
print(f"TX: {tx.get('hash', '?')} | finality={event.finality}")
@client.on_actions(min_finality=Finality.CONFIRMED)
async def handle_actions(event: ActionsNotification) -> None:
for action in event.actions:
print(f"Action: {action.get('type', '?')} | finality={event.finality}")
@client.on_account_states(min_finality=Finality.FINALIZED)
async def handle_account_state(event: AccountStateNotification) -> None:
balance = event.state.balance if event.state else "?"
print(f"Account state: {event.account} | balance={balance}")
@client.on_jettons(min_finality=Finality.FINALIZED)
async def handle_jettons(event: JettonsNotification) -> None:
if event.jetton:
print(f"Jetton: {event.jetton.jetton} | balance={event.jetton.balance}")
# Dynamic subscription change — WebSocket exclusive feature
# After start() establishes the connection, you can change what you're subscribed to
# without reconnecting. SSE does not support this.
async def change_subscription_later() -> None:
# Wait until connected
await client.wait_subscribed(timeout=30)
print("Connected! Waiting 15s before changing subscription...")
await asyncio.sleep(15)
# Replace current subscription with a new one (snapshot semantics)
# This replaces ALL addresses and filters — not incremental
print("Switching subscription to another account...")
await client.dynamic_subscribe(
addresses=[ANOTHER_ACCOUNT],
types=["transactions", "actions"],
min_finality=Finality.FINALIZED,
)
print("Subscription updated.")
await asyncio.sleep(30)
await client.stop()
# Run dynamic subscription change in background
background_tasks: set[asyncio.Task[None]] = set()
task = asyncio.create_task(change_subscription_later())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
print("Subscribing via WebSocket...")
await client.start(
addresses=[ACCOUNT],
include_address_book=True,
)
print("Stopped.")
if __name__ == "__main__":
asyncio.run(main())