[node-bridge] Support streaming response for Serverless Function (#8795)

Adds streaming response support for React Server Components with Next 13.
Nathan Rajlich
2022-11-01 19:16:37 -07:00
committed by GitHub
parent 11d0091393
commit 301bcf58fb
17 changed files with 588 additions and 183 deletions
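At a glance, the change threads a new `experimentalResponseStreaming` flag from the Next.js builder down to the produced Lambda output, and teaches the node-bridge to pipe the response to a callback socket instead of buffering it. The sketch below only illustrates that wiring with the option introduced in this diff; the helper function, handler path, and file contents are made up for the example, not code from the commit.

// Minimal sketch (assumes @vercel/build-utils; names other than
// `experimentalResponseStreaming` are hypothetical).
import { NodejsLambda } from '@vercel/build-utils';
import type { Files } from '@vercel/build-utils';

function makeStreamingLambda(files: Files): NodejsLambda {
  return new NodejsLambda({
    files,
    handler: 'index.js', // hypothetical entrypoint
    runtime: 'nodejs16.x',
    shouldAddHelpers: false,
    shouldAddSourcemapSupport: false,
    // New in this commit: opt the function into streamed responses.
    experimentalResponseStreaming: true,
  });
}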

View File

@@ -23,6 +23,7 @@ export interface LambdaOptionsBase {
  regions?: string[];
  supportsMultiPayloads?: boolean;
  supportsWrapper?: boolean;
+  experimentalResponseStreaming?: boolean;
}

export interface LambdaOptionsWithFiles extends LambdaOptionsBase {
@@ -60,6 +61,7 @@ export class Lambda {
  zipBuffer?: Buffer;
  supportsMultiPayloads?: boolean;
  supportsWrapper?: boolean;
+  experimentalResponseStreaming?: boolean;

  constructor(opts: LambdaOptions) {
    const {
@@ -72,6 +74,7 @@ export class Lambda {
      regions,
      supportsMultiPayloads,
      supportsWrapper,
+      experimentalResponseStreaming,
    } = opts;

    if ('files' in opts) {
      assert(typeof opts.files === 'object', '"files" must be an object');
@@ -132,6 +135,7 @@ export class Lambda {
    this.zipBuffer = 'zipBuffer' in opts ? opts.zipBuffer : undefined;
    this.supportsMultiPayloads = supportsMultiPayloads;
    this.supportsWrapper = supportsWrapper;
+    this.experimentalResponseStreaming = experimentalResponseStreaming;
  }

  async createZip(): Promise<Buffer> {

View File

@@ -338,6 +338,7 @@ export async function serverBuild({
  const apiPages: string[] = [];
  const nonApiPages: string[] = [];
+  const streamingPages: string[] = [];

  lambdaPageKeys.forEach(page => {
    if (
@@ -359,6 +360,8 @@ export async function serverBuild({
    if (pageMatchesApi(page)) {
      apiPages.push(page);
+    } else if (appDir && lambdaAppPaths[page]) {
+      streamingPages.push(page);
    } else {
      nonApiPages.push(page);
    }
@@ -546,7 +549,12 @@ export async function serverBuild({
  const compressedPages: {
    [page: string]: PseudoFile;
  } = {};
-  const mergedPageKeys = [...nonApiPages, ...apiPages, ...internalPages];
+  const mergedPageKeys = [
+    ...nonApiPages,
+    ...streamingPages,
+    ...apiPages,
+    ...internalPages,
+  ];
  const traceCache = {};

  const getOriginalPagePath = (page: string) => {
@@ -704,6 +712,27 @@ export async function serverBuild({
    pageExtensions,
  });

+  const streamingPageLambdaGroups = await getPageLambdaGroups({
+    entryPath: requiredServerFilesManifest.appDir || entryPath,
+    config,
+    pages: streamingPages,
+    prerenderRoutes,
+    pageTraces,
+    compressedPages,
+    tracedPseudoLayer: tracedPseudoLayer.pseudoLayer,
+    initialPseudoLayer,
+    lambdaCompressedByteLimit,
+    initialPseudoLayerUncompressed: uncompressedInitialSize,
+    internalPages,
+    pageExtensions,
+  });
+
+  for (const group of streamingPageLambdaGroups) {
+    if (!group.isPrerenders) {
+      group.isStreaming = true;
+    }
+  }
+
  const apiLambdaGroups = await getPageLambdaGroups({
    entryPath: requiredServerFilesManifest.appDir || entryPath,
    config,
@@ -733,13 +762,23 @@ export async function serverBuild({
          pseudoLayerBytes: group.pseudoLayerBytes,
          uncompressedLayerBytes: group.pseudoLayerUncompressedBytes,
        })),
+        streamingPageLambdaGroups: streamingPageLambdaGroups.map(group => ({
+          pages: group.pages,
+          isPrerender: group.isPrerenders,
+          pseudoLayerBytes: group.pseudoLayerBytes,
+          uncompressedLayerBytes: group.pseudoLayerUncompressedBytes,
+        })),
        nextServerLayerSize: initialPseudoLayer.pseudoLayerBytes,
      },
      null,
      2
    )
  );
-  const combinedGroups = [...pageLambdaGroups, ...apiLambdaGroups];
+  const combinedGroups = [
+    ...pageLambdaGroups,
+    ...streamingPageLambdaGroups,
+    ...apiLambdaGroups,
+  ];

  await detectLambdaLimitExceeding(
    combinedGroups,
@@ -832,6 +871,7 @@ export async function serverBuild({
      memory: group.memory,
      runtime: nodeVersion.runtime,
      maxDuration: group.maxDuration,
+      isStreaming: group.isStreaming,
    });

    for (const page of group.pages) {

View File

@@ -748,6 +748,7 @@ export async function createPseudoLayer(files: {
interface CreateLambdaFromPseudoLayersOptions extends LambdaOptionsWithFiles {
  layers: PseudoLayer[];
+  isStreaming?: boolean;
}

// measured with 1, 2, 5, 10, and `os.cpus().length || 5`
@@ -757,6 +758,7 @@ const createLambdaSema = new Sema(1);
export async function createLambdaFromPseudoLayers({
  files: baseFiles,
  layers,
+  isStreaming,
  ...lambdaOptions
}: CreateLambdaFromPseudoLayersOptions) {
  await createLambdaSema.acquire();
@@ -791,6 +793,11 @@ export async function createLambdaFromPseudoLayers({
  return new NodejsLambda({
    ...lambdaOptions,
+    ...(isStreaming
+      ? {
+          experimentalResponseStreaming: true,
+        }
+      : {}),
    files,
    shouldAddHelpers: false,
    shouldAddSourcemapSupport: false,
@@ -1273,6 +1280,7 @@ export type LambdaGroup = {
  pages: string[];
  memory?: number;
  maxDuration?: number;
+  isStreaming?: boolean;
  isPrerenders?: boolean;
  pseudoLayer: PseudoLayer;
  pseudoLayerBytes: number;

View File

@@ -1,9 +0,0 @@
'use client';
export default function LazyComponent() {
return (
<>
<p>hello from lazy</p>
</>
);
}

View File

@@ -1,10 +0,0 @@
import { ClientComponent } from './test.js';
export default function DashboardIndexPage() {
return (
<>
<p>hello from app/dashboard/index</p>
<ClientComponent />
</>
);
}

View File

@@ -1,15 +0,0 @@
'use client';
import { useState, lazy } from 'react';
const Lazy = lazy(() => import('./lazy.js'));
export function ClientComponent() {
let [state] = useState('use client');
return (
<>
<Lazy />
<p className="hi">hello from modern the {state}</p>
</>
);
}

View File

@@ -1,9 +0,0 @@
'use client';
export default function LazyComponent() {
return (
<>
<p>hello from lazy</p>
</>
);
}

View File

@@ -1,10 +0,0 @@
import { ClientComponent } from './test.js';
export default function DashboardIndexPage() {
return (
<>
<p>hello from app/dashboard/index</p>
<ClientComponent />
</>
);
}

View File

@@ -1,15 +0,0 @@
'use client';
import { useState, lazy } from 'react';
const Lazy = lazy(() => import('./lazy.js'));
export function ClientComponent() {
let [state] = useState('use client');
return (
<>
<Lazy />
<p className="hi">hello from modern the {state}</p>
</>
);
}

View File

@@ -7,77 +7,80 @@ const runBuildLambda = require('../../../../test/lib/run-build-lambda');
jest.setTimeout(360000);

+// experimental appDir currently requires Node.js >= 16
+if (parseInt(process.versions.node.split('.')[0], 10) >= 16) {
  it('should build with app-dir correctly', async () => {
    const { buildResult } = await runBuildLambda(
      path.join(__dirname, '../fixtures/00-app-dir')
    );

    const lambdas = new Set();

    for (const key of Object.keys(buildResult.output)) {
      if (buildResult.output[key].type === 'Lambda') {
        lambdas.add(buildResult.output[key]);
      }
    }

    expect(lambdas.size).toBe(2);
    expect(buildResult.output['dashboard']).toBeDefined();
    expect(buildResult.output['dashboard/another']).toBeDefined();
    expect(buildResult.output['dashboard/changelog']).toBeDefined();
    expect(buildResult.output['dashboard/deployments/[id]']).toBeDefined();

    // prefixed static generation output with `/app` under dist server files
    expect(buildResult.output['dashboard'].type).toBe('Prerender');
    expect(buildResult.output['dashboard'].fallback.fsPath).toMatch(
      /server\/app\/dashboard\.html$/
    );
    expect(buildResult.output['dashboard.rsc'].type).toBe('Prerender');
    expect(buildResult.output['dashboard.rsc'].fallback.fsPath).toMatch(
      /server\/app\/dashboard\.rsc$/
    );
    expect(buildResult.output['dashboard/index/index'].type).toBe('Prerender');
    expect(buildResult.output['dashboard/index/index'].fallback.fsPath).toMatch(
      /server\/app\/dashboard\/index\.html$/
    );
    expect(buildResult.output['dashboard/index.rsc'].type).toBe('Prerender');
    expect(buildResult.output['dashboard/index.rsc'].fallback.fsPath).toMatch(
      /server\/app\/dashboard\/index\.rsc$/
    );
  });

  it('should build with app-dir in edge runtime correctly', async () => {
    const { buildResult } = await runBuildLambda(
      path.join(__dirname, '../fixtures/00-app-dir-edge')
    );

    const edgeFunctions = new Set();

    for (const key of Object.keys(buildResult.output)) {
      if (buildResult.output[key].type === 'EdgeFunction') {
        edgeFunctions.add(buildResult.output[key]);
      }
    }

    expect(edgeFunctions.size).toBe(3);
    expect(buildResult.output['edge']).toBeDefined();
    expect(buildResult.output['index']).toBeDefined();
    expect(buildResult.output['index/index']).toBeDefined();
  });

  it('should show error from basePath with legacy monorepo build', async () => {
    let error;

    try {
      await runBuildLambda(path.join(__dirname, 'legacy-monorepo-basepath'));
    } catch (err) {
      error = err;
    }
    console.error(error);

    expect(error.message).toBe(
      'basePath can not be used with `builds` in vercel.json, use Project Settings to configure your monorepo instead'
    );
  });
+}

it('should build using server build', async () => {
  const origLog = console.log;

View File

@@ -1,4 +1,10 @@
+const { URL } = require('url');
const { request } = require('http');
+const { Socket } = require('net');
+const { createCipheriv } = require('crypto');
+const { pipeline, Transform } = require('stream');

+const CRLF = `\r\n`;

/**
 * If the `http.Server` handler function throws an error asynchronously,
@@ -17,9 +23,23 @@ process.on('unhandledRejection', err => {
 */
function normalizeProxyEvent(event) {
  let bodyBuffer;
-  const { method, path, headers, encoding, body, payloads } = JSON.parse(
-    event.body
-  );
+  /**
+   * @type {import('./types').VercelProxyRequest}
+   */
+  const payload = JSON.parse(event.body);
+  const {
+    method,
+    path,
+    headers,
+    encoding,
+    body,
+    payloads,
+    responseCallbackCipher,
+    responseCallbackCipherIV,
+    responseCallbackCipherKey,
+    responseCallbackStream,
+    responseCallbackUrl,
+  } = payload;

  /**
   *
@@ -28,7 +48,7 @@ function normalizeProxyEvent(event) {
   */
  const normalizeBody = b => {
    if (b) {
-      if (encoding === 'base64') {
+      if (typeof b === 'string' && encoding === 'base64') {
        bodyBuffer = Buffer.from(b, encoding);
      } else if (encoding === undefined) {
        bodyBuffer = Buffer.from(b);
@@ -42,13 +62,9 @@ function normalizeProxyEvent(event) {
  };

  if (payloads) {
-    /**
-     * @param {{ body: string | Buffer }} payload
-     */
-    const normalizePayload = payload => {
-      payload.body = normalizeBody(payload.body);
-    };
-    payloads.forEach(normalizePayload);
+    for (const p of payloads) {
+      p.body = normalizeBody(payload.body);
+    }
  }

  bodyBuffer = normalizeBody(body);
@@ -59,6 +75,11 @@ function normalizeProxyEvent(event) {
    headers,
    body: bodyBuffer,
    payloads,
+    responseCallbackCipher,
+    responseCallbackCipherIV,
+    responseCallbackCipherKey,
+    responseCallbackStream,
+    responseCallbackUrl,
  };
}
@@ -79,11 +100,23 @@ function normalizeAPIGatewayProxyEvent(event) {
    bodyBuffer = Buffer.alloc(0);
  }

-  return { isApiGateway: true, method, path, headers, body: bodyBuffer };
+  return {
+    body: bodyBuffer,
+    headers,
+    isApiGateway: true,
+    method,
+    path,
+    responseCallbackCipher: undefined,
+    responseCallbackCipherIV: undefined,
+    responseCallbackCipherKey: undefined,
+    responseCallbackStream: undefined,
+    responseCallbackUrl: undefined,
+  };
}

/**
 * @param {import('./types').VercelProxyEvent | import('aws-lambda').APIGatewayProxyEvent} event
+ * @return {import('./types').VercelProxyRequest}
 */
function normalizeEvent(event) {
  if ('Action' in event) {
@@ -176,7 +209,7 @@ class Bridge {
   *
   * @param {import('./types').VercelProxyEvent | import('aws-lambda').APIGatewayProxyEvent} event
   * @param {import('aws-lambda').Context} context
-   * @return {Promise<{statusCode: number, headers: import('http').IncomingHttpHeaders, body: string, encoding: 'base64'}>}
+   * @return {Promise<import('./types').VercelProxyResponse>}
   */
  async launcher(event, context) {
    context.callbackWaitsForEmptyEventLoop = false;
@@ -268,6 +301,10 @@ class Bridge {
          encoding: 'base64',
        };
      } else {
+        // TODO We expect this to error as it is possible to resolve to empty.
+        // For now it is not very important as we will only pass
+        // `responseCallbackUrl` in production.
+        // @ts-ignore
        return this.handleEvent(normalizedEvent);
      }
    }
@@ -275,11 +312,21 @@ class Bridge {
  /**
   *
   * @param {ReturnType<typeof normalizeEvent>} normalizedEvent
-   * @return {Promise<{statusCode: number, headers: import('http').IncomingHttpHeaders, body: string, encoding: 'base64'}>}
+   * @return {Promise<import('./types').VercelProxyResponse | import('./types').VercelStreamProxyResponse>}
   */
  async handleEvent(normalizedEvent) {
    const { port } = await this.listening;
-    const { isApiGateway, method, headers, body } = normalizedEvent;
+    const {
+      body,
+      headers,
+      isApiGateway,
+      method,
+      responseCallbackCipher,
+      responseCallbackCipherIV,
+      responseCallbackCipherKey,
+      responseCallbackStream,
+      responseCallbackUrl,
+    } = normalizedEvent;
    let { path } = normalizedEvent;

    if (this.shouldStoreEvents) {
@@ -288,41 +335,42 @@ class Bridge {
      headers['x-now-bridge-request-id'] = reqId;
    }

+    // eslint-disable-next-line consistent-return
    return new Promise((resolve, reject) => {
+      let socket;
+      let cipher;
+      let url;
+
+      if (responseCallbackUrl) {
+        socket = new Socket();
+        url = new URL(responseCallbackUrl);
+        socket.connect(parseInt(url.port, 10), url.hostname);
+        socket.write(`${responseCallbackStream}${CRLF}`);
+      }
+
+      if (
+        responseCallbackCipher &&
+        responseCallbackCipherKey &&
+        responseCallbackCipherIV
+      ) {
+        cipher = createCipheriv(
+          responseCallbackCipher,
+          Buffer.from(responseCallbackCipherKey, 'base64'),
+          Buffer.from(responseCallbackCipherIV, 'base64')
+        );
+      }
+
      // if the path is improperly encoded we need to encode it or
      // http.request will throw an error (related check: https://github.com/nodejs/node/blob/4ece669c6205ec78abfdadfe78869bbb8411463e/lib/_http_client.js#L84)
      if (path && /[^\u0021-\u00ff]/.test(path)) {
        path = encodeURI(path);
      }

-      const opts = { hostname: '127.0.0.1', port, path, method };
-      const req = request(opts, res => {
-        const response = res;
-        /**
-         * @type {Buffer[]}
-         */
-        const respBodyChunks = [];
-        response.on('data', chunk => respBodyChunks.push(Buffer.from(chunk)));
-        response.on('error', reject);
-        response.on('end', () => {
-          const bodyBuffer = Buffer.concat(respBodyChunks);
-          delete response.headers.connection;
-          if (isApiGateway) {
-            delete response.headers['content-length'];
-          } else if (response.headers['content-length']) {
-            response.headers['content-length'] = String(bodyBuffer.length);
-          }
-          resolve({
-            statusCode: response.statusCode || 200,
-            headers: response.headers,
-            body: bodyBuffer.toString('base64'),
-            encoding: 'base64',
-          });
-        });
-      });
+      const req = request(
+        { hostname: '127.0.0.1', port, path, method },
+        socket && url && cipher
+          ? getStreamResponseCallback({ url, socket, cipher, resolve, reject })
+          : getResponseCallback({ isApiGateway, resolve, reject })
+      );

      req.on('error', error => {
        setTimeout(() => {
@@ -332,16 +380,10 @@ class Bridge {
        }, 2);
      });

-      for (const [name, value] of Object.entries(headers)) {
-        if (value === undefined) {
-          console.error(
-            `Skipping HTTP request header "${name}" because value is undefined`
-          );
-          continue;
-        }
+      for (const [name, value] of getHeadersIterator(headers)) {
        try {
          req.setHeader(name, value);
-        } catch (err) {
+        } catch (/** @type any */ err) {
          console.error(`Skipping HTTP request header: "${name}: ${value}"`);
          console.error(err.message);
        }
@@ -363,4 +405,106 @@ class Bridge {
  }
}
/**
* Generates the streaming response callback which writes in the given socket client a raw
* HTTP Request message to later pipe the response body into the socket. It will pass request
* headers namespace and an additional header with the status code. Once everything is
* written it will destroy the socket and resolve to an empty object. If a cipher is given
* it will be used to pipe bytes.
*
* @type {(params: {
* url: import('url').URL,
* socket: import('net').Socket,
* cipher: import('crypto').Cipher
* resolve: (result: (Record<string, never>)) => void,
* reject: (err: Error) => void
* }) => (response: import("http").IncomingMessage) => void}
*/
function getStreamResponseCallback({ url, socket, cipher, resolve, reject }) {
return response => {
const chunked = new Transform();
chunked._transform = function (chunk, _, callback) {
this.push(Buffer.byteLength(chunk).toString(16) + CRLF);
this.push(chunk);
this.push(CRLF);
callback();
};
let headers = `Host: ${url.host}${CRLF}`;
headers += `transfer-encoding: chunked${CRLF}`;
headers += `x-vercel-status-code: ${response.statusCode || 200}${CRLF}`;
for (const [name, value] of getHeadersIterator(response.headers)) {
if (!['connection', 'transfer-encoding'].includes(name)) {
headers += `x-vercel-header-${name}: ${value}${CRLF}`;
}
}
cipher.write(`POST ${url.pathname} HTTP/1.1${CRLF}${headers}${CRLF}`);
pipeline(response, chunked, cipher, socket, err => {
if (err) return reject(err);
resolve({});
});
};
}
/**
* Generates the normal response callback which waits until the body is fully
* received before resolving the promise. It caches the entire body and resolve
* with an object that describes the response.
*
* @type {(params: {
* isApiGateway: boolean,
* resolve: (result: (import('./types').VercelProxyResponse)) => void,
* reject: (err: Error) => void
* }) => (response: import("http").IncomingMessage) => void}
*/
function getResponseCallback({ isApiGateway, resolve, reject }) {
return response => {
/**
* @type {Buffer[]}
*/
const respBodyChunks = [];
response.on('data', chunk => respBodyChunks.push(Buffer.from(chunk)));
response.on('error', reject);
response.on('end', () => {
const bodyBuffer = Buffer.concat(respBodyChunks);
delete response.headers.connection;
if (isApiGateway) {
delete response.headers['content-length'];
} else if (response.headers['content-length']) {
response.headers['content-length'] = String(bodyBuffer.length);
}
resolve({
statusCode: response.statusCode || 200,
headers: response.headers,
body: bodyBuffer.toString('base64'),
encoding: 'base64',
});
});
};
}
/**
* Get an iterator for the headers object and yield the name and value when
* the value is not undefined only.
*
* @type {(headers: import('http').IncomingHttpHeaders) =>
* Generator<[string, string | string[]], void, unknown>}
*/
function* getHeadersIterator(headers) {
for (const [name, value] of Object.entries(headers)) {
if (value === undefined) {
console.error(
`Skipping HTTP request header "${name}" because value is undefined`
);
continue;
}
yield [name, value];
}
}
module.exports = { Bridge };
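For reference, the stream written by `getStreamResponseCallback` above reaches the callback address as the plaintext stream id followed by CRLF, then (when a cipher is configured) ciphertext of a chunked HTTP/1.1 POST whose headers carry the origin status and headers as `x-vercel-status-code` and `x-vercel-header-*`. The consumer below is only an illustration modeled on the `run-test-server.js` helper added further down; the key and IV are random placeholders and would have to match the `responseCallbackCipherKey`/`responseCallbackCipherIV` values sent to the bridge.

// Illustrative callback-side consumer (TypeScript sketch, not from the commit).
import { createServer } from 'net';
import { createDecipheriv, randomBytes } from 'crypto';

const key = randomBytes(32); // placeholder: must equal the base64-decoded responseCallbackCipherKey
const iv = randomBytes(16);  // placeholder: must equal the base64-decoded responseCallbackCipherIV
const CRLF = Buffer.from('\r\n');

createServer(connection => {
  let buffered = Buffer.alloc(0);
  let decipher: ReturnType<typeof createDecipheriv> | undefined;

  connection.on('data', chunk => {
    if (decipher) {
      decipher.write(chunk);
      return;
    }
    buffered = Buffer.concat([buffered, chunk]);
    const idx = buffered.indexOf(CRLF);
    if (idx === -1) return; // stream id line not complete yet

    // Everything before the first CRLF is the plaintext stream id.
    console.log('stream id:', buffered.slice(0, idx).toString());

    // The remainder is ciphertext of a chunked HTTP/1.1 POST with
    // x-vercel-status-code and x-vercel-header-* headers, then the body.
    decipher = createDecipheriv('aes-256-ctr', key, iv);
    decipher.on('data', plain => process.stdout.write(plain));
    decipher.write(buffered.slice(idx + 2));
  });
}).listen(3000);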

View File

@@ -23,6 +23,8 @@
"devDependencies": { "devDependencies": {
"@types/aws-lambda": "8.10.19", "@types/aws-lambda": "8.10.19",
"@types/node": "*", "@types/node": "*",
"jsonlines": "0.1.1",
"test-listen": "1.1.0",
"typescript": "4.3.4" "typescript": "4.3.4"
} }
} }

View File

@@ -1,6 +1,10 @@
const assert = require('assert');
+const crypto = require('crypto');
+const jsonlines = require('jsonlines');
const { Server } = require('http');
const { Bridge } = require('../bridge');
+const { runServer } = require('./run-test-server');
+const { runTcpServer } = require('./run-test-server');

test('port binding', async () => {
  const server = new Server();
@@ -184,7 +188,7 @@ test('multi-payload handling', async () => {
});

test('consumeEvent', async () => {
-  const mockListener = jest.fn((req, res) => {
+  const mockListener = jest.fn((_, res) => {
    res.end('hello');
  });
@@ -222,7 +226,7 @@ test('consumeEvent', async () => {
});

test('consumeEvent and handle decoded path', async () => {
-  const mockListener = jest.fn((req, res) => {
+  const mockListener = jest.fn((_, res) => {
    res.end('hello');
  });
@@ -295,3 +299,180 @@ test('invalid request headers', async () => {
  server.close();
});
test('`NowProxyEvent` proxy streaming with a sync handler', async () => {
const cipherParams = {
cipher: 'aes-256-ctr',
cipherIV: crypto.randomBytes(16),
cipherKey: crypto.randomBytes(32),
};
const effects = {
callbackPayload: undefined,
callbackStream: undefined,
};
const { deferred, resolve } = createDeferred();
const httpServer = await runServer({
handler: (req, res) => {
const chunks = [];
req.on('data', chunk => {
chunks.push(chunk.toString());
});
req.on('close', () => {
effects.callbackPayload = chunks;
res.writeHead(200, 'OK', { 'content-type': 'application/json' });
res.end();
resolve();
});
},
});
const tcpServerCallback = await runTcpServer({
cipherParams,
effects,
httpServer,
});
const server = new Server((req, res) => {
res.setHeader('content-type', 'text/html');
res.end('hello');
});
const bridge = new Bridge(server);
bridge.listen();
const context = { callbackWaitsForEmptyEventLoop: true };
const result = await bridge.launcher(
{
Action: 'Invoke',
body: JSON.stringify({
method: 'POST',
responseCallbackCipher: cipherParams.cipher,
responseCallbackCipherIV: cipherParams.cipherIV.toString('base64'),
responseCallbackCipherKey: cipherParams.cipherKey.toString('base64'),
responseCallbackStream: 'abc',
responseCallbackUrl: String(tcpServerCallback.url),
headers: { foo: 'bar' },
path: '/nowproxy',
body: 'body=1',
}),
},
context
);
await deferred;
expect(result).toEqual({});
expect(context.callbackWaitsForEmptyEventLoop).toEqual(false);
expect(effects.callbackStream).toEqual('abc');
expect(effects.callbackPayload).toEqual(['hello']);
server.close();
await httpServer.close();
await tcpServerCallback.close();
});
test('`NowProxyEvent` proxy streaming with an async handler', async () => {
const effects = {
callbackHeaders: undefined,
callbackMethod: undefined,
callbackPayload: undefined,
callbackStream: undefined,
};
const cipherParams = {
cipher: 'aes-256-ctr',
cipherIV: crypto.randomBytes(16),
cipherKey: crypto.randomBytes(32),
};
const { deferred, resolve } = createDeferred();
const jsonParser = jsonlines.parse();
const httpServer = await runServer({
handler: (req, res) => {
const chunks = [];
req.pipe(jsonParser);
jsonParser.on('data', chunk => {
chunks.push(chunk);
});
req.on('close', () => {
effects.callbackMethod = req.method;
effects.callbackHeaders = req.headers;
effects.callbackPayload = chunks;
res.writeHead(200, 'OK', { 'content-type': 'application/json' });
res.end();
resolve();
});
},
});
const tcpServerCallback = await runTcpServer({
cipherParams,
httpServer,
effects,
});
const jsonStringifier = jsonlines.stringify();
const server = new Server((req, res) => {
res.setHeader('x-test', 'hello');
res.setHeader('content-type', 'text/html');
jsonStringifier.pipe(res);
jsonStringifier.write({ method: req.method });
jsonStringifier.write({ path: req.url });
setTimeout(() => {
jsonStringifier.write({ headers: req.headers });
res.end();
}, 100);
});
const bridge = new Bridge(server);
bridge.listen();
const context = { callbackWaitsForEmptyEventLoop: true };
const result = await bridge.launcher(
{
Action: 'Invoke',
body: JSON.stringify({
method: 'POST',
responseCallbackCipher: cipherParams.cipher,
responseCallbackCipherIV: cipherParams.cipherIV.toString('base64'),
responseCallbackCipherKey: cipherParams.cipherKey.toString('base64'),
responseCallbackStream: 'abc',
responseCallbackUrl: String(tcpServerCallback.url),
headers: { foo: 'bar' },
path: '/nowproxy',
body: 'body=1',
}),
},
context
);
await deferred;
expect(result).toEqual({});
expect(context.callbackWaitsForEmptyEventLoop).toEqual(false);
expect(effects.callbackStream).toEqual('abc');
expect(effects.callbackMethod).toEqual('POST');
expect(effects.callbackHeaders).toMatchObject({
'x-vercel-status-code': '200',
'x-vercel-header-x-test': 'hello',
'x-vercel-header-content-type': 'text/html',
});
expect(effects.callbackPayload).toMatchObject([
{ method: 'POST' },
{ path: '/nowproxy' },
{ headers: { foo: 'bar' } },
]);
server.close();
httpServer.close();
tcpServerCallback.close();
});
function createDeferred() {
let resolve;
const deferred = new Promise(_resolve => {
resolve = _resolve;
});
return { deferred, resolve };
}

View File

@@ -0,0 +1,78 @@
const { createServer } = require('net');
const { Server } = require('http');
const { Socket } = require('net');
const { URL } = require('url');
const crypto = require('crypto');
const listen = require('test-listen');
exports.runServer = async function runServer({ handler }) {
const server = new Server(handler);
const url = await listen(server);
return { url: new URL(url), close: getKillServer(server) };
};
function getKillServer(server) {
let sockets = [];
server.on('connection', socket => {
sockets.push(socket);
socket.once('close', () => {
sockets.splice(sockets.indexOf(socket), 1);
});
});
return () => {
return new Promise((resolve, reject) => {
server.close(err => {
if (err) {
return reject(err);
}
resolve();
});
sockets.forEach(function (socket) {
socket.destroy();
});
sockets = [];
});
};
}
exports.runTcpServer = async function runTcpServer({
effects,
httpServer,
cipherParams,
}) {
const server = createServer();
server.on('connection', connection => {
const socket = new Socket();
socket.connect(parseInt(httpServer.url.port, 10), httpServer.hostname);
const decipher = crypto.createDecipheriv(
cipherParams.cipher,
cipherParams.cipherKey,
cipherParams.cipherIV
);
decipher.pipe(socket);
const CRLF = Buffer.from('\r\n');
let accBuffer = Buffer.from([]);
connection.on('data', function onConnectionData(chunk) {
accBuffer = Buffer.concat([accBuffer, chunk]);
const idx = accBuffer.indexOf(CRLF);
if (idx !== -1) {
effects.callbackStream = accBuffer.slice(0, idx).toString();
connection.off('data', onConnectionData);
decipher.write(accBuffer.slice(idx + 2));
connection.pipe(decipher);
decipher.on('close', () => {
socket.end();
});
}
});
});
const url = await listen(server);
return { url: new URL(url), close: getKillServer(server) };
};

View File

@@ -1,5 +1,6 @@
/// <reference types="node" />
-import {
+import type { CipherCCMTypes } from 'crypto';
+import type {
  Server,
  IncomingHttpHeaders,
  OutgoingHttpHeaders,
@@ -11,12 +12,18 @@ export interface VercelProxyEvent {
  body: string;
}

export interface VercelProxyRequest {
-  isApiGateway?: boolean;
+  isApiGateway: boolean;
  method: string;
  path: string;
  headers: IncomingHttpHeaders;
  body: Buffer;
+  encoding?: string;
  payloads?: Array<VercelProxyRequest>;
+  responseCallbackCipher?: CipherCCMTypes;
+  responseCallbackCipherIV?: string;
+  responseCallbackCipherKey?: string;
+  responseCallbackStream?: string;
+  responseCallbackUrl?: string;
}

export interface VercelProxyResponse {
  statusCode: number;
@@ -24,6 +31,7 @@ export interface VercelProxyResponse {
  body: string;
  encoding: BufferEncoding;
}
+export type VercelStreamProxyResponse = Record<string, never>;

export interface ServerLike {
  timeout?: number;
  listen: (

View File

@@ -465,6 +465,9 @@ export const build: BuildV3 = async ({
    config.helpers === false || process.env.NODEJS_HELPERS === '0'
  );

+  const experimentalResponseStreaming =
+    staticConfig?.experimentalResponseStreaming === true ? true : undefined;
+
  output = new NodejsLambda({
    files: preparedFiles,
    handler,
@@ -472,6 +475,7 @@ export const build: BuildV3 = async ({
    shouldAddHelpers,
    shouldAddSourcemapSupport,
    awsLambdaHandler,
+    experimentalResponseStreaming,
  });
}
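The `@vercel/node` builder only forwards the flag when the function's static config asks for it. Assuming `staticConfig` above is parsed from the function's exported `config` object (that mechanism and the file path are assumptions of this sketch, not part of the diff), a function could opt in roughly like this:

// api/stream.ts -- hypothetical function opting in to streaming.
// The exported `config` shape is an assumption about where `staticConfig`
// comes from; treat it as illustrative only.
import type { IncomingMessage, ServerResponse } from 'http';

export const config = {
  experimentalResponseStreaming: true,
};

export default function handler(_req: IncomingMessage, res: ServerResponse) {
  res.setHeader('content-type', 'text/plain');
  res.write('first chunk\n'); // with streaming, this can reach the client before res.end()
  setTimeout(() => {
    res.end('done\n');
  }, 100);
}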

View File

@@ -1,3 +1,4 @@
+// bust cache
const assert = require('assert');
const { createHash } = require('crypto');
const path = require('path');