Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/loro-websocket/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,8 @@ export class LoroWebsocketClient {
void this.sendRejoinRequest(roomId, msg.roomId, adaptor, active.room, auth);
} else {
// Remove local room state so client does not auto-retry unless requested
this.cleanupRoom(msg.roomId, msg.crdt);
this.emitRoomStatus(roomId, RoomJoinStatus.Error);
this.cleanupRoom(msg.roomId, msg.crdt);
}
break;
}
Expand Down
78 changes: 37 additions & 41 deletions packages/loro-websocket/src/server/simple-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ export interface SimpleServerConfig {
* Optional handshake auth: called during WS HTTP upgrade.
* Return true to accept, false to reject.
*/
handshakeAuth?: (
req: IncomingMessage
) => boolean | Promise<boolean>;
handshakeAuth?: (req: IncomingMessage) => boolean | Promise<boolean>;
}

interface RoomDocument {
Expand Down Expand Up @@ -191,7 +189,7 @@ export class SimpleServer {
);

void closers
.catch(() => { })
.catch(() => {})
.finally(() => {
try {
wss.close(() => {
Expand All @@ -212,16 +210,16 @@ export class SimpleServer {
private async gracefulCloseWebSocket(ws: WebSocket): Promise<void> {
try {
await this.waitForSocketDrain(ws);
} catch { }
} catch {}

try {
ws.close(1001, "Server stopping");
} catch { }
} catch {}

setTimeout(() => {
try {
if (ws.readyState !== WebSocket.CLOSED) ws.terminate();
} catch { }
} catch {}
}, 50);
}

Expand All @@ -243,7 +241,11 @@ export class SimpleServer {
}

const buffered = readBufferedAmount();
if (buffered == null || buffered <= 0 || Date.now() - start >= timeoutMs) {
if (
buffered == null ||
buffered <= 0 ||
Date.now() - start >= timeoutMs
) {
resolve();
return;
}
Expand Down Expand Up @@ -346,7 +348,7 @@ export class SimpleServer {

const joinResult = roomDoc.descriptor.adaptor.handleJoinRequest(
roomDoc.data,
message.version,
message.version
);

// Send join response with current document version
Expand All @@ -368,8 +370,7 @@ export class SimpleServer {
client
);
const shouldBackfill =
(hasOthers ||
roomDoc.descriptor.allowBackfillWhenNoOtherClients) &&
(hasOthers || roomDoc.descriptor.allowBackfillWhenNoOtherClients) &&
joinResult.updates &&
joinResult.updates.length;

Expand Down Expand Up @@ -406,7 +407,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.PayloadTooLarge,
message.crdt,
message.roomId,
message.roomId
);
return;
}
Expand All @@ -420,7 +421,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.PermissionDenied,
message.crdt,
message.roomId,
message.roomId
);
client.fragments.delete(message.batchId);
return;
Expand All @@ -435,7 +436,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.PermissionDenied,
message.crdt,
message.roomId,
message.roomId
);
return;
}
Expand All @@ -448,7 +449,7 @@ export class SimpleServer {
try {
const newDocumentData = roomDoc.descriptor.adaptor.applyUpdates(
roomDoc.data,
message.updates,
message.updates
);
roomDoc.data = newDocumentData;
} catch (error) {
Expand All @@ -458,12 +459,11 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.InvalidUpdate,
message.crdt,
message.roomId,
message.roomId
);
return;
}


if (roomDoc.descriptor.shouldPersist) {
roomDoc.dirty = true;
}
Expand All @@ -475,7 +475,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.Ok,
message.crdt,
message.roomId,
message.roomId
);

if (updatesForBroadcast.length > 0) {
Expand All @@ -486,12 +486,7 @@ export class SimpleServer {
updates: updatesForBroadcast,
batchId: message.batchId,
};
this.broadcastToRoom(
message.roomId,
message.crdt,
outgoing,
client
);
this.broadcastToRoom(message.roomId, message.crdt, outgoing, client);
}
} catch (error) {
console.error(error);
Expand All @@ -500,7 +495,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.Unknown,
message.crdt,
message.roomId,
message.roomId
);
}
}
Expand All @@ -518,13 +513,16 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.PermissionDenied,
message.crdt,
message.roomId,
message.roomId
);
return;
}

const batch = {
data: Array.from({ length: message.fragmentCount }, () => new Uint8Array()),
data: Array.from(
{ length: message.fragmentCount },
() => new Uint8Array()
),
totalSize: message.totalSizeBytes,
received: 0,
header: message,
Expand All @@ -538,7 +536,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.FragmentTimeout,
message.crdt,
message.roomId,
message.roomId
);
}, 10000);

Expand All @@ -556,7 +554,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.FragmentTimeout,
message.crdt,
message.roomId,
message.roomId
);
return;
}
Expand Down Expand Up @@ -586,7 +584,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.PermissionDenied,
message.crdt,
message.roomId,
message.roomId
);
return;
}
Expand All @@ -598,7 +596,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.PermissionDenied,
message.crdt,
message.roomId,
message.roomId
);
client.fragments.delete(message.batchId);
return;
Expand All @@ -612,7 +610,7 @@ export class SimpleServer {
try {
const newDocumentData = roomDoc.descriptor.adaptor.applyUpdates(
roomDoc.data,
[totalData],
[totalData]
);
roomDoc.data = newDocumentData;
} catch (error) {
Expand All @@ -622,7 +620,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.InvalidUpdate,
message.crdt,
message.roomId,
message.roomId
);
client.fragments.delete(message.batchId);
return;
Expand All @@ -637,7 +635,7 @@ export class SimpleServer {
message.batchId,
UpdateStatusCode.Ok,
message.crdt,
message.roomId,
message.roomId
);

// Broadcast original fragments to other clients in the room
Expand Down Expand Up @@ -695,10 +693,7 @@ export class SimpleServer {

if (descriptor.shouldPersist && this.config.onLoadDocument) {
try {
const loaded = await this.config.onLoadDocument(
roomId,
crdtType
);
const loaded = await this.config.onLoadDocument(roomId, crdtType);
if (loaded) {
data = loaded;
}
Expand Down Expand Up @@ -799,9 +794,10 @@ export class SimpleServer {
return `${roomId}:${crdtType}`;
}

private parseRoomKey(
roomKey: string
): { roomId: string; crdtType: CrdtType } {
private parseRoomKey(roomKey: string): {
roomId: string;
crdtType: CrdtType;
} {
const sep = roomKey.lastIndexOf(":");
if (sep === -1) {
return { roomId: roomKey, crdtType: CrdtType.Loro };
Expand Down
Loading