Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/orchestrator/src/runtime/api-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ export async function createApiServer(
methods: ['GET', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE'],
})

fastify.get('/runtime/health', async () => ({
status: 'ok',
activeTasks: activeTasks.size,
}))

fastify.get('/tasks', async () => dbService.listTasks().map((task) => serializeTaskForApi(task)))

fastify.get('/tasks/pending-plans', async () =>
Expand Down
1 change: 1 addition & 0 deletions packages/orchestrator/src/slack-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface SlackNotificationPayload {
task: Task
event: SlackNotificationEvent
extra?: string
agentModel?: string
}

export interface SlackBotInterface {
Expand Down
7 changes: 6 additions & 1 deletion packages/orchestrator/src/workflow/task-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async function persistPlanResult(task: Task, project: ProjectConfig, planResult:
const updatedTask = dbService.getTaskById(task.id)
if (updatedTask) {
getSlackBot()
?.notify({ task: updatedTask, event: 'plan_ready' })
?.notify({ task: updatedTask, event: 'plan_ready', agentModel: project.agent.model })
.catch((err: any) => logger.error(`Slack notify failed: ${err?.message ?? err}`, task.id))
}
return
Expand Down Expand Up @@ -187,6 +187,11 @@ export async function processTask(

taskLifecycle.run(task.id, `Starting execution: ${task.title}`)
dbService.incrementExecutionAttempts(task.id)

getSlackBot()
?.notify({ task, event: 'execution_started' })
.catch((err: any) => logger.error(`Slack notify failed: ${err?.message ?? err}`, task.id))

let worktreePath: string | undefined

try {
Expand Down
9 changes: 8 additions & 1 deletion packages/slack/src/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class SlackBot {
private channel: string
private apiBaseUrl: string
private threadRegistry = new Map<string, string>()
private taskToThread = new Map<string, string>()

constructor({ config, apiBaseUrl, onError }: SlackBotOptions) {
this.channel = config.channel
Expand Down Expand Up @@ -39,6 +40,12 @@ export class SlackBot {
}

async notify(payload: SlackNotificationPayload): Promise<void> {
await sendNotification(this.client, this.channel, payload, this.threadRegistry)
await sendNotification(
this.client,
this.channel,
payload,
this.threadRegistry,
this.taskToThread
)
}
}
10 changes: 7 additions & 3 deletions packages/slack/src/formatters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,29 @@ function markdownToMrkdwn(text: string): string {
)
}

export function buildPlanApprovalMessage(task: Task): (Block | KnownBlock)[] {
export function buildPlanApprovalMessage(task: Task, agentModel?: string): (Block | KnownBlock)[] {
const planText = task.planMarkdown
? truncate(task.planMarkdown, MAX_PLAN_LENGTH)
: 'No plan content available.'

const agentLabel = agentModel
? `${agentIdentityLine(task)} - ${agentModel}`
: agentIdentityLine(task)

return [
{
type: 'header',
text: {
type: 'plain_text',
text: `Plan Ready — ${agentIdentityLine(task)}`,
text: `Plan Ready — ${agentLabel}`,
emoji: true,
},
},
{
type: 'section',
text: {
type: 'mrkdwn',
text: `*Project:* ${task.projectId}\n*Task:* ${task.externalId} · ${task.title}`,
text: `*Project:* ${task.projectId}\n*ID:* ${task.id}\n*Task:* ${task.externalId} · ${task.title}`,
},
},
{ type: 'divider' },
Expand Down
14 changes: 8 additions & 6 deletions packages/slack/src/handlers/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ export function registerSlashCommands(app: App, apiBaseUrl: string): void {
const [subcommand, taskId] = command.text.trim().split(/\s+/)

if (!subcommand) {
await respond('Usage: `/parallax <retry|cancel|status|pr-review> <taskId>`')
await respond('Usage: `/parallax <retry|cancel|status|pr-review> [taskId]`')
return
}

if (!taskId) {
const requiresTaskId = subcommand !== 'status'
if (requiresTaskId && !taskId) {
await respond(`Usage: \`/parallax ${subcommand} <taskId>\``)
return
}
Expand All @@ -36,14 +37,15 @@ export function registerSlashCommands(app: App, apiBaseUrl: string): void {
break
}
case 'status': {
const res = await fetch(`${apiBaseUrl}/tasks/${taskId}`)
const res = await fetch(`${apiBaseUrl}/runtime/health`)
if (!res.ok) {
await respond(`Task \`${taskId}\` not found.`)
await respond('⚠️ Could not reach Parallax orchestrator.')
return
}
const data = (await res.json()) as { status: string; planState?: string; title?: string }
const data = (await res.json()) as { activeTasks: number }
const taskLabel = data.activeTasks === 1 ? '1 task' : `${data.activeTasks} tasks`
await respond(
`*Task \`${taskId}\`:* ${data.title ?? ''}\nStatus: \`${data.status}\` Plan: \`${data.planState ?? 'n/a'}\``
`✅ Parallax is running.\nActive tasks: ${data.activeTasks === 0 ? 'none' : taskLabel} processing.`
)
break
}
Expand Down
33 changes: 30 additions & 3 deletions packages/slack/src/notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ export async function sendNotification(
client: WebClient,
channel: string,
payload: SlackNotificationPayload,
threadRegistry: Map<string, string>
threadRegistry: Map<string, string>,
taskToThread: Map<string, string>
): Promise<void> {
const { task, event, extra } = payload
const { task, event, extra, agentModel } = payload

if (event === 'plan_ready') {
const result = await client.chat.postMessage({
channel,
blocks: buildPlanApprovalMessage(task),
blocks: buildPlanApprovalMessage(task, agentModel),
text: `Plan ready for ${task.externalId}: ${task.title}`,
})
if (result.ts) {
Expand All @@ -22,6 +23,32 @@ export async function sendNotification(
return
}

if (event === 'execution_started') {
const result = await client.chat.postMessage({
channel,
blocks: buildEventMessage(task, event, extra),
text: `[${event}] ${task.externalId}: ${task.title}`,
})
if (result.ts) {
taskToThread.set(task.id, result.ts)
}
return
}

if (event === 'failed') {
const threadTs = taskToThread.get(task.id)
await client.chat.postMessage({
channel,
...(threadTs ? { thread_ts: threadTs } : {}),
blocks: buildEventMessage(task, event, extra),
text: `[${event}] ${task.externalId}: ${task.title}`,
})
if (threadTs) {
taskToThread.delete(task.id)
}
return
}

await client.chat.postMessage({
channel,
blocks: buildEventMessage(task, event, extra),
Expand Down
1 change: 1 addition & 0 deletions packages/slack/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface SlackNotificationPayload {
task: Task
event: SlackNotificationEvent
extra?: string
agentModel?: string
}

export interface SlackBotOptions {
Expand Down
Loading