Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
dd4b946
feat: add client protocol tracking to remote participants
1egoman Mar 9, 2026
2c899b6
feat: add client protocol advertisement code, only advertise client p…
1egoman Mar 9, 2026
9f257fb
feat: add initial first implementation pass
1egoman Mar 9, 2026
6085fb8
feat: ensure that payload is streamed when compressed, not all buffer…
1egoman Mar 9, 2026
187e0fa
feat: use a data streams transmission approach much closer to what lu…
1egoman Mar 9, 2026
8c9e6dd
fix: add long rpc message to example
1egoman Mar 9, 2026
f9747ff
feat: cleanup code and don't create RPCRequest / RPCResponse in data …
1egoman Mar 10, 2026
690e1b7
fix: make functions into arrow fns to work around `this` being unset
1egoman Mar 10, 2026
171f4e7
refactor: break up long if / else if / else chain
1egoman Mar 10, 2026
e6ea806
fix: add isCallerStillConnected to ensure that rpc drops responses fo…
1egoman Mar 10, 2026
f5fc773
fix: adjust copyright headers and remove obsolete rpc exports
1egoman Mar 10, 2026
f1e0477
fix: flip around response timeout nan check
1egoman Mar 10, 2026
83e5ccc
fix: adjust compression / data stream thresholds to not be inclusive
1egoman Mar 10, 2026
b179ddc
fix: remove small payload uncompressed path
1egoman Mar 13, 2026
c697e2b
refactor: run npm run format
1egoman Mar 13, 2026
4fed5c6
feat: commit (updated) rpc benchmark
1egoman Mar 13, 2026
76fc4c5
fix: remove references to "small response", uncompressed payloads are…
1egoman Mar 27, 2026
0f67c45
refactor: move ack code into handleIncomingRpcAck
1egoman Mar 27, 2026
416934c
refactor: extract all rpc related packet sending code out of the engi…
1egoman Mar 27, 2026
bda91f8
feat: add initial rpc client manager / rpc server manager focused tests
1egoman Mar 27, 2026
b5f1971
feat: port over all existing rpc tests to call RpcClientManager / Rpc…
1egoman Mar 27, 2026
3044ae6
fix: address tsc warnings
1egoman Mar 27, 2026
cf64257
fix: remove stale throws import
1egoman Mar 27, 2026
1ca1f66
feat: migrate rpc to use data streams for sending rpc requests
1egoman Mar 30, 2026
40df0a5
feat: migrate RpcClientManager / RpcServerManager to not be tightly c…
1egoman Mar 31, 2026
b5885ec
fix: address fallout from bad rebase
1egoman Apr 2, 2026
1d3e128
fix: remove DataPacket_Kind from RpcClientManager / RpcServerManager
1egoman Apr 2, 2026
cb3d0d2
fix: adjust rpc client / server manager tests to fix type errors
1egoman Apr 2, 2026
03fd74c
feat: remove compression from this for now
1egoman Apr 2, 2026
2de48ed
feat: consistency use text streams everywhere for rpc, not byte streams
1egoman Apr 3, 2026
1cce1fb
fix: add ack to rpc messages also in the data stream case
1egoman Apr 3, 2026
bce1998
fix: make rpc resilient to engine teardowns
1egoman Apr 3, 2026
dafcf03
fix: remove LLM generated file headers
1egoman Apr 3, 2026
aa2daa9
feat: make handleIncomingRpcRequest take a RpcRequest
1egoman Apr 3, 2026
7e0f0e7
feat: add explicit version field to data streams rpc (this is now exp…
1egoman Apr 3, 2026
0324c8a
fix: rename client protocol version var to match new implementation p…
1egoman Apr 3, 2026
0335480
fix: convert console.error -> log.error
1egoman Apr 3, 2026
076dee5
refactor: rename MAX_LEGACY_PAYLOAD_BYTES => MAX_V1_PAYLOAD_BYTES
1egoman Apr 3, 2026
983795c
fix: add docs comments for v1 / v2 rpcs
1egoman Apr 3, 2026
2c08c67
fix: adjust tests over to use new handleIncomingRpcRequest signature
1egoman Apr 3, 2026
325e0ae
fix: run npm run format
1egoman Apr 3, 2026
0fc0ab8
fix: remove dead code in rpc client manager test
1egoman Apr 3, 2026
aac2d49
feat: add tests for v2 -> v2 rpc messages
1egoman Apr 3, 2026
c0c47f9
fix: update tests to exercise new v1 data streams with long payload p…
1egoman Apr 3, 2026
ae86652
fix: make rpc example payload size longer
1egoman Apr 3, 2026
dd47f77
fix: run npm run format
1egoman Apr 3, 2026
3821376
feat: make a few small changes to the rpc v2 protocol
1egoman Apr 3, 2026
d9a774c
fix: npm run format
1egoman Apr 3, 2026
9746892
fix: add missing changeset
1egoman Apr 3, 2026
a4a8fa3
fix: remove more LLM added copyright notices
1egoman Apr 3, 2026
5d7ac41
refactor: reorganize some code / cosmetic changes
1egoman Apr 3, 2026
6eb316d
feat: stop checking in rpc-benchmark
1egoman Apr 3, 2026
0a219fe
docs: check in draft spec
1egoman Apr 3, 2026
e71a2e4
refactor: use named constants for rpc versions
1egoman Apr 9, 2026
92c72f0
docs: add recommended naming section to rpc spec
1egoman Apr 13, 2026
32afd5b
fix: use CLIENT_PROTOCOL_DATA_STREAM_RPC constant instead of magic nu…
1egoman Apr 14, 2026
137b942
fix: skip sending error for request id being unset since if the reque…
1egoman Apr 14, 2026
cdb9981
fix: move publishRpcRequest later in the performRpc call
1egoman Apr 14, 2026
dc2a731
refactor: put rpc request headers into an enum
1egoman Apr 14, 2026
f9be9f9
feat: add better error messages / linked error with cause for new rpc…
1egoman Apr 16, 2026
1fe075c
fix: run npm run format
1egoman Apr 17, 2026
499c842
fix: commit missing `cause` in builtIn constructor for `RpcError`
1egoman Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chubby-buckets-drop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Add new RPC protocol updates to support infinite payload length in requests / responses
361 changes: 361 additions & 0 deletions RPC_SPEC.md

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions examples/rpc/rpc-demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ async function main() {
console.error('Error:', error);
}

try {
console.log('\n\nRunning send long info example...');
await Promise.all([performSendVeryLongInfo(callersRoom)]);
} catch (error) {
console.error('Error:', error);
}

try {
console.log('\n\nRunning error handling example...');
await Promise.all([performDivision(callersRoom)]);
Expand Down Expand Up @@ -85,6 +92,18 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room)
},
);

await greetersRoom.registerRpcMethod(
'exchanging-long-info',
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async (data: RpcInvocationData) => {
console.log(
`[Greeter] ${data.callerIdentity} has arrived and said that its long info is "${data.payload}"`,
);
await new Promise((resolve) => setTimeout(resolve, 2000));
return new Array<string>(20_000).fill('Y').join('');
},
);

await mathGeniusRoom.registerRpcMethod('square-root', async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
const number = jsonData.number;
Expand Down Expand Up @@ -136,6 +155,21 @@ const performGreeting = async (room: Room): Promise<void> => {
}
};

const performSendVeryLongInfo = async (room: Room): Promise<void> => {
console.log('[Caller] Sending the greeter a very long message');
try {
const response = await room.localParticipant.performRpc({
destinationIdentity: 'greeter',
method: 'exchanging-long-info',
payload: new Array<string>(20_000).fill('X').join(''),
});
console.log(`[Caller] The greeter's long info is: "${response}"`);
} catch (error) {
console.error('[Caller] RPC call failed:', error);
throw error;
}
};

const performDisconnection = async (room: Room): Promise<void> => {
console.log('[Caller] Checking back in on the greeter...');
try {
Expand Down
26 changes: 0 additions & 26 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import {
Room as RoomModel,
RoomMovedResponse,
RpcAck,
RpcResponse,
ServerInfo,
SessionDescription,
SignalTarget,
Expand Down Expand Up @@ -74,7 +73,6 @@ import {
UnexpectedConnectionState,
} from './errors';
import { EngineEvent } from './events';
import { RpcError } from './rpc';
import CriticalTimers from './timers';
import type LocalTrack from './track/LocalTrack';
import type LocalTrackPublication from './track/LocalTrackPublication';
Expand Down Expand Up @@ -1391,30 +1389,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
});
};

/** @internal */
async publishRpcResponse(
destinationIdentity: string,
requestId: string,
payload: string | null,
error: RpcError | null,
) {
const packet = new DataPacket({
destinationIdentities: [destinationIdentity],
kind: DataPacket_Kind.RELIABLE,
value: {
case: 'rpcResponse',
value: new RpcResponse({
requestId,
value: error
? { case: 'error', value: error.toProto() }
: { case: 'payload', value: payload ?? '' },
}),
},
});

await this.sendDataPacket(packet, DataChannelKind.RELIABLE);
}

/** @internal */
async publishRpcAck(destinationIdentity: string, requestId: string) {
const packet = new DataPacket({
Expand Down
164 changes: 83 additions & 81 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import type {
} from '../options';
import TypedPromise from '../utils/TypedPromise';
import { getBrowser } from '../utils/browserParser';
import { CLIENT_PROTOCOL_DEFAULT } from '../version';
import { BackOffStrategy } from './BackOffStrategy';
import DeviceManager from './DeviceManager';
import RTCEngine, { DataChannelKind } from './RTCEngine';
Expand Down Expand Up @@ -78,7 +79,14 @@ import LocalParticipant from './participant/LocalParticipant';
import Participant from './participant/Participant';
import { type ConnectionQuality, ParticipantKind } from './participant/Participant';
import RemoteParticipant from './participant/RemoteParticipant';
import { MAX_PAYLOAD_BYTES, RpcError, type RpcInvocationData, byteLength } from './rpc';
import {
RPC_REQUEST_DATA_STREAM_TOPIC,
RPC_RESPONSE_DATA_STREAM_TOPIC,
RpcClientManager,
RpcError,
type RpcInvocationData,
RpcServerManager,
} from './rpc';
import CriticalTimers from './timers';
import LocalAudioTrack from './track/LocalAudioTrack';
import type LocalTrack from './track/LocalTrack';
Expand Down Expand Up @@ -216,7 +224,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)

private outgoingDataTrackManager: OutgoingDataTrackManager;

private rpcHandlers: Map<string, (data: RpcInvocationData) => Promise<string>> = new Map();
private rpcClientManager: RpcClientManager;

private rpcServerManager: RpcServerManager;

get hasE2EESetup(): boolean {
return this.e2eeManager !== undefined;
Expand Down Expand Up @@ -294,15 +304,36 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.engine.sendLossyBytes(bytes, DataChannelKind.DATA_TRACK_LOSSY, 'wait');
});

this.registerRpcDataStreamHandler();

this.rpcClientManager = new RpcClientManager(
this.log,
this.outgoingDataStreamManager,
this.getRemoteParticipantClientProtocol,
() => this.engine.latestJoinResponse?.serverInfo?.version,
);
this.rpcClientManager.on('sendDataPacket', ({ packet }) => {
this.engine?.sendDataPacket(packet, DataChannelKind.RELIABLE);
});
this.rpcServerManager = new RpcServerManager(
this.log,
this.outgoingDataStreamManager,
this.getRemoteParticipantClientProtocol,
);
this.rpcServerManager.on('sendDataPacket', ({ packet }) => {
this.engine?.sendDataPacket(packet, DataChannelKind.RELIABLE);
});

this.disconnectLock = new Mutex();
this.localParticipant = new LocalParticipant(
'',
'',
this.engine,
this.options,
this.rpcHandlers,
this.outgoingDataStreamManager,
this.outgoingDataTrackManager,
this.rpcClientManager,
this.rpcServerManager,
);

if (this.options.e2ee || this.options.encryption) {
Expand Down Expand Up @@ -391,12 +422,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
* Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error").
*/
registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise<string>) {
if (this.rpcHandlers.has(method)) {
throw Error(
`RPC handler already registered for method ${method}, unregisterRpcMethod before trying to register again`,
);
}
this.rpcHandlers.set(method, handler);
this.rpcServerManager.registerRpcMethod(method, handler);
}

/**
Expand All @@ -405,7 +431,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
* @param method - The name of the RPC method to unregister
*/
unregisterRpcMethod(method: string) {
this.rpcHandlers.delete(method);
this.rpcServerManager.unregisterRpcMethod(method);
}

/**
Expand Down Expand Up @@ -1805,7 +1831,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
this.emit(RoomEvent.ParticipantDisconnected, participant);
participant.setDisconnected();
this.localParticipant?.handleParticipantDisconnected(participant.identity);
this.rpcClientManager.handleParticipantDisconnected(participant.identity);
}

// updates are sent only when there's a change to speaker ordering
Expand Down Expand Up @@ -1949,14 +1975,31 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.handleDataStream(packet, encryptionType);
} else if (packet.value.case === 'rpcRequest') {
const rpc = packet.value.value;
this.handleIncomingRpcRequest(
packet.participantIdentity,
rpc.id,
rpc.method,
rpc.payload,
rpc.responseTimeoutMs,
rpc.version,
);
this.rpcServerManager.handleIncomingRpcRequest(packet.participantIdentity, rpc);
} else if (packet.value.case === 'rpcResponse') {
const rpcResponse = packet.value.value;
switch (rpcResponse.value.case) {
case 'payload':
this.rpcClientManager.handleIncomingRpcResponseSuccess(
rpcResponse.requestId,
rpcResponse.value.value,
);
break;
case 'error':
this.rpcClientManager.handleIncomingRpcResponseFailure(
rpcResponse.requestId,
RpcError.fromProto(rpcResponse.value.value),
);
break;
default:
this.log.warn(
`Unknown rpcResponse.value.case: ${rpcResponse.value.case}`,
this.logContext,
);
break;
}
} else if (packet.value.case === 'rpcAck') {
this.rpcClientManager.handleIncomingRpcAck(packet.value.value.requestId);
}
};

Expand Down Expand Up @@ -2020,68 +2063,6 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.incomingDataStreamManager.handleDataStreamPacket(packet, encryptionType);
};

private async handleIncomingRpcRequest(
callerIdentity: string,
requestId: string,
method: string,
payload: string,
responseTimeout: number,
version: number,
) {
await this.engine.publishRpcAck(callerIdentity, requestId);

if (version !== 1) {
await this.engine.publishRpcResponse(
callerIdentity,
requestId,
null,
RpcError.builtIn('UNSUPPORTED_VERSION'),
);
return;
}

const handler = this.rpcHandlers.get(method);

if (!handler) {
await this.engine.publishRpcResponse(
callerIdentity,
requestId,
null,
RpcError.builtIn('UNSUPPORTED_METHOD'),
);
return;
}

let responseError: RpcError | null = null;
let responsePayload: string | null = null;

try {
const response = await handler({
requestId,
callerIdentity,
payload,
responseTimeout,
});
if (byteLength(response) > MAX_PAYLOAD_BYTES) {
responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE');
this.log.warn(`RPC Response payload too large for ${method}`);
} else {
responsePayload = response;
}
} catch (error) {
if (error instanceof RpcError) {
responseError = error;
} else {
this.log.warn(
`Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`,
error,
);
responseError = RpcError.builtIn('APPLICATION_ERROR');
}
}
await this.engine.publishRpcResponse(callerIdentity, requestId, responsePayload, responseError);
}

bufferedSegments: Map<string, TranscriptionSegmentModel> = new Map();

private handleAudioPlaybackStarted = () => {
Expand Down Expand Up @@ -2427,6 +2408,27 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
}

private getRemoteParticipantClientProtocol = (identity: Participant['identity']) => {
return this.remoteParticipants.get(identity)?.clientProtocol ?? CLIENT_PROTOCOL_DEFAULT;
};

private registerRpcDataStreamHandler() {
this.incomingDataStreamManager.registerTextStreamHandler(
RPC_REQUEST_DATA_STREAM_TOPIC,
async (reader, { identity }) => {
const attributes = reader.info.attributes ?? {};
await this.rpcServerManager.handleIncomingDataStream(reader, identity, attributes);
},
);
this.incomingDataStreamManager.registerTextStreamHandler(
RPC_RESPONSE_DATA_STREAM_TOPIC,
async (reader) => {
const attributes = reader.info.attributes ?? {};
await this.rpcClientManager.handleIncomingDataStream(reader, attributes);
},
);
}

private registerConnectionReconcile() {
this.clearConnectionReconcile();
let consecutiveFailures = 0;
Expand Down
Loading
Loading