Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 65 additions & 41 deletions src/agent/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,20 @@ impl ChannelState {
/// Cancel a running worker by aborting its tokio task and cleaning up state.
/// Returns an error message if the worker is not found.
pub async fn cancel_worker(&self, worker_id: WorkerId) -> std::result::Result<(), String> {
let handle = self.worker_handles.write().await.remove(&worker_id);
let removed = self
.active_workers
.write()
.await
.remove(&worker_id)
.is_some();
let handle = self.worker_handles.write().await.remove(&worker_id);
self.worker_inputs.write().await.remove(&worker_id);
let status_removed = self.status_block.write().await.remove_worker(worker_id);

if let Some(handle) = handle {
handle.abort();
Ok(())
} else if removed {
} else if removed || status_removed {
// Worker was in active_workers but had no handle (shouldn't happen, but handle gracefully)
Ok(())
} else {
Expand Down Expand Up @@ -391,8 +392,11 @@ impl Channel {

if messages.len() == 1 {
// Single message - process normally
let message = messages.into_iter().next().ok_or_else(|| anyhow::anyhow!("empty iterator after length check"))?;
self.handle_message(message).await
if let Some(message) = messages.into_iter().next() {
self.handle_message(message).await
} else {
Ok(())
}
} else {
// Multiple messages - batch them
self.handle_message_batch(messages).await
Expand Down Expand Up @@ -462,10 +466,18 @@ impl Channel {
.get("telegram_chat_type")
.and_then(|v| v.as_str())
});
self.conversation_context = Some(
prompt_engine
.render_conversation_context(&first.source, server_name, channel_name)?,
);
match prompt_engine.render_conversation_context(
&first.source,
server_name,
channel_name,
) {
Ok(context) => {
self.conversation_context = Some(context);
}
Err(error) => {
tracing::warn!(%error, "failed to render conversation context");
}
}
}

// Persist each message to conversation log (individual audit trail)
Expand Down Expand Up @@ -605,8 +617,9 @@ impl Channel {
let browser_enabled = rc.browser_config.load().enabled;
let web_search_enabled = rc.brave_search_key.load().is_some();
let opencode_enabled = rc.opencode.load().enabled;
let worker_capabilities =
prompt_engine.render_worker_capabilities(browser_enabled, web_search_enabled, opencode_enabled)?;
let worker_capabilities = prompt_engine
.render_worker_capabilities(browser_enabled, web_search_enabled, opencode_enabled)
.map_err(|error| AgentError::Other(error.into()))?;

let status_text = {
let status = self.state.status_block.read().await;
Expand All @@ -623,16 +636,18 @@ impl Channel {

let empty_to_none = |s: String| if s.is_empty() { None } else { Some(s) };

prompt_engine.render_channel_prompt(
empty_to_none(identity_context),
empty_to_none(memory_bulletin.to_string()),
empty_to_none(skills_prompt),
worker_capabilities,
self.conversation_context.clone(),
empty_to_none(status_text),
coalesce_hint,
available_channels,
)
Ok(prompt_engine
.render_channel_prompt(
empty_to_none(identity_context),
empty_to_none(memory_bulletin.to_string()),
empty_to_none(skills_prompt),
worker_capabilities,
self.conversation_context.clone(),
empty_to_none(status_text),
coalesce_hint,
available_channels,
)
.map_err(|error| AgentError::Other(error.into()))?)
}

/// Handle an incoming message by running the channel's LLM agent loop.
Expand Down Expand Up @@ -712,10 +727,18 @@ impl Channel {
.get("telegram_chat_type")
.and_then(|v| v.as_str())
});
self.conversation_context = Some(
prompt_engine
.render_conversation_context(&message.source, server_name, channel_name)?,
);
match prompt_engine.render_conversation_context(
&message.source,
server_name,
channel_name,
) {
Ok(context) => {
self.conversation_context = Some(context);
}
Err(error) => {
tracing::warn!(%error, "failed to render conversation context");
}
}
}

let system_prompt = self.build_system_prompt().await?;
Expand Down Expand Up @@ -790,7 +813,7 @@ impl Channel {
}

/// Assemble the full system prompt using the PromptEngine.
async fn build_system_prompt(&self) -> crate::error::Result<String> {
async fn build_system_prompt(&self) -> Result<String> {
let rc = &self.deps.runtime_config;
let prompt_engine = rc.prompts.load();

Expand All @@ -803,7 +826,8 @@ impl Channel {
let web_search_enabled = rc.brave_search_key.load().is_some();
let opencode_enabled = rc.opencode.load().enabled;
let worker_capabilities = prompt_engine
.render_worker_capabilities(browser_enabled, web_search_enabled, opencode_enabled)?;
.render_worker_capabilities(browser_enabled, web_search_enabled, opencode_enabled)
.map_err(|error| AgentError::Other(error.into()))?;

let status_text = {
let status = self.state.status_block.read().await;
Expand All @@ -814,7 +838,7 @@ impl Channel {

let empty_to_none = |s: String| if s.is_empty() { None } else { Some(s) };

prompt_engine
Ok(prompt_engine
.render_channel_prompt(
empty_to_none(identity_context),
empty_to_none(memory_bulletin.to_string()),
Expand All @@ -825,6 +849,7 @@ impl Channel {
None, // coalesce_hint - only set for batched messages
available_channels,
)
.map_err(|error| AgentError::Other(error.into()))?)
}

/// Register per-turn tools, run the LLM agentic loop, and clean up.
Expand Down Expand Up @@ -1147,8 +1172,10 @@ impl Channel {
for (key, value) in retrigger_metadata {
self.pending_retrigger_metadata.insert(key, value);
}
self.retrigger_deadline =
Some(tokio::time::Instant::now() + std::time::Duration::from_millis(RETRIGGER_DEBOUNCE_MS));
self.retrigger_deadline = Some(
tokio::time::Instant::now()
+ std::time::Duration::from_millis(RETRIGGER_DEBOUNCE_MS),
);
}
}

Expand Down Expand Up @@ -1177,19 +1204,16 @@ impl Channel {
"firing debounced retrigger"
);

let retrigger_message = match self
let retrigger_message = self
.deps
.runtime_config
.prompts
.load()
.render_system_retrigger()
{
Ok(message) => message,
Err(error) => {
tracing::error!(%error, "failed to render retrigger message");
return;
}
};
.unwrap_or_else(|error| {
tracing::warn!(%error, "failed to render retrigger message");
"Background work completed; continue processing.".to_string()
});

let synthetic = InboundMessage {
id: uuid::Uuid::new_v4().to_string(),
Expand Down Expand Up @@ -1261,7 +1285,7 @@ pub async fn spawn_branch_from_state(
&rc.instance_dir.display().to_string(),
&rc.workspace_dir.display().to_string(),
)
.map_err(|e| AgentError::Other(anyhow::anyhow!("{e}")))?;
.map_err(|error| AgentError::Other(error.into()))?;

spawn_branch(
state,
Expand All @@ -1285,10 +1309,10 @@ async fn spawn_memory_persistence_branch(
let prompt_engine = deps.runtime_config.prompts.load();
let system_prompt = prompt_engine
.render_static("memory_persistence")
.map_err(|e| AgentError::Other(anyhow::anyhow!("{e}")))?;
.map_err(|error| AgentError::Other(error.into()))?;
let prompt = prompt_engine
.render_system_memory_persistence()
.map_err(|e| AgentError::Other(anyhow::anyhow!("{e}")))?;
.map_err(|error| AgentError::Other(error.into()))?;

spawn_branch(
state,
Expand Down Expand Up @@ -1425,7 +1449,7 @@ pub async fn spawn_worker_from_state(
&rc.instance_dir.display().to_string(),
&rc.workspace_dir.display().to_string(),
)
.map_err(|e| AgentError::Other(anyhow::anyhow!("{e}")))?;
.map_err(|error| AgentError::Other(error.into()))?;
let skills = rc.skills.load();
let browser_config = (**rc.browser_config.load()).clone();
let brave_search_key = (**rc.brave_search_key.load()).clone();
Expand Down
10 changes: 7 additions & 3 deletions src/agent/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,14 @@ impl Compactor {
let deps = self.deps.clone();
let prompt_engine = deps.runtime_config.prompts.load();
let compactor_prompt = match prompt_engine.render_static("compactor") {
Ok(p) => p,
Ok(prompt) => prompt,
Err(error) => {
tracing::error!(%error, "failed to render compactor prompt");
let mut flag = is_compacting.write().await;
tracing::error!(
channel_id = %self.channel_id,
%error,
"failed to render compactor prompt"
);
let mut flag = self.is_compacting.write().await;
*flag = false;
return;
}
Expand Down
14 changes: 14 additions & 0 deletions src/agent/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ impl StatusBlock {
});
}

/// Remove the first active worker whose ID matches `worker_id`.
///
/// Returns `true` when a matching worker was found and removed,
/// `false` when no active worker carries that ID.
pub fn remove_worker(&mut self, worker_id: WorkerId) -> bool {
    // Locate the first matching entry; the immutable borrow from
    // `position` ends before we mutate, so `remove` is safe here.
    match self
        .active_workers
        .iter()
        .position(|worker| worker.id == worker_id)
    {
        Some(index) => {
            self.active_workers.remove(index);
            true
        }
        None => false,
    }
}

/// Render the status block as a string for context injection.
pub fn render(&self) -> String {
let mut output = String::new();
Expand Down
38 changes: 27 additions & 11 deletions src/agent/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ impl Worker {
None
}
})
.unwrap_or_else(|| "Worker reached maximum segments without a final response.".to_string());
.unwrap_or_else(|| {
"Worker reached maximum segments without a final response."
.to_string()
});
}

self.maybe_compact_history(&mut history).await;
Expand Down Expand Up @@ -358,9 +361,19 @@ impl Worker {
self.hook.send_status("compacting (overflow recovery)");
self.force_compact_history(&mut history).await;
let prompt_engine = self.deps.runtime_config.prompts.load();
let overflow_msg =
prompt_engine.render_system_worker_overflow()?;
follow_up_prompt = format!("{follow_up}\n\n{overflow_msg}");
match prompt_engine.render_system_worker_overflow() {
Ok(overflow_msg) => {
follow_up_prompt = format!("{follow_up}\n\n{overflow_msg}");
}
Err(render_error) => {
tracing::warn!(
worker_id = %self.id,
%render_error,
"failed to render worker overflow message"
);
follow_up_prompt = follow_up.clone();
}
}
}
Err(error) => {
self.write_failure_log(&history, &format!("follow-up failed: {error}"));
Expand Down Expand Up @@ -449,13 +462,16 @@ impl Worker {

let recap = build_worker_recap(&removed);
let prompt_engine = self.deps.runtime_config.prompts.load();
let marker = match prompt_engine.render_system_worker_compact(remove_count, &recap) {
Ok(m) => m,
Err(error) => {
tracing::error!(%error, "failed to render worker compact marker");
return;
}
};
let marker = prompt_engine
.render_system_worker_compact(remove_count, &recap)
.unwrap_or_else(|error| {
tracing::warn!(
worker_id = %self.id,
%error,
"failed to render worker compact message"
);
format!("[Compacted worker history: removed {remove_count} messages]")
});
history.insert(0, rig::message::Message::from(marker));

tracing::info!(
Expand Down
Loading