mirror of
https://github.com/LukeHagar/crossws.git
synced 2025-12-06 04:19:26 +00:00
163 lines
4.4 KiB
TypeScript
163 lines
4.4 KiB
TypeScript
import { Agent, WebSocket as WebSocketUndici } from "undici";
|
|
import { afterAll, beforeAll, afterEach } from "vitest";
|
|
import { execa, ResultPromise as ExecaRes } from "execa";
|
|
import { fileURLToPath } from "node:url";
|
|
import { getRandomPort, waitForPort } from "get-port-please";
|
|
import { wsTests } from "./tests";
|
|
|
|
const fixtureDir = fileURLToPath(new URL("fixture", import.meta.url));
|
|
|
|
const websockets = new Set<WebSocket>();
|
|
afterEach(() => {
|
|
for (const ws of websockets) {
|
|
ws.close();
|
|
}
|
|
websockets.clear();
|
|
});
|
|
|
|
export function wsConnect(
|
|
url: string,
|
|
opts?: { skip?: number; headers?: HeadersInit },
|
|
) {
|
|
const inspector = new WebSocketInspector();
|
|
const _WebSocket = globalThis.WebSocket || WebSocketUndici;
|
|
const ws = new _WebSocket(url, {
|
|
// @ts-expect-error
|
|
headers: opts?.headers,
|
|
dispatcher: inspector,
|
|
});
|
|
websockets.add(ws);
|
|
|
|
const send = async (data: any): Promise<any> => {
|
|
ws.send(
|
|
typeof data === "string" ? data : JSON.stringify({ message: data }),
|
|
);
|
|
};
|
|
|
|
const messages: unknown[] = [];
|
|
|
|
const waitCallbacks: Record<string, (message: any) => void> = {};
|
|
let nextIndex = opts?.skip || 0;
|
|
const next = () => {
|
|
const index = nextIndex++;
|
|
if (index < messages.length) {
|
|
return Promise.resolve(messages[index]);
|
|
}
|
|
return new Promise<any>((resolve) => {
|
|
waitCallbacks[index] = resolve;
|
|
});
|
|
};
|
|
const skip = (count: number = 1) => {
|
|
nextIndex += count;
|
|
};
|
|
|
|
ws.addEventListener("message", (event) => {
|
|
const str =
|
|
typeof event.data === "string"
|
|
? event.data
|
|
: new TextDecoder().decode(event.data);
|
|
const payload = str[0] === "{" ? JSON.parse(str) : str;
|
|
messages.push(payload);
|
|
const index = messages.length - 1;
|
|
if (waitCallbacks[index]) {
|
|
waitCallbacks[index](payload);
|
|
delete waitCallbacks[index];
|
|
}
|
|
});
|
|
|
|
const res = {
|
|
ws,
|
|
send,
|
|
next,
|
|
skip,
|
|
messages,
|
|
inspector,
|
|
error: undefined as Error | undefined,
|
|
};
|
|
|
|
const connectPromise = new Promise((resolve, reject) => {
|
|
ws.addEventListener("open", () => resolve(res));
|
|
ws.addEventListener("error", (error) => {
|
|
// @ts-expect-error
|
|
res.error = error;
|
|
resolve(res);
|
|
});
|
|
});
|
|
|
|
return Object.assign(connectPromise, res) as Promise<typeof res>;
|
|
}
|
|
|
|
class WebSocketInspector extends Agent {
|
|
status?: number;
|
|
statusText?: string;
|
|
headers?: Record<string, string>;
|
|
error?: Error;
|
|
|
|
_normalizeHeaders(rawHeaders: string[] | Buffer[] | null) {
|
|
const headerEntries: [string, string][] = [];
|
|
for (let i = 0; i < rawHeaders!.length; i += 2) {
|
|
headerEntries.push([
|
|
decodeURIComponent(rawHeaders![i].toString()).toLowerCase(),
|
|
decodeURIComponent(rawHeaders![i + 1].toString()),
|
|
]);
|
|
}
|
|
return Object.fromEntries(headerEntries);
|
|
}
|
|
|
|
dispatch(opts: any, handler: any) {
|
|
return super.dispatch(opts, {
|
|
...handler,
|
|
onHeaders: (statusCode, headers, resume, statusText) => {
|
|
this.status = statusCode;
|
|
this.statusText = statusText;
|
|
this.headers = this._normalizeHeaders(headers);
|
|
return handler.onHeaders(statusCode, headers, resume, statusText);
|
|
},
|
|
onError: (error) => {
|
|
this.error = error;
|
|
return handler.onError(error);
|
|
},
|
|
onUpgrade: (statusCode, rawHeaders = [], socket) => {
|
|
this.headers = this._normalizeHeaders(rawHeaders);
|
|
return handler.onUpgrade(statusCode, rawHeaders, socket);
|
|
},
|
|
});
|
|
}
|
|
}
|
|
|
|
export function wsTestsExec(
|
|
cmd: string,
|
|
opts: Parameters<typeof wsTests>[1],
|
|
tests = wsTests,
|
|
) {
|
|
let childProc: ExecaRes;
|
|
let url: string;
|
|
beforeAll(async () => {
|
|
const port = await getRandomPort("localhost");
|
|
url = `ws://localhost:${port}/`;
|
|
const [bin, ...args] = cmd
|
|
.replace("$PORT", String(port))
|
|
.replace("./", fixtureDir + "/")
|
|
.split(" ");
|
|
childProc = execa(bin, args, { env: { PORT: port.toString() } });
|
|
childProc.catch((error) => {
|
|
if (error.signal !== "SIGTERM") {
|
|
console.error(error);
|
|
}
|
|
});
|
|
childProc.stderr!.on("data", (chunk) => {
|
|
console.log(chunk.toString());
|
|
});
|
|
if (process.env.TEST_DEBUG) {
|
|
childProc.stdout!.on("data", (chunk) => {
|
|
console.log(chunk.toString());
|
|
});
|
|
}
|
|
await waitForPort(port, { host: "localhost", delay: 50, retries: 100 });
|
|
});
|
|
afterAll(async () => {
|
|
await childProc.kill();
|
|
});
|
|
tests(() => url, opts);
|
|
}
|