Skip to content
224 changes: 154 additions & 70 deletions src/http/httpProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const requests: Record<string, http.IncomingMessage> = {};
const responses: Record<string, http.ServerResponse> = {};
const minPort = 55000;
const maxPort = 55025;
const loopbackBindAddresses = ['127.0.0.1', '::1'];

const invocRequestEmitter = new EventEmitter();

Expand Down Expand Up @@ -87,87 +88,170 @@ function setCookies(userRes: HttpResponse, proxyRes: http.ServerResponse): void
}

export async function setupHttpProxy(): Promise<string> {
return new Promise((resolve, reject) => {
const server = http.createServer();

server.on('request', (req, res) => {
const invocationId = req.headers[invocationIdHeader];
if (typeof invocationId === 'string') {
requests[invocationId] = req;
responses[invocationId] = res;
invocRequestEmitter.emit(invocationId);
} else {
workerSystemLog('error', `Http proxy request missing header ${invocationIdHeader}`);
}
});
const bindAddress = await selectUsableLoopbackBindAddress();
const server = http.createServer();

server.on('error', (err) => {
err = ensureErrorType(err);
workerSystemLog('error', `Http proxy error: ${err.stack || err.message}`);
});
server.on('request', (req, res) => {
const invocationId = getInvocationId(req);
if (invocationId) {
requests[invocationId] = req;
responses[invocationId] = res;
invocRequestEmitter.emit(invocationId);
} else {
workerSystemLog('error', `Http proxy request missing or invalid header ${invocationIdHeader}`);
res.statusCode = 400;
res.end();
}
});

server.listen(() => {
const address = server.address();
// Valid address has been created
if (address !== null && typeof address === 'object') {
if (address.port === 0) {
// Auto-assigned port is 0, find and bind to an open port
workerSystemLog('debug', `Port 0 assigned. Finding open port.`);
findOpenPort((openPort: number) => {
// Close the server and re-listen on the found open port
server.close();
server.listen(openPort, () => {
workerSystemLog('debug', `Server is now listening on found open port: ${openPort}`);
});
resolve(`http://localhost:${openPort}/`);
});
} else {
// Auto-assigned port is not 0
workerSystemLog('debug', `Auto-assigned port is valid. Port: ${address.port}`);
resolve(`http://localhost:${address.port}/`);
}
} else {
reject(new AzFuncSystemError('Unexpected server address during http proxy setup'));
}
});
server.on('error', (err) => {
err = ensureErrorType(err);
workerSystemLog('error', `Http proxy error: ${err.stack || err.message}`);
});

server.on('close', () => {
workerSystemLog('information', 'Http proxy closing');
});
server.on('close', () => {
workerSystemLog('information', 'Http proxy closing');
});

await listenServer(server, 0, bindAddress);

const address = server.address();
if (address === null || typeof address !== 'object') {
throw new AzFuncSystemError('Unexpected server address during http proxy setup');
}

if (address.port === 0) {
workerSystemLog('debug', `Port 0 assigned. Finding open port.`);
const openPort = await findOpenPort(bindAddress);
await closeServer(server);
await listenServer(server, openPort, bindAddress);
workerSystemLog('debug', `Server is now listening on found open port: ${openPort}`);
return serializeHttpUri(bindAddress, openPort);
} else {
workerSystemLog('debug', `Auto-assigned port is valid. Port: ${address.port}`);
return serializeHttpUri(bindAddress, address.port);
}
}

// Function to find an open port starting from a specified port
function findOpenPort(callback: (port: number) => void): void {
const server = net.createServer();
function getInvocationId(req: http.IncomingMessage): string | undefined {
const invocationIdValues = getRawHeaderValues(req, invocationIdHeader);
if (invocationIdValues.length !== 1) {
return undefined;
}

const [invocationId] = invocationIdValues;
if (!invocationId || invocationId.trim() !== invocationId) {
return undefined;
}

function tryPort(port: number) {
if (port > maxPort) {
// If we've reached the maximum port, throw an error
throw new AzFuncSystemError(
`No available ports found between ${minPort} and ${maxPort}. To enable HTTP streaming, please open a port in this range.`
);
return invocationId;
}

function getRawHeaderValues(req: http.IncomingMessage, headerName: string): string[] {
const values: string[] = [];
for (let i = 0; i < req.rawHeaders.length; i += 2) {
const rawHeaderName = req.rawHeaders[i];
const rawHeaderValue = req.rawHeaders[i + 1];
if (rawHeaderName && rawHeaderName.toLowerCase() === headerName && rawHeaderValue !== undefined) {
values.push(rawHeaderValue);
}
}

server.once('error', () => {
// If the port is unavailable, increment and try the next one
tryPort(port + 1);
});
return values;
}

// If the port is available, return it
server.once('listening', () => {
const address = server.address();
if (address !== null && typeof address === 'object') {
port = address.port;
server.close();
callback(port);
}
});
async function selectUsableLoopbackBindAddress(): Promise<string> {
for (const bindAddress of loopbackBindAddresses) {
if (await canListenOnAddress(bindAddress)) {
return bindAddress;
}
}

throw new AzFuncSystemError('Unable to find a usable loopback address for HTTP streaming.');
}

async function canListenOnAddress(bindAddress: string): Promise<boolean> {
const server = net.createServer();
try {
await listenServer(server, 0, bindAddress);
await closeServer(server);
return true;
} catch (err) {
const error = ensureErrorType(err);
const code = (<NodeJS.ErrnoException>error).code;
if (code === 'EADDRNOTAVAIL' || code === 'EAFNOSUPPORT') {
return false;
}

throw error;
}
}

function serializeHttpUri(bindAddress: string, port: number): string {
const host = net.isIPv6(bindAddress) ? `[${bindAddress}]` : bindAddress;
return `http://${host}:${port}/`;
}

// Try binding to the given port
server.listen(port);
async function findOpenPort(bindAddress: string, port = minPort): Promise<number> {
if (port > maxPort) {
throw new AzFuncSystemError(
`No available ports found between ${minPort} and ${maxPort}. To enable HTTP streaming, please open a port in this range.`
);
}

// Start trying from the specified starting port
tryPort(minPort);
const server = net.createServer();
try {
await listenServer(server, port, bindAddress);
const address = server.address();
if (address !== null && typeof address === 'object') {
const openPort = address.port;
await closeServer(server);
return openPort;
} else {
throw new AzFuncSystemError('Unexpected server address while finding an open port');
}
} catch (err) {
const error = ensureErrorType(err);
if ((<NodeJS.ErrnoException>error).code === 'EADDRINUSE') {
return findOpenPort(bindAddress, port + 1);
}

throw error;
}
}

async function listenServer(server: net.Server, port: number, host: string): Promise<void> {
return new Promise((resolve, reject) => {
const onListening = () => {
server.off('error', onError);
resolve();
};
const onError = (err: Error) => {
server.off('listening', onListening);
reject(err);
};

server.once('listening', onListening);
server.once('error', onError);

try {
server.listen(port, host);
} catch (err) {
server.off('listening', onListening);
server.off('error', onError);
reject(err);
}
});
}

async function closeServer(server: net.Server): Promise<void> {
return new Promise((resolve, reject) => {
server.close((err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
120 changes: 120 additions & 0 deletions test/InvocationModel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,134 @@
import 'mocha';
import { RpcLogCategory, RpcLogLevel } from '@azure/functions-core';
import { expect } from 'chai';
import * as sinon from 'sinon';
import { Readable } from 'stream';
import { InvocationContext, McpContent, McpImageContent } from '../src';
import * as httpProxy from '../src/http/httpProxy';
import { InvocationModel } from '../src/InvocationModel';
import { setup } from '../src/setup';

function testLog(_level: RpcLogLevel, _category: RpcLogCategory, message: string) {
console.log(message);
}

describe('InvocationModel', () => {
describe('getArguments', () => {
afterEach(() => {
setup({ enableHttpStream: false });
sinon.restore();
});

it('builds a stream-backed HttpRequest from forwarded headers and keeps the gRPC response shape', async () => {
setup({ enableHttpStream: true });

const proxyReq = Object.assign(Readable.from([JSON.stringify({ hello: 'world' })]), {
headers: {
'x-forwarded-host': 'internal.example.com',
'x-forwarded-proto': 'https',
},
method: 'POST',
url: '/api/categories/fiction/products/abc?source=request',
});
const waitForProxyRequestStub = sinon.stub(httpProxy, 'waitForProxyRequest').resolves(proxyReq as never);
const sendProxyResponseStub = sinon.stub(httpProxy, 'sendProxyResponse').resolves();

const model = new InvocationModel({
invocationId: 'streamInvocId',
metadata: {
name: 'streamFunc',
bindings: {
httpTrigger1: {
type: 'httpTrigger',
direction: 'in',
},
$return: {
type: 'http',
direction: 'out',
},
},
},
request: {
inputData: [
{
name: 'httpTrigger1',
},
],
triggerMetadata: {
Headers: {
json: JSON.stringify({
'content-type': 'application/json',
'x-original-header': 'from-trigger-metadata',
}),
},
Query: {
json: JSON.stringify({
source: 'trigger-metadata',
}),
},
category: {
string: 'fiction',
},
productId: {
string: 'abc',
},
},
},
log: testLog,
});

const { context, inputs } = await model.getArguments();
const invocationContext = context as InvocationContext;
sinon.assert.calledOnceWithExactly(waitForProxyRequestStub, 'streamInvocId');

expect(inputs).to.have.length(1);
const req = inputs[0] as {
url: string;
params: Record<string, string>;
headers: Headers;
query: URLSearchParams;
json(): Promise<unknown>;
};
expect(req.url).to.equal('https://internal.example.com/api/categories/fiction/products/abc?source=request');
expect(req.params).to.deep.equal({
category: 'fiction',
productId: 'abc',
});
expect(req.headers.get('content-type')).to.equal('application/json');
expect(req.headers.get('x-original-header')).to.equal('from-trigger-metadata');
expect(req.query.get('source')).to.equal('request');
expect(await req.json()).to.deep.equal({ hello: 'world' });

expect(invocationContext.invocationId).to.equal('streamInvocId');
const response = await model.getResponse(invocationContext, {
status: 202,
headers: {
'x-streaming-enabled': 'true',
},
body: 'accepted',
});

sinon.assert.calledOnce(sendProxyResponseStub);
const [proxyInvocationId, proxyResponse] = sendProxyResponseStub.firstCall.args as [
string,
{
status: number;
headers: Headers;
text(): Promise<string>;
}
];
expect(proxyInvocationId).to.equal('streamInvocId');
expect(proxyResponse.status).to.equal(202);
expect(proxyResponse.headers.get('x-streaming-enabled')).to.equal('true');
expect(await proxyResponse.text()).to.equal('accepted');
expect(response).to.deep.equal({
invocationId: 'streamInvocId',
outputData: [],
returnValue: undefined,
});
});
});

describe('getResponse', () => {
it('Hello world http', async () => {
const model = new InvocationModel({
Expand Down
Loading
Loading