Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions js/.changeset/stream-json-input.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@link-assistant/agent': minor
---

Add Claude-compatible `--input-format stream-json` stdin frames and `--output-format stream-json` alias with user-prompt replay acknowledgements.
142 changes: 115 additions & 27 deletions js/src/cli/continuous-mode.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Instance } from '../project/instance.ts';
import { Bus } from '../bus/index.ts';
import { Session } from '../session/index.ts';
import { SessionPrompt } from '../session/prompt.ts';
import { createEventHandler } from '../json-standard/index.ts';
import { createEventHandler, serializeOutput } from '../json-standard/index.ts';
import { createContinuousStdinReader } from './input-queue.js';
import { Log } from '../util/log.ts';
import { config } from '../config/config.ts';
Expand Down Expand Up @@ -40,6 +40,64 @@ export function getHasError() {
// Logger for resume operations
const log = Log.create({ service: 'resume' });

function getInputFormat(argv) {
return argv['input-format'] || argv.inputFormat || 'text';
}

function outputConsumedInput({
message,
jsonStandard,
sessionID,
compactJson,
}) {
const raw = message.raw || message.message || message.system || '';
if (
jsonStandard === 'claude' &&
message.kind === 'message' &&
message.format === 'stream-json'
) {
process.stdout.write(
serializeOutput(
{
type: 'message',
timestamp: new Date().toISOString(),
session_id: sessionID,
role: 'user',
content: [{ type: 'text', text: message.message ?? '' }],
},
'claude'
)
);
return;
}

if (jsonStandard === 'claude') {
return;
}

outputInput(
{
raw,
parsed: message.parsed || message,
format: message.format || 'text',
kind: message.kind || 'message',
},
compactJson
);
}

function outputInputParseError(error, line, compactJson) {
hasError = true;
outputError(
{
errorType: 'ValidationError',
message: error instanceof Error ? error.message : String(error),
raw: line,
},
compactJson
);
}

/**
* Resolve the session to use based on --resume, --continue, and --no-fork options.
* Returns the session ID to use, handling forking as needed.
Expand Down Expand Up @@ -200,8 +258,12 @@ export async function runContinuousServerMode(
) {
// Check both CLI flag and environment variable for compact JSON mode
const compactJson = argv['compact-json'] === true || config.compactJson;
const inputFormat = getInputFormat(argv);
const isInteractive = argv.interactive !== false;
const autoMerge = argv['auto-merge-queued-messages'] !== false;
const autoMerge =
inputFormat === 'stream-json'
? false
: argv['auto-merge-queued-messages'] !== false;

// Start server like OpenCode does
const server = Server.listen({ port: 0, hostname: '127.0.0.1' });
Expand Down Expand Up @@ -244,9 +306,23 @@ export async function runContinuousServerMode(
// Track if we're currently processing a message
let isProcessing = false;
const pendingMessages = [];
let currentSystemMessage = systemMessage;
const currentAppendSystemMessage = appendSystemMessage;

// Process messages from the queue
const processMessage = async (message) => {
if (message.kind === 'interrupt') {
SessionPrompt.cancel(sessionID);
outputConsumedInput({ message, jsonStandard, sessionID, compactJson });
return;
}

if (message.kind === 'system') {
currentSystemMessage = message.system;
outputConsumedInput({ message, jsonStandard, sessionID, compactJson });
return;
}

if (isProcessing) {
pendingMessages.push(message);
return;
Expand All @@ -255,16 +331,9 @@ export async function runContinuousServerMode(
isProcessing = true;

// Output input confirmation in JSON format
outputInput(
{
raw: message.raw || message.message,
parsed: message,
format: message.format || 'text',
},
compactJson
);
outputConsumedInput({ message, jsonStandard, sessionID, compactJson });

const messageText = message.message || 'hi';
const messageText = message.message ?? 'hi';
const parts = [{ type: 'text', text: messageText }];

// Create a promise to wait for this message to complete
Expand All @@ -290,8 +359,8 @@ export async function runContinuousServerMode(
parts,
model: { providerID, modelID },
compactionModel,
system: systemMessage,
appendSystem: appendSystemMessage,
system: currentSystemMessage,
appendSystem: currentAppendSystemMessage,
temperature,
}),
}
Expand Down Expand Up @@ -377,11 +446,15 @@ export async function runContinuousServerMode(

// Create continuous stdin reader
stdinReader = createContinuousStdinReader({
interactive: isInteractive,
interactive: inputFormat === 'stream-json' ? false : isInteractive,
autoMerge,
inputFormat,
onMessage: (message) => {
processMessage(message);
},
onError: (error, line) => {
outputInputParseError(error, line, compactJson);
},
});

// Wait for stdin to end (EOF or close)
Expand Down Expand Up @@ -454,8 +527,12 @@ export async function runContinuousDirectMode(
) {
// Check both CLI flag and environment variable for compact JSON mode
const compactJson = argv['compact-json'] === true || config.compactJson;
const inputFormat = getInputFormat(argv);
const isInteractive = argv.interactive !== false;
const autoMerge = argv['auto-merge-queued-messages'] !== false;
const autoMerge =
inputFormat === 'stream-json'
? false
: argv['auto-merge-queued-messages'] !== false;

let unsub = null;
let stdinReader = null;
Expand Down Expand Up @@ -483,9 +560,23 @@ export async function runContinuousDirectMode(
// Track if we're currently processing a message
let isProcessing = false;
const pendingMessages = [];
let currentSystemMessage = systemMessage;
const currentAppendSystemMessage = appendSystemMessage;

// Process messages from the queue
const processMessage = async (message) => {
if (message.kind === 'interrupt') {
SessionPrompt.cancel(sessionID);
outputConsumedInput({ message, jsonStandard, sessionID, compactJson });
return;
}

if (message.kind === 'system') {
currentSystemMessage = message.system;
outputConsumedInput({ message, jsonStandard, sessionID, compactJson });
return;
}

if (isProcessing) {
pendingMessages.push(message);
return;
Expand All @@ -494,16 +585,9 @@ export async function runContinuousDirectMode(
isProcessing = true;

// Output input confirmation in JSON format
outputInput(
{
raw: message.raw || message.message,
parsed: message,
format: message.format || 'text',
},
compactJson
);
outputConsumedInput({ message, jsonStandard, sessionID, compactJson });

const messageText = message.message || 'hi';
const messageText = message.message ?? 'hi';
const parts = [{ type: 'text', text: messageText }];

// Create a promise to wait for this message to complete
Expand All @@ -525,8 +609,8 @@ export async function runContinuousDirectMode(
parts,
model: { providerID, modelID },
compactionModel,
system: systemMessage,
appendSystem: appendSystemMessage,
system: currentSystemMessage,
appendSystem: currentAppendSystemMessage,
temperature,
}).catch((error) => {
hasError = true;
Expand Down Expand Up @@ -610,11 +694,15 @@ export async function runContinuousDirectMode(

// Create continuous stdin reader
stdinReader = createContinuousStdinReader({
interactive: isInteractive,
interactive: inputFormat === 'stream-json' ? false : isInteractive,
autoMerge,
inputFormat,
onMessage: (message) => {
processMessage(message);
},
onError: (error, line) => {
outputInputParseError(error, line, compactJson);
},
});

// Wait for stdin to end (EOF or close)
Expand Down
Loading
Loading