Skip to content

Commit 6ddb0f2

Browse files
committed
add message token count inspector
1 parent 41d1aea commit 6ddb0f2

File tree

1 file changed

+393
-0
lines changed

1 file changed

+393
-0
lines changed
Lines changed: 393 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,393 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Show countAllMessageTokens-style token counts for each message in an OpenCode session.
4+
5+
Usage: opencode-message-token-counts [--session ID] [--json] [--no-color] [--db PATH]
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import argparse
11+
import json
12+
import shutil
13+
import subprocess
14+
from pathlib import Path
15+
from typing import Optional
16+
17+
from opencode_api import APIError, add_api_arguments, create_client_from_args, list_sessions_across_projects
18+
19+
20+
# Directory containing this script; REPO_ROOT is derived from it below.
SCRIPT_DIR = Path(__file__).resolve().parent
# Repository root — passed as cwd to the node subprocess in count_tokens_batch
# so it can resolve the @anthropic-ai/tokenizer package from the repo.
REPO_ROOT = SCRIPT_DIR.parent
22+
23+
24+
class Colors:
    """ANSI escape sequences used to style terminal output."""

    RESET = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    CYAN = "\033[36m"


# A Colors instance whose every escape code is blanked out, for --no-color mode.
# Instance attributes shadow the class-level codes, leaving Colors itself intact.
NO_COLOR = Colors()
for _name, _value in vars(Colors).items():
    if not _name.startswith("_") and isinstance(_value, str):
        setattr(NO_COLOR, _name, "")
37+
38+
39+
def stringify_json(value) -> str:
    """Serialize *value* as compact JSON (no spaces, non-ASCII kept as-is)."""
    return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
41+
42+
43+
def collapse_whitespace(text: str) -> str:
    """Collapse any run of whitespace (spaces, tabs, newlines) to one space."""
    pieces = text.split()
    return " ".join(pieces)
45+
46+
47+
def truncate(text: str, limit: int = 64) -> str:
    """Return *text* cut to *limit* characters, with a trailing ellipsis."""
    return text if len(text) <= limit else text[: limit - 3] + "..."
51+
52+
53+
def get_terminal_width(default: int = 120) -> int:
    """Detected terminal width, never less than 80 columns."""
    columns = shutil.get_terminal_size((default, 20)).columns
    return columns if columns >= 80 else 80
55+
56+
57+
def short_message_id(message_id: str, limit: int = 14) -> str:
    """Shorten a message id to *limit* characters ("-" when empty)."""
    shown = message_id or "-"
    if len(shown) <= limit:
        return shown
    return shown[: limit - 3] + "..."
59+
60+
61+
def preview_message(parts: list[dict]) -> str:
    """Build a one-line human-readable preview for a message's parts.

    Preference order: first non-empty text part; then a summary of tool
    parts; then the first part type that is not a step marker; then the
    first part's type (when only step-start/step-finish parts exist);
    finally "[no content]" for an empty part list.
    """
    # A non-empty text part wins outright.
    for part in parts:
        if part.get("type") != "text":
            continue
        text = collapse_whitespace(part.get("text", ""))
        if not text:
            continue
        prefix = "[ignored] " if part.get("ignored", False) else ""
        return truncate(prefix + text)

    # No usable text: summarize the tool parts, if any exist.
    tool_names = [part.get("tool", "tool") for part in parts if part.get("type") == "tool"]
    if tool_names:
        return truncate(f"[tools: {', '.join(tool_names[:3])}]")

    # Show the first part type that is not a step marker. No "tool" part can
    # reach this loop — any tool part already returned above, so the original
    # per-tool branch here was dead code and has been removed.
    for part in parts:
        part_type = part.get("type", "unknown")
        if part_type not in {"step-start", "step-finish"}:
            return f"[{part_type}]"

    # Only step markers remain: fall back to the first part's type.
    if parts:
        return f"[{parts[0].get('type', 'unknown')}]"

    return "[no content]"
91+
92+
93+
def extract_tool_content(part: dict) -> list[str]:
    """Collect the token-countable strings carried by one tool part.

    "question" tools contribute only their questions payload. "edit" and
    "write" tools contribute their input; every non-question tool also
    contributes its completed output, or its error text on failure.
    """
    state = part.get("state") or {}
    tool_name = part.get("tool")

    def as_text(value) -> str:
        # Non-string payloads are serialized as compact JSON.
        return value if isinstance(value, str) else stringify_json(value)

    if tool_name == "question":
        questions = (state.get("input") or {}).get("questions")
        return [] if questions is None else [as_text(questions)]

    contents: list[str] = []
    if tool_name in {"edit", "write"} and state.get("input") is not None:
        contents.append(as_text(state["input"]))

    status = state.get("status")
    if status == "completed" and state.get("output") is not None:
        contents.append(as_text(state["output"]))
    elif status == "error" and state.get("error") is not None:
        contents.append(as_text(state["error"]))

    return contents
118+
119+
120+
def collect_message_segments(message: dict) -> tuple[list[str], int, int, list[str]]:
    """Gather the countable text segments of one message.

    Returns (segments, text_segment_count, tool_segment_count, part_types):
    every non-empty text part contributes one segment, and each tool part
    contributes whatever extract_tool_content yields for it.
    """
    segments: list[str] = []
    part_types: list[str] = []
    text_count = 0
    tool_count = 0

    for part in message.get("parts", []):
        kind = part.get("type", "unknown")
        part_types.append(kind)

        if kind == "text":
            body = part.get("text", "")
            if body:
                segments.append(body)
                text_count += 1
            continue

        extracted = extract_tool_content(part)
        segments.extend(extracted)
        tool_count += len(extracted)

    return segments, text_count, tool_count, part_types
141+
142+
143+
def fallback_count_tokens(text: str) -> int:
    """Rough token estimate (~4 characters per token) used when no real tokenizer runs."""
    return round(len(text) / 4) if text else 0
147+
148+
149+
def count_tokens_batch(texts: list[str]) -> tuple[list[int], str]:
    """Count tokens for every string in *texts*.

    Attempts the @anthropic-ai/tokenizer via a node subprocess (texts are
    piped in as JSON on stdin); on any failure — node missing, package
    unresolved, timeout, bad output — falls back to the ~4-chars-per-token
    estimate. Returns (counts, source) with source "anthropic" or
    "approximate".
    """
    if not texts:
        return [], "anthropic"

    node_script = """
import { countTokens } from "@anthropic-ai/tokenizer";
import { readFileSync } from "node:fs";

const texts = JSON.parse(readFileSync(0, "utf8"));
const counts = texts.map((text) => countTokens(text || ""));
process.stdout.write(JSON.stringify(counts));
""".strip()

    try:
        completed = subprocess.run(
            ["node", "--input-type=module", "-e", node_script],
            input=stringify_json(texts),
            text=True,
            capture_output=True,
            check=True,
            timeout=15,
            cwd=REPO_ROOT,
        )
        parsed = json.loads(completed.stdout)
        # Only trust the subprocess if it returned one count per input.
        if isinstance(parsed, list) and len(parsed) == len(texts):
            return [int(value) for value in parsed], "anthropic"
    except (subprocess.SubprocessError, FileNotFoundError, json.JSONDecodeError, ValueError):
        pass

    return [fallback_count_tokens(text) for text in texts], "approximate"
179+
180+
181+
def get_most_recent_session(client, session_list_limit: int) -> Optional[dict]:
    """Return the newest session across all projects, or None when none exist."""
    sessions = list_sessions_across_projects(client, per_project_limit=session_list_limit)
    if not sessions:
        return None
    return sessions[0]
184+
185+
186+
def analyze_session(client, session: dict) -> dict:
    """Fetch one session's messages and compute per-message token counts.

    Builds one row per message (id, role, part/segment counts, preview),
    counts tokens for all messages in a single tokenizer batch, and returns
    the rows together with session-level totals.
    """
    session_id = session["id"]
    messages = client.get_session_messages(session_id, directory=session.get("directory"))

    rows: list[dict] = []
    batch_inputs: list[str] = []
    for position, message in enumerate(messages, 1):
        info = message.get("info", {})
        parts = message.get("parts", [])
        segments, text_segments, tool_segments, part_types = collect_message_segments(message)
        # All of a message's segments are joined into one string so the
        # tokenizer sees each message as a single document.
        batch_inputs.append(" ".join(segments))
        rows.append(
            {
                "index": position,
                "message_id": info.get("id", ""),
                "role": info.get("role", "unknown"),
                "part_count": len(parts),
                "part_types": part_types,
                "counted_segments": len(segments),
                "text_segments": text_segments,
                "tool_segments": tool_segments,
                "preview": preview_message(parts),
            }
        )

    counts, tokenizer = count_tokens_batch(batch_inputs)

    total_tokens = 0
    nonzero = 0
    largest = 0
    for row, tokens in zip(rows, counts):
        row["tokens"] = tokens
        total_tokens += tokens
        if tokens > 0:
            nonzero += 1
        largest = max(largest, tokens)

    return {
        "session_id": session_id,
        "title": session.get("title", "Unknown"),
        "tokenizer": tokenizer,
        "messages": rows,
        "total_messages": len(rows),
        "messages_with_tokens": nonzero,
        "messages_without_tokens": len(rows) - nonzero,
        "total_tokens": total_tokens,
        "max_message_tokens": largest,
    }
233+
234+
235+
def format_token_count(count: int, colors: Colors) -> str:
    """Right-align a token count to 10 columns, dimming zero counts."""
    rendered = f"{count:>10,}"
    if count == 0:
        rendered = f"{colors.DIM}{rendered}{colors.RESET}"
    return rendered
240+
241+
242+
def format_role(role: str, colors: Colors, width: int = 9) -> str:
    """Left-pad the role name to *width* and color it by role kind."""
    padded = f"{role:<{width}}"
    # user → cyan, assistant → green, anything else → yellow.
    color_by_role = {"user": colors.CYAN, "assistant": colors.GREEN}
    color = color_by_role.get(role, colors.YELLOW)
    return f"{color}{padded}{colors.RESET}"
250+
251+
252+
def format_size_indicator(count: int, max_count: int, width: int = 8) -> str:
    """Render a fixed-width '#'/'.' bar plus a right-aligned percentage.

    The bar shows *count* relative to *max_count*; any nonzero count gets
    at least one '#'. When max_count is not positive the bar is all dots.

    Fix: the max_count <= 0 path previously emitted a literal " 0%" (two
    characters narrower than the "{pct:>3}%" used everywhere else), which
    misaligned the Size column; it now uses the same right-aligned format.
    """
    if max_count <= 0:
        return f"{'.' * width} {0:>3}%"

    pct = round((count / max_count) * 100)
    if count <= 0:
        filled = 0
    else:
        # At least one '#' for any nonzero count, capped at the bar width.
        filled = min(width, max(1, round((count / max_count) * width)))
    return f"{'#' * filled}{'.' * (width - filled)} {pct:>3}%"
263+
264+
265+
def largest_messages(messages: list[dict], limit: int = 5) -> list[dict]:
    """The top *limit* messages ordered by descending token count."""
    by_tokens = sorted(messages, key=lambda m: m.get("tokens", 0), reverse=True)
    return by_tokens[:limit]
267+
268+
269+
def print_wide_message_table(result: dict, colors: Colors, width: int):
    """Print one aligned table row per message (wide-terminal layout)."""
    c = colors
    preview_width = max(24, width - 72)

    header = f"{c.BOLD}{'#':>3} {'Role':<9} {'Tokens':>10} {'Size':<12} {'Seg/Part':<8} {'ID':<14} Preview{c.RESET}"
    print(header)
    print("-" * width)

    for row in result["messages"]:
        mix = f"{row['counted_segments']}/{row['part_count']}"
        cells = [
            f"{row['index']:>3}",
            format_role(row["role"], c, 9),
            format_token_count(row["tokens"], c),
            f"{format_size_indicator(row['tokens'], result['max_message_tokens']):<12}",
            f"{mix:<8}",
            f"{c.DIM}{short_message_id(row['message_id']):<14}{c.RESET}",
            truncate(row["preview"], preview_width),
        ]
        print(" ".join(cells))
291+
292+
293+
def print_compact_message_list(result: dict, colors: Colors, width: int):
    """Print two lines per message (narrow-terminal layout)."""
    c = colors
    meta_width = max(18, width - 6)
    preview_width = max(32, width - 8)

    print(f"{c.BOLD}Messages{c.RESET}")
    print("-" * width)

    for row in result["messages"]:
        token_label = f"{row['tokens']:,} tokens"
        bar = format_size_indicator(row["tokens"], result["max_message_tokens"])
        mix = f"{row['counted_segments']}/{row['part_count']} seg/part"
        meta = truncate(f"{token_label} {bar} {mix}", meta_width)

        print(f"{row['index']:>3} {format_role(row['role'], c, 9)} {meta}")
        print(f" {c.DIM}{short_message_id(row['message_id'])}{c.RESET} {truncate(row['preview'], preview_width)}")
312+
313+
def print_highlights(result: dict, colors: Colors, width: int):
    """Print the largest messages by token count, skipping zero-token ones."""
    c = colors
    heavy = [m for m in largest_messages(result["messages"]) if m.get("tokens", 0) > 0]
    if not heavy:
        return

    print(f"\n{c.BOLD}Largest messages{c.RESET}")
    print("-" * width)
    for m in heavy:
        line = (
            f" #{m['index']:<3} {format_role(m['role'], c, 9)} "
            f"{m['tokens']:>10,} {truncate(m['preview'], max(30, width - 33))}"
        )
        print(line)
326+
327+
328+
def print_message_tokens(result: dict, colors: Colors):
    """Render the full human-readable report for an analyzed session."""
    c = colors
    width = get_terminal_width()
    banner = f"{c.BOLD}{'=' * width}{c.RESET}"

    print(banner)
    print(f"{c.BOLD}SESSION MESSAGE TOKEN COUNTS{c.RESET}")
    print(f"{banner}\n")
    print(f" Session: {c.CYAN}{result['session_id']}{c.RESET}")
    print(f" Title: {result['title']}")
    print(f" Messages: {result['total_messages']}")
    print(f" Tokenizer: {result['tokenizer']}")
    print(f" Total: {result['total_tokens']:,} tokens")
    print(f" Largest: {result['max_message_tokens']:,} tokens\n")

    if not result["messages"]:
        print(" No messages found in this session.")
        return

    # Wide terminals get the single-row table; narrow ones the two-line list.
    layout = print_wide_message_table if width >= 110 else print_compact_message_list
    layout(result, c, width)

    print("-" * width)
    print_highlights(result, c, width)

    print(f"\n{c.BOLD}SESSION SUMMARY{c.RESET}")
    print(f" Total message tokens: {result['total_tokens']:,}")
    print(f" Messages with tokens: {result['messages_with_tokens']:,}")
    print(f" Empty messages: {result['messages_without_tokens']:,}")
    print(f" Largest message: {result['max_message_tokens']:,}")
357+
358+
359+
def main() -> int:
    """CLI entry point: resolve a session, analyze it, print the report.

    Returns the process exit code (0 on success, 1 on API/lookup failure).
    """
    parser = argparse.ArgumentParser(
        description="Show countAllMessageTokens-style token counts for each message in an OpenCode session"
    )
    parser.add_argument("--session", "-s", type=str, default=None, help="Session ID to analyze (default: most recent)")
    parser.add_argument("--json", "-j", action="store_true", help="Output as JSON")
    parser.add_argument("--no-color", action="store_true", help="Disable colored output")
    add_api_arguments(parser)
    args = parser.parse_args()

    try:
        with create_client_from_args(args) as client:
            if args.session is not None:
                session = client.get_session(args.session)
            else:
                # No explicit session: fall back to the most recent one.
                session = get_most_recent_session(client, args.session_list_limit)
                if session is None:
                    print("Error: No sessions found")
                    return 1
            result = analyze_session(client, session)
    except APIError as err:
        print(f"Error: {err}")
        return 1

    if args.json:
        print(json.dumps(result, indent=2, ensure_ascii=False))
        return 0

    print_message_tokens(result, NO_COLOR if args.no_color else Colors())
    return 0
390+
391+
392+
if __name__ == "__main__":
    # Script entry point: exit with main()'s return code.
    raise SystemExit(main())

0 commit comments

Comments
 (0)