Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/veria-307-ws-close-state.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'mppx': patch
---

Blocked WebSocket session metering after channel close requests.
71 changes: 71 additions & 0 deletions src/tempo/session/Ws.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,75 @@ describe('isows', () => {
expect(channel?.spent).toBe(0n)
expect(channel?.units).toBe(0)
})

test('does not meter or emit application messages after close is requested on-chain', async () => {
const socket = new MockSocket()
const store = memoryChannelStore()
await store.updateChannel(channelId, () => ({
channelId,
payer: '0x0000000000000000000000000000000000000001' as Address,
payee: '0x0000000000000000000000000000000000000002' as Address,
token: '0x0000000000000000000000000000000000000003' as Address,
authorizedSigner: '0x0000000000000000000000000000000000000004' as Address,
chainId: 42431,
escrowContract: '0x0000000000000000000000000000000000000005' as Address,
deposit: 1n,
settledOnChain: 0n,
highestVoucherAmount: 1n,
highestVoucher: null,
spent: 0n,
units: 0,
closeRequestedAt: 1n,
finalized: false,
createdAt: new Date().toISOString(),
}))

await Ws.serve({
socket,
store,
url: 'ws://example.test/stream',
route: async () => ({
status: 200,
withReceipt(response = new Response(null, { status: 204 })) {
response.headers.set(
'Payment-Receipt',
serializeSessionReceipt(
createSessionReceipt({
challengeId: challenge.id,
channelId,
acceptedCumulative: 1n,
spent: 0n,
units: 0,
}),
),
)
return response
},
}),
generate: (async function* () {
yield 'should-not-reach'
})(),
})

socket.receive(
Ws.formatAuthorizationMessage(
makeCredential({
action: 'open',
channelId,
cumulativeAmount: '1',
signature: `0x${'77'.repeat(65)}`,
transaction: '0x01',
type: 'transaction',
}),
),
)

await sleep(100)

expect(socket.closed).toBe(true)
expect(socket.sent.some((message) => message.includes('should-not-reach'))).toBe(false)
const channel = await store.getChannel(channelId)
expect(channel?.spent).toBe(0n)
expect(channel?.units).toBe(0)
})
})
13 changes: 13 additions & 0 deletions src/tempo/session/Ws.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as Credential from '../../Credential.js'
import { ChannelClosedError } from '../../Errors.js'
import * as ChannelStore from './ChannelStore.js'
import { deserializeSessionReceipt } from './Receipt.js'
import { createSessionReceipt } from './Receipt.js'
Expand Down Expand Up @@ -405,6 +406,7 @@ async function reserveChargeOrWait(options: {

let channel = await store.getChannel(channelId)
if (!channel) throw new Error('channel not found')
throwIfChannelClosed(channel)

const hasHeadroom = (state: ChannelStore.State) =>
state.highestVoucherAmount - state.spent - reservedAmount >= amount
Expand All @@ -424,6 +426,7 @@ async function reserveChargeOrWait(options: {
await waitForUpdate(store, channelId, pollIntervalMs, signal)
channel = await store.getChannel(channelId)
if (!channel) throw new Error('channel not found')
throwIfChannelClosed(channel)
}
}

Expand All @@ -440,6 +443,7 @@ async function commitReservedCharges(options: {
const channel = await store.updateChannel(channelId, (current) => {
if (!current) return null
if (current.finalized) return current
if (current.closeRequestedAt !== 0n) return current
if (current.highestVoucherAmount - current.spent < amount) return current
committed = true
return {
Expand All @@ -450,9 +454,18 @@ async function commitReservedCharges(options: {
})

if (!channel) throw new Error('channel not found')
if (channel.finalized) throw new ChannelClosedError({ reason: 'channel is finalized' })
if (channel.closeRequestedAt !== 0n)
throw new ChannelClosedError({ reason: 'channel has a pending close request' })
if (!committed) throw new Error('reserved voucher coverage is no longer available')
}

function throwIfChannelClosed(channel: ChannelStore.State): void {
if (channel.finalized) throw new ChannelClosedError({ reason: 'channel is finalized' })
if (channel.closeRequestedAt !== 0n)
throw new ChannelClosedError({ reason: 'channel has a pending close request' })
}

async function waitForUpdate(
store: ChannelStore.ChannelStore,
channelId: SessionCredentialPayload['channelId'],
Expand Down
Loading