-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathassembler.py
More file actions
238 lines (205 loc) · 10.5 KB
/
assembler.py
File metadata and controls
238 lines (205 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""
assembler.py — Context assembly policy for the tag-context system.
Builds a context window from a combination of recent messages (recency layer)
and tag-retrieved messages (topic layer), packed to a token budget.
Tag IDF filtering: tags appearing in >30% of the corpus are treated as stop
words for topic retrieval (they carry no discriminating signal). This threshold
is configurable via TOPIC_TAG_MAX_CORPUS_FREQ.
"""
from dataclasses import dataclass
from typing import List, Optional
from store import Message, MessageStore
# Tags appearing in more than this fraction of the corpus are skipped in
# topic retrieval. At >30% they're effectively stop words (e.g. "code",
# "openclaw" in a corpus of AI assistant interactions) — they retrieve
# nearly everything and carry no discriminating signal.
TOPIC_TAG_MAX_CORPUS_FREQ = 0.30
# A single message will not be included if it exceeds this fraction of the
# total token budget — prevents one giant turn from consuming the entire window.
# The "always include first" safety valve is also capped at this size.
MAX_SINGLE_MSG_BUDGET_FRACTION = 0.35
def _estimate_tokens(msg: Message) -> int:
    """Estimate the token cost of a message.

    Prefers the stored token count when present; otherwise falls back to a
    word-count proxy (words * 1.3, floored at 1 token).
    """
    stored = msg.token_count
    if stored > 0:
        return stored
    combined = f"{msg.user_text} {msg.assistant_text}"
    word_total = len(combined.split())
    return max(1, int(word_total * 1.3))
@dataclass
class AssemblyResult:
    """Result of a context assembly operation.

    Produced by ContextAssembler.assemble(); `messages` is the final packed
    context window, and the remaining fields are bookkeeping about which
    layer each message came from and what it cost.
    """
    messages: List[Message]  # oldest-first, ready to use as context
    total_tokens: int        # estimated token cost of all included messages
    sticky_count: int        # how many came from the sticky layer
    recency_count: int       # how many came from the recency layer
    topic_count: int         # how many came from the topic layer
    tags_used: List[str]     # tags that contributed to topic layer
class ContextAssembler:
    """
    Assembles context for an incoming message from three layers:

    1. Sticky layer (up to 30% of budget) — pinned messages (tool chains,
       explicit pins).
    2. Recency layer (20-25% of the remaining budget) — most recent messages
       regardless of tag: 20% when the sticky layer is active, 25% otherwise.
    3. Topic layer (the rest of the budget) — messages retrieved by inferred
       tags, deduplicated against sticky + recency.

    Final result is sorted oldest-first for natural reading order.
    """

    def __init__(self, store: MessageStore, token_budget: int = 4000) -> None:
        self.store = store
        self.token_budget = token_budget

    def assemble(self, incoming_text: str,
                 inferred_tags: List[str],
                 pinned_message_ids: Optional[List[str]] = None) -> AssemblyResult:
        """
        Build a context window for `incoming_text` given `inferred_tags`.

        Parameters
        ----------
        incoming_text : str
            The user's new message (used only for future tag inference hooks).
        inferred_tags : List[str]
            Tags inferred for the incoming message by the tagger.
        pinned_message_ids : Optional[List[str]]
            Message IDs that should be pinned in the sticky layer.
            If None, sticky layer is skipped.

        Returns
        -------
        AssemblyResult
            Selected messages (oldest-first) plus per-layer bookkeeping.
        """
        # ── Sticky layer ───────────────────────────────────────────────────
        sticky_msgs, sticky_tokens = self._sticky_layer(pinned_message_ids)

        # ── Budget allocation ──────────────────────────────────────────────
        remaining_budget = self.token_budget - sticky_tokens
        # BUGFIX: both branches previously computed int(remaining * 0.25),
        # although the sticky-active branch was documented as 20% (with the
        # extra share going to the topic layer). Honor the documented split.
        recency_fraction = 0.20 if sticky_msgs else 0.25
        recency_budget = int(remaining_budget * recency_fraction)
        topic_budget = remaining_budget - recency_budget

        # Cap: no single message may exceed this fraction of the total
        # budget; the "always include first" valve is subject to it too.
        single_msg_cap = int(self.token_budget * MAX_SINGLE_MSG_BUDGET_FRACTION)

        # ── Recency layer ──────────────────────────────────────────────────
        seen_ids = {m.id for m in sticky_msgs}
        recency_msgs, recency_tokens = self._pack_layer(
            self.store.get_recent(10), recency_budget, single_msg_cap,
            seen_ids=seen_ids)

        # ── Topic layer ────────────────────────────────────────────────────
        useful_tags = self._select_topic_tags(inferred_tags)
        topic_candidates: List[Message] = []
        for tag in useful_tags:
            for msg in self.store.get_by_tag(tag, limit=20):
                if msg.id not in seen_ids:
                    topic_candidates.append(msg)
                    seen_ids.add(msg.id)
        # Newest-first within topic candidates, then pack to budget.
        topic_candidates.sort(key=lambda m: m.timestamp, reverse=True)
        topic_msgs, topic_tokens = self._pack_layer(
            topic_candidates, topic_budget, single_msg_cap)

        # ── Combine + sort oldest-first ────────────────────────────────────
        all_msgs = sticky_msgs + recency_msgs + topic_msgs
        all_msgs.sort(key=lambda m: m.timestamp)
        return AssemblyResult(
            messages=all_msgs,
            total_tokens=sticky_tokens + recency_tokens + topic_tokens,
            sticky_count=len(sticky_msgs),
            recency_count=len(recency_msgs),
            topic_count=len(topic_msgs),
            tags_used=useful_tags,
        )

    def _sticky_layer(self, pinned_message_ids: Optional[List[str]]):
        """Resolve pinned IDs into messages, packed to 30% of the budget.

        IDs are looked up by external_id first (for OpenClaw IDs), falling
        back to the internal ID for backwards compatibility; IDs that match
        nothing are silently skipped. Returns (messages, token_total).
        """
        sticky_msgs: List[Message] = []
        sticky_tokens = 0
        if not pinned_message_ids:
            return sticky_msgs, sticky_tokens
        sticky_budget = int(self.token_budget * 0.3)
        for msg_id in pinned_message_ids:
            msg = self.store.get_by_external_id(msg_id)
            if msg is None:
                # Fallback to internal ID lookup for backwards compatibility
                msg = self.store.get_by_id(msg_id)
            if msg is None:
                continue
            cost = _estimate_tokens(msg)
            if sticky_tokens + cost > sticky_budget:
                break
            sticky_msgs.append(msg)
            sticky_tokens += cost
        return sticky_msgs, sticky_tokens

    def _summary_proxy(self, msg: Message) -> Message:
        """Build a lightweight stand-in for an oversized message.

        The proxy keeps the original identity/metadata (so dedup and
        timestamp sorting still work) but carries only the summary text,
        with a word-count token estimate.
        """
        return Message(
            id=msg.id,
            session_id=msg.session_id,
            user_id=msg.user_id,
            timestamp=msg.timestamp,
            user_text="[Summary of large message]",
            assistant_text=msg.summary,
            tags=msg.tags,
            token_count=len(msg.summary.split()),
            external_id=msg.external_id,
            summary=None,
        )

    def _pack_layer(self, candidates, layer_budget: int, single_msg_cap: int,
                    seen_ids=None):
        """Greedily pack `candidates` into at most `layer_budget` tokens.

        Shared packing policy for the recency and topic layers:
        - When `seen_ids` is provided, already-seen messages are skipped and
          each packed message's ID is added to the set.
        - A message costing more than `single_msg_cap` (e.g. a giant
          PR-review turn) is replaced by its summary proxy when a summary
          exists, otherwise dropped entirely.
        - The first packed message is always admitted even if it exceeds
          the layer budget ("always include first" safety valve).

        Returns (messages, token_total).
        """
        packed: List[Message] = []
        used = 0
        first = True
        for msg in candidates:
            if seen_ids is not None and msg.id in seen_ids:
                continue
            cost = _estimate_tokens(msg)
            if cost > single_msg_cap:
                if not msg.summary:
                    continue  # too big and no summary available: drop it
                msg = self._summary_proxy(msg)
                cost = _estimate_tokens(msg)
            if not first and used + cost > layer_budget:
                break
            packed.append(msg)
            used += cost
            if seen_ids is not None:
                seen_ids.add(msg.id)
            first = False
        return packed, used

    def _select_topic_tags(self, inferred_tags: List[str]) -> List[str]:
        """IDF-filter `inferred_tags` down to discriminating tags.

        Tags in more than TOPIC_TAG_MAX_CORPUS_FREQ of the corpus are stop
        words — they retrieve nearly everything, blowing the token budget on
        low-relevance messages. Corpus size is approximated by the maximum
        per-tag count (avoids fetching all rows); tag_counts() returns
        {tag: number of messages carrying that tag}.
        """
        tag_counts = self.store.tag_counts()
        total_messages = max(tag_counts.values()) if tag_counts else 1
        if total_messages == 0:
            total_messages = 1  # avoid div-by-zero
        useful_tags = [
            t for t in inferred_tags
            if tag_counts.get(t, 0) / total_messages <= TOPIC_TAG_MAX_CORPUS_FREQ
        ]
        # Small-corpus fallback: if every tag is high-frequency, keep the
        # rarest half so topic retrieval still has something to work with.
        if not useful_tags and inferred_tags:
            useful_tags = sorted(inferred_tags, key=lambda t: tag_counts.get(t, 0))
            useful_tags = useful_tags[: max(1, len(useful_tags) // 2)]
        return useful_tags