Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions js/lite/src/connection/accept.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export async function accept(transport: WebTransport, url: URL, props?: AcceptPr
return acceptAlpnVersion(transport, url, Ietf.Version.DRAFT_16);
} else if (protocol === Ietf.ALPN.DRAFT_15) {
return acceptAlpnVersion(transport, url, Ietf.Version.DRAFT_15);
} else if (protocol === Lite.ALPN_04) {
return new Lite.Connection(url, transport, Lite.Version.DRAFT_04, undefined);
} else if (protocol === Lite.ALPN_03) {
return new Lite.Connection(url, transport, Lite.Version.DRAFT_03, undefined);
} else if (protocol === Lite.ALPN || protocol === "" || protocol === undefined) {
Expand Down
5 changes: 5 additions & 0 deletions js/lite/src/connection/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ export async function connect(url: URL, props?: ConnectProps): Promise<Establish
setupVersion = Ietf.Version.DRAFT_16;
} else if (protocol === Ietf.ALPN.DRAFT_15) {
setupVersion = Ietf.Version.DRAFT_15;
} else if (protocol === Lite.ALPN_04) {
// moq-lite draft-04 doesn't use a session stream, so we return immediately.
return new Lite.Connection(url, session, Lite.Version.DRAFT_04, undefined);
} else if (protocol === Lite.ALPN_03) {
// moq-lite draft-03 doesn't use a session stream, so we return immediately.
return new Lite.Connection(url, session, Lite.Version.DRAFT_03, undefined);
Expand Down Expand Up @@ -273,6 +276,8 @@ async function connectTransport(url: URL, session: WebTransport): Promise<Establ
setupVersion = Ietf.Version.DRAFT_16;
} else if (protocol === Ietf.ALPN.DRAFT_15) {
setupVersion = Ietf.Version.DRAFT_15;
} else if (protocol === Lite.ALPN_04) {
return new Lite.Connection(url, session, Lite.Version.DRAFT_04, undefined);
} else if (protocol === Lite.ALPN_03) {
return new Lite.Connection(url, session, Lite.Version.DRAFT_03, undefined);
} else if (protocol === Lite.ALPN || protocol === "" || protocol === undefined) {
Expand Down
101 changes: 77 additions & 24 deletions js/lite/src/lite/announce.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import * as Path from "../path.ts";
import type { Reader, Writer } from "../stream.ts";
import { unreachable } from "../util/error.ts";
import * as Message from "./message.ts";
import { Version } from "./version.ts";

const MAX_HOPS = 32;

export class Announce {
suffix: Path.Valid;
active: boolean;
hops: number;

constructor(props: { suffix: Path.Valid; active: boolean; hops?: number }) {
/// Ordered origin path. Draft03 populates with 0n (UNKNOWN) entries; Draft04+ uses real IDs.
hops: bigint[];

constructor(props: { suffix: Path.Valid; active: boolean; hops?: bigint[] }) {
this.suffix = props.suffix;
this.active = props.active;
this.hops = props.hops ?? 0;
this.hops = props.hops ?? [];
}

async #encode(w: Writer, version: Version) {
Expand All @@ -21,30 +24,56 @@ export class Announce {

switch (version) {
case Version.DRAFT_03:
await w.u53(this.hops);
if (this.hops.length > MAX_HOPS) {
throw new Error(`hop count ${this.hops.length} exceeds maximum of ${MAX_HOPS}`);
}
await w.u53(this.hops.length);
break;
case Version.DRAFT_01:
case Version.DRAFT_02:
break;
default:
unreachable(version);
if (this.hops.length > MAX_HOPS) {
throw new Error(`hop count ${this.hops.length} exceeds maximum of ${MAX_HOPS}`);
}
await w.u53(this.hops.length);
for (const hop of this.hops) {
await w.u62(hop);
}
break;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}

static async #decode(r: Reader, version: Version): Promise<Announce> {
const active = await r.bool();
const suffix = Path.from(await r.string());

let hops = 0;
const hops: bigint[] = [];
switch (version) {
case Version.DRAFT_03:
hops = await r.u53();
case Version.DRAFT_03: {
// Read count but don't know actual IDs; use 0 as unknown placeholder.
const count = await r.u53();
if (count > MAX_HOPS) {
throw new Error(`hop count ${count} exceeds maximum of ${MAX_HOPS}`);
}
for (let i = 0; i < count; i++) {
hops.push(0n);
}
break;
}
case Version.DRAFT_01:
case Version.DRAFT_02:
break;
default:
unreachable(version);
default: {
const count = await r.u53();
if (count > MAX_HOPS) {
throw new Error(`hop count ${count} exceeds maximum of ${MAX_HOPS}`);
}
for (let i = 0; i < count; i++) {
hops.push(await r.u62());
}
break;
}
}

return new Announce({ suffix, active, hops });
Expand All @@ -66,31 +95,57 @@ export class Announce {
export class AnnounceInterest {
prefix: Path.Valid;

constructor(prefix: Path.Valid) {
this.prefix = prefix;
/// Filter out announces whose hops contain this hop ID. 0n means no filtering.
excludeHop: bigint;

constructor(props: { prefix: Path.Valid; excludeHop?: bigint }) {
this.prefix = props.prefix;
this.excludeHop = props.excludeHop ?? 0n;
}

async #encode(w: Writer) {
async #encode(w: Writer, version: Version) {
await w.string(this.prefix);

switch (version) {
case Version.DRAFT_01:
case Version.DRAFT_02:
case Version.DRAFT_03:
break;
default:
await w.u62(this.excludeHop);
break;
}
}

static async #decode(r: Reader): Promise<AnnounceInterest> {
static async #decode(r: Reader, version: Version): Promise<AnnounceInterest> {
const prefix = Path.from(await r.string());
return new AnnounceInterest(prefix);

let excludeHop = 0n;
switch (version) {
case Version.DRAFT_01:
case Version.DRAFT_02:
case Version.DRAFT_03:
break;
default:
excludeHop = await r.u62();
break;
}

return new AnnounceInterest({ prefix, excludeHop });
}

async encode(w: Writer): Promise<void> {
return Message.encode(w, this.#encode.bind(this));
async encode(w: Writer, version: Version): Promise<void> {
return Message.encode(w, (w) => this.#encode(w, version));
}

static async decode(r: Reader): Promise<AnnounceInterest> {
return Message.decode(r, AnnounceInterest.#decode);
static async decode(r: Reader, version: Version): Promise<AnnounceInterest> {
return Message.decode(r, (r) => AnnounceInterest.#decode(r, version));
}
}

/// Sent after setup to communicate the initially announced paths.
///
/// Used by Draft01/Draft02 only. Draft03 uses individual Announce messages instead.
/// Used by Draft01/Draft02 only. Draft03+ uses individual Announce messages instead.
export class AnnounceInit {
suffixes: Path.Valid[];

Expand All @@ -103,10 +158,8 @@ export class AnnounceInit {
case Version.DRAFT_01:
case Version.DRAFT_02:
break;
case Version.DRAFT_03:
throw new Error("announce init not supported for Draft03");
default:
unreachable(version);
throw new Error("announce init not supported for this version");
}
}

Expand Down
2 changes: 1 addition & 1 deletion js/lite/src/lite/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ export class Connection implements Established {
if (typ === StreamId.Session) {
throw new Error("duplicate session stream");
} else if (typ === StreamId.Announce) {
const msg = await AnnounceInterest.decode(stream.reader);
const msg = await AnnounceInterest.decode(stream.reader, this.#version);
await this.#publisher.runAnnounce(msg, stream);
return;
} else if (typ === StreamId.Subscribe) {
Expand Down
6 changes: 2 additions & 4 deletions js/lite/src/lite/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import * as Path from "../path.ts";
import type { Reader, Writer } from "../stream.ts";
import { unreachable } from "../util/error.ts";
import * as Message from "./message.ts";
import { Version } from "./version.ts";

function guardDraft03(version: Version) {
switch (version) {
case Version.DRAFT_03:
break;
case Version.DRAFT_01:
case Version.DRAFT_02:
throw new Error("fetch not supported for this version");
default:
unreachable(version);
// DRAFT_03+
break;
}
}

Expand Down
6 changes: 2 additions & 4 deletions js/lite/src/lite/probe.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import type { Reader, Writer } from "../stream.ts";
import { unreachable } from "../util/error.ts";
import * as Message from "./message.ts";
import { Version } from "./version.ts";

function guardDraft03(version: Version) {
switch (version) {
case Version.DRAFT_03:
break;
case Version.DRAFT_01:
case Version.DRAFT_02:
throw new Error("probe not supported for this version");
default:
unreachable(version);
// DRAFT_03+
break;
}
}

Expand Down
14 changes: 7 additions & 7 deletions js/lite/src/lite/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,19 @@ export class Publisher {
}

switch (this.version) {
case Version.DRAFT_03:
// Draft03: send individual Announce messages for initial state.
for (const suffix of active) {
const wire = new Announce({ suffix, active: true });
await wire.encode(stream.writer, this.version);
}
break;
case Version.DRAFT_01:
case Version.DRAFT_02: {
const init = new AnnounceInit([...active]);
await init.encode(stream.writer, this.version);
break;
}
default:
// Draft03+: send individual Announce messages for initial state.
for (const suffix of active) {
const wire = new Announce({ suffix, active: true });
await wire.encode(stream.writer, this.version);
}
break;
}

// Wait for updates to the broadcasts.
Expand Down
6 changes: 2 additions & 4 deletions js/lite/src/lite/session.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Reader, Writer } from "../stream.ts";
import { unreachable } from "../util/error.ts";
import * as Message from "./message.ts";
import { Version } from "./version.ts";

Expand Down Expand Up @@ -132,10 +131,9 @@ export class SessionInfo {
case Version.DRAFT_01:
case Version.DRAFT_02:
break;
case Version.DRAFT_03:
throw new Error("session info not supported for Draft03");
default:
unreachable(version);
// DRAFT_03+: session info not supported
throw new Error("session info not supported for this version");
}
}

Expand Down
Loading
Loading