Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions src/components/P2P/handleProtocolCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,19 @@ export class ReadableString extends Readable {
export async function handleProtocolCommands(stream: Stream, connection: Connection) {
const { remotePeer, remoteAddr } = connection

// Pause the stream. We do async operations here before writing.
stream.pause()

P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true)
P2P_LOGGER.logMessage('Using ' + remoteAddr, true)

const getErrorMessage = (err: unknown): string => {
if (err instanceof Error) return err.message || err.name || 'Unknown error'
if (typeof err === 'string') return err
try {
return JSON.stringify(err)
} catch {
return String(err)
}
}

const sendErrorAndClose = async (httpStatus: number, error: string) => {
try {
// Check if stream is already closed
Expand All @@ -48,13 +55,15 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect
return
}

// Resume stream in case it's paused - we need to write
stream.resume()
const status = { httpStatus, error }
stream.send(uint8ArrayFromString(JSON.stringify(status)))
try {
stream.send(uint8ArrayFromString(JSON.stringify(status)))
} catch {
return
}
await stream.close()
} catch (e) {
P2P_LOGGER.error(`Error sending error response: ${e.message}`)
P2P_LOGGER.error(`Error sending error response: ${getErrorMessage(e)}`)
try {
stream.abort(e as Error)
} catch {}
Expand Down Expand Up @@ -90,9 +99,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect
return
}

// Resume the stream. We can now write.
stream.resume()

// v3 streams are AsyncIterable
let task: Command
try {
Expand All @@ -108,7 +114,7 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect
} catch (err) {
P2P_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Unable to process P2P command: ${err.message}`
`Unable to process P2P command: ${getErrorMessage(err)}`
)
await sendErrorAndClose(400, 'Invalid command')
return
Expand All @@ -129,9 +135,10 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect
return
}

let response: P2PCommandResponse | undefined
try {
handler.getOceanNode().setRemoteCaller(remotePeer.toString())
const response: P2PCommandResponse = await handler.handle(task)
response = await handler.handle(task)

// Send status first
stream.send(uint8ArrayFromString(JSON.stringify(response.status)))
Expand All @@ -143,21 +150,26 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect

// Handle backpressure - if send returns false, wait for drain
if (!stream.send(bytes)) {
await stream.onDrain({
signal: AbortSignal.timeout(30000) // 30 second timeout for drain
})
await stream.onDrain()
}
}
}

await stream.close()
} catch (err) {
const errMsg = getErrorMessage(err)
P2P_LOGGER.logMessageWithEmoji(
'handleProtocolCommands Error: ' + err.message,
'handleProtocolCommands Error: ' + errMsg,
true,
GENERIC_EMOJIS.EMOJI_CROSS_MARK,
LOG_LEVELS_STR.LEVEL_ERROR
)
await sendErrorAndClose(500, err.message)
try {
;(response?.stream as any)?.destroy?.(err as Error)
} catch {}
// Avoid a second error if the peer already closed the stream.
if (stream.status !== 'closed' && stream.status !== 'closing') {
await sendErrorAndClose(500, errMsg)
}
}
}
Loading