refactor!: remove adapter hooks (#72)

This commit is contained in:
Pooya Parsa
2024-08-16 02:24:13 +02:00
committed by GitHub
parent 02f3e824e3
commit f535ff7c75
15 changed files with 11 additions and 184 deletions

View File

@@ -32,16 +32,6 @@ Bun.serve({
}); });
``` ```
## Adapter Hooks
- `bun:message (peer, ws, message)`
- `bun:open (peer, ws)`
- `bun:close (peer, ws)`
- `bun:drain (peer)`
- `bun:error (peer, ws, error)`
- `bun:ping (peer, ws, data)`
- `bun:pong (peer, ws, data)`
::read-more ::read-more
See [`test/fixture/bun.ts`](https://github.com/unjs/crossws/blob/main/test/fixture/bun.ts) for demo and [`src/adapters/bun.ts`](https://github.com/unjs/crossws/blob/main/src/adapters/bun.ts) for implementation. See [`test/fixture/bun.ts`](https://github.com/unjs/crossws/blob/main/test/fixture/bun.ts) for demo and [`src/adapters/bun.ts`](https://github.com/unjs/crossws/blob/main/src/adapters/bun.ts) for implementation.
:: ::

View File

@@ -33,13 +33,6 @@ export default {
}; };
``` ```
## Adapter Hooks
- `cloudflare:accept (peer)`
- `cloudflare:message (peer, event)`
- `cloudflare:error (peer, event)`
- `cloudflare:close (peer, event)`
::read-more ::read-more
See [`test/fixture/cloudflare.ts`](https://github.com/unjs/crossws/blob/main/test/fixture/cloudflare.ts) for demo and [`src/adapters/cloudflare.ts`](https://github.com/unjs/crossws/blob/main/src/adapters/cloudflare.ts) for implementation. See [`test/fixture/cloudflare.ts`](https://github.com/unjs/crossws/blob/main/test/fixture/cloudflare.ts) for demo and [`src/adapters/cloudflare.ts`](https://github.com/unjs/crossws/blob/main/src/adapters/cloudflare.ts) for implementation.
:: ::

View File

@@ -28,13 +28,6 @@ Deno.serve({ port: 3000 }, (request, info) => {
}); });
``` ```
## Adapter Hooks
- `deno:open (peer)`
- `deno:message (peer, event)`
- `deno:close (peer)`
- `deno:error (peer, error)`
::read-more ::read-more
See [`test/fixture/deno.ts`](./test/fixture/deno.ts) for demo and [`src/adapters/deno.ts`](./src/adapters/deno.ts) for implementation. See [`test/fixture/deno.ts`](./test/fixture/deno.ts) for demo and [`src/adapters/deno.ts`](./src/adapters/deno.ts) for implementation.
:: ::

View File

@@ -27,17 +27,6 @@ const server = createServer((req, res) => {
server.on("upgrade", ws.handleUpgrade); server.on("upgrade", ws.handleUpgrade);
``` ```
## Adapter Hooks
- `node:open (peer)`
- `node:message (peer, data, isBinary)`
- `node:close (peer, code, reason)`
- `node:error (peer, error)`
- `node:ping (peer)`
- `node:pong (peer)`
- `node:unexpected-response (peer, req, res)`
- `node:upgrade (peer, req)`
::read-more ::read-more
See [`test/fixture/node.ts`](https://github.com/unjs/crossws/blob/main/test/fixture/node.ts) for demo and [`src/adapters/node.ts`](https://github.com/unjs/crossws/blob/main/src/adapters/node.ts) for implementation. See [`test/fixture/node.ts`](https://github.com/unjs/crossws/blob/main/test/fixture/node.ts) for demo and [`src/adapters/node.ts`](https://github.com/unjs/crossws/blob/main/src/adapters/node.ts) for implementation.
:: ::
@@ -72,17 +61,6 @@ server.listen(3001, () => {
}); });
``` ```
## Adapter Hooks
- `uws:open (ws)`
- `uws:message (ws, message, isBinary)`
- `uws:close (ws, code, message)`
- `uws:ping (ws, message)`
- `uws:pong (ws, message)`
- `uws:drain (ws)`
- `uws:upgrade (res, req, context)`
- `uws:subscription (ws, topic, newCount, oldCount)`
::read-more ::read-more
See [`test/fixture/node-uws.ts`](https://github.com/unjs/crossws/blob/main/test/fixture/node-uws.ts) for demo and [`src/adapters/node-uws.ts`](https://github.com/unjs/crossws/blob/main/src/adapters/node-uws.ts) for implementation. See [`test/fixture/node-uws.ts`](https://github.com/unjs/crossws/blob/main/test/fixture/node-uws.ts) for demo and [`src/adapters/node-uws.ts`](https://github.com/unjs/crossws/blob/main/src/adapters/node-uws.ts) for implementation.
:: ::

View File

@@ -1,4 +1,4 @@
import type { AdapterHooks, Hooks, ResolveHooks } from "./hooks.ts"; import type { Hooks, ResolveHooks } from "./hooks.ts";
import type { Peer } from "./peer.ts"; import type { Peer } from "./peer.ts";
export function adapterUtils(peers: Set<Peer>) { export function adapterUtils(peers: Set<Peer>) {
@@ -24,7 +24,6 @@ export interface AdapterInstance {
export interface AdapterOptions { export interface AdapterOptions {
resolve?: ResolveHooks; resolve?: ResolveHooks;
hooks?: Hooks; hooks?: Hooks;
adapterHooks?: AdapterHooks;
} }
export type Adapter< export type Adapter<

View File

@@ -54,27 +54,13 @@ export default defineWebSocketAdapter<BunAdapter, BunOptions>(
open: (ws) => { open: (ws) => {
const peer = getPeer(ws, peers); const peer = getPeer(ws, peers);
peers.add(peer); peers.add(peer);
hooks.callAdapterHook("bun:open", peer, ws);
hooks.callHook("open", peer); hooks.callHook("open", peer);
}, },
close: (ws) => { close: (ws) => {
const peer = getPeer(ws, peers); const peer = getPeer(ws, peers);
peers.delete(peer); peers.delete(peer);
hooks.callAdapterHook("bun:close", peer, ws);
hooks.callHook("close", peer, {}); hooks.callHook("close", peer, {});
}, },
drain: (ws) => {
const peer = getPeer(ws, peers);
hooks.callAdapterHook("bun:drain", peer);
},
ping(ws, data) {
const peer = getPeer(ws, peers);
hooks.callAdapterHook("bun:ping", peer, ws, data);
},
pong(ws, data) {
const peer = getPeer(ws, peers);
hooks.callAdapterHook("bun:pong", peer, ws, data);
},
}, },
}; };
}, },

View File

@@ -45,7 +45,6 @@ export default defineWebSocketAdapter<
); );
peers.add(peer); peers.add(peer);
(obj as DurableObjectPub).ctx.acceptWebSocket(server); (obj as DurableObjectPub).ctx.acceptWebSocket(server);
await hooks.callAdapterHook("cloudflare:accept", peer);
await hooks.callHook("open", peer); await hooks.callHook("open", peer);
// eslint-disable-next-line unicorn/no-null // eslint-disable-next-line unicorn/no-null
return new Response(null, { return new Response(null, {
@@ -56,14 +55,12 @@ export default defineWebSocketAdapter<
}, },
handleDurableMessage: async (obj, ws, message) => { handleDurableMessage: async (obj, ws, message) => {
const peer = CloudflareDurablePeer._restore(obj, ws as CF.WebSocket); const peer = CloudflareDurablePeer._restore(obj, ws as CF.WebSocket);
await hooks.callAdapterHook("cloudflare:message", peer, message);
await hooks.callHook("message", peer, new Message(message, peer)); await hooks.callHook("message", peer, new Message(message, peer));
}, },
handleDurableClose: async (obj, ws, code, reason, wasClean) => { handleDurableClose: async (obj, ws, code, reason, wasClean) => {
const peer = CloudflareDurablePeer._restore(obj, ws as CF.WebSocket); const peer = CloudflareDurablePeer._restore(obj, ws as CF.WebSocket);
peers.delete(peer); peers.delete(peer);
const details = { code, reason, wasClean }; const details = { code, reason, wasClean };
await hooks.callAdapterHook("cloudflare:close", peer, details);
await hooks.callHook("close", peer, details); await hooks.callHook("close", peer, details);
}, },
}; };

View File

@@ -53,10 +53,8 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
}); });
peers.add(peer); peers.add(peer);
server.accept(); server.accept();
hooks.callAdapterHook("cloudflare:accept", peer);
hooks.callHook("open", peer); hooks.callHook("open", peer);
server.addEventListener("message", (event) => { server.addEventListener("message", (event) => {
hooks.callAdapterHook("cloudflare:message", peer, event);
hooks.callHook( hooks.callHook(
"message", "message",
peer, peer,
@@ -65,12 +63,10 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
}); });
server.addEventListener("error", (event) => { server.addEventListener("error", (event) => {
peers.delete(peer); peers.delete(peer);
hooks.callAdapterHook("cloudflare:error", peer, event);
hooks.callHook("error", peer, new WSError(event.error)); hooks.callHook("error", peer, new WSError(event.error));
}); });
server.addEventListener("close", (event) => { server.addEventListener("close", (event) => {
peers.delete(peer); peers.delete(peer);
hooks.callAdapterHook("cloudflare:close", peer, event);
hooks.callHook("close", peer, event); hooks.callHook("close", peer, event);
}); });
// eslint-disable-next-line unicorn/no-null // eslint-disable-next-line unicorn/no-null

View File

@@ -51,21 +51,17 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
}); });
peers.add(peer); peers.add(peer);
upgrade.socket.addEventListener("open", () => { upgrade.socket.addEventListener("open", () => {
hooks.callAdapterHook("deno:open", peer);
hooks.callHook("open", peer); hooks.callHook("open", peer);
}); });
upgrade.socket.addEventListener("message", (event) => { upgrade.socket.addEventListener("message", (event) => {
hooks.callAdapterHook("deno:message", peer, event);
hooks.callHook("message", peer, new Message(event.data, peer, event)); hooks.callHook("message", peer, new Message(event.data, peer, event));
}); });
upgrade.socket.addEventListener("close", () => { upgrade.socket.addEventListener("close", () => {
peers.delete(peer); peers.delete(peer);
hooks.callAdapterHook("deno:close", peer);
hooks.callHook("close", peer, {}); hooks.callHook("close", peer, {});
}); });
upgrade.socket.addEventListener("error", (error) => { upgrade.socket.addEventListener("error", (error) => {
peers.delete(peer); peers.delete(peer);
hooks.callAdapterHook("deno:error", peer, error);
hooks.callHook("error", peer, new WSError(error)); hooks.callHook("error", peer, new WSError(error));
}); });
return upgrade.response; return upgrade.response;

View File

@@ -53,11 +53,8 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
const request = new NodeReqProxy(nodeReq); const request = new NodeReqProxy(nodeReq);
const peer = new NodePeer({ ws, request, peers, nodeReq }); const peer = new NodePeer({ ws, request, peers, nodeReq });
peers.add(peer); peers.add(peer);
hooks.callHook("open", peer); hooks.callHook("open", peer); // ws is already open
ws.on("message", (data: unknown) => {
// Managed socket-level events
ws.on("message", (data: unknown, isBinary: boolean) => {
hooks.callAdapterHook("node:message", peer, data, isBinary);
if (Array.isArray(data)) { if (Array.isArray(data)) {
data = Buffer.concat(data); data = Buffer.concat(data);
} }
@@ -65,37 +62,15 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
}); });
ws.on("error", (error: Error) => { ws.on("error", (error: Error) => {
peers.delete(peer); peers.delete(peer);
hooks.callAdapterHook("node:error", peer, error);
hooks.callHook("error", peer, new WSError(error)); hooks.callHook("error", peer, new WSError(error));
}); });
ws.on("close", (code: number, reason: Buffer) => { ws.on("close", (code: number, reason: Buffer) => {
peers.delete(peer); peers.delete(peer);
hooks.callAdapterHook("node:close", peer, code, reason);
hooks.callHook("close", peer, { hooks.callHook("close", peer, {
code, code,
reason: reason?.toString(), reason: reason?.toString(),
}); });
}); });
ws.on("open", () => {
hooks.callAdapterHook("node:open", peer);
});
// Unmanaged socket-level events
ws.on("ping", (data: Buffer) => {
hooks.callAdapterHook("node:ping", peer, data);
});
ws.on("pong", (data: Buffer) => {
hooks.callAdapterHook("node:pong", peer, data);
});
ws.on(
"unexpected-response",
(req: ClientRequest, res: IncomingMessage) => {
hooks.callAdapterHook("node:unexpected-response", peer, req, res);
},
);
ws.on("upgrade", (req: IncomingMessage) => {
hooks.callAdapterHook("node:upgrade", peer, req);
});
}); });
wss.on("headers", (outgoingHeaders, req) => { wss.on("headers", (outgoingHeaders, req) => {

View File

@@ -54,7 +54,6 @@ export default defineWebSocketAdapter<UWSAdapter, UWSOptions>(
((peer as any)._internal.ws as UwsWebSocketProxy).readyState = ((peer as any)._internal.ws as UwsWebSocketProxy).readyState =
2 /* CLOSING */; 2 /* CLOSING */;
peers.delete(peer); peers.delete(peer);
hooks.callAdapterHook("uws:close", peer, ws, code, message);
hooks.callHook("close", peer, { hooks.callHook("close", peer, {
code, code,
reason: message?.toString(), reason: message?.toString(),
@@ -62,40 +61,15 @@ export default defineWebSocketAdapter<UWSAdapter, UWSOptions>(
((peer as any)._internal.ws as UwsWebSocketProxy).readyState = ((peer as any)._internal.ws as UwsWebSocketProxy).readyState =
3 /* CLOSED */; 3 /* CLOSED */;
}, },
drain(ws) {
const peer = getPeer(ws, peers);
hooks.callAdapterHook("uws:drain", peer, ws);
},
message(ws, message, isBinary) { message(ws, message, isBinary) {
const peer = getPeer(ws, peers); const peer = getPeer(ws, peers);
hooks.callAdapterHook("uws:message", peer, ws, message, isBinary);
hooks.callHook("message", peer, new Message(message, peer)); hooks.callHook("message", peer, new Message(message, peer));
}, },
open(ws) { open(ws) {
const peer = getPeer(ws, peers); const peer = getPeer(ws, peers);
peers.add(peer); peers.add(peer);
hooks.callAdapterHook("uws:open", peer, ws);
hooks.callHook("open", peer); hooks.callHook("open", peer);
}, },
ping(ws, message) {
const peer = getPeer(ws, peers);
hooks.callAdapterHook("uws:ping", peer, ws, message);
},
pong(ws, message) {
const peer = getPeer(ws, peers);
hooks.callAdapterHook("uws:pong", peer, ws, message);
},
subscription(ws, topic, newCount, oldCount) {
const peer = getPeer(ws, peers);
hooks.callAdapterHook(
"uws:subscription",
peer,
ws,
topic,
newCount,
oldCount,
);
},
async upgrade(res, req, context) { async upgrade(res, req, context) {
let aborted = false; let aborted = false;
res.onAborted(() => { res.onAborted(() => {

View File

@@ -39,13 +39,6 @@ export class AdapterHookable {
}, },
) as Promise<any>; ) as Promise<any>;
} }
callAdapterHook<N extends keyof AdapterHooks>(
name: N,
...args: Parameters<AdapterHooks[N]>
): ReturnType<AdapterHooks[N]> {
return this.options.adapterHooks?.[name]?.apply(undefined, args);
}
} }
// --- types --- // --- types ---
@@ -93,48 +86,3 @@ export interface Hooks {
/** An error occurs */ /** An error occurs */
error: (peer: Peer, error: WSError) => MaybePromise<void>; error: (peer: Peer, error: WSError) => MaybePromise<void>;
} }
export interface AdapterHooks extends Record<string, HookFn<any[], any>> {
// Bun
"bun:message": HookFn<[ws: any, message: any]>;
"bun:open": HookFn<[ws: any]>;
"bun:close": HookFn<[ws: any]>;
"bun:drain": HookFn<[]>;
"bun:error": HookFn<[ws: any, error: any]>;
"bun:ping": HookFn<[ws: any, data: any]>;
"bun:pong": HookFn<[ws: any, data: any]>;
// Cloudflare
"cloudflare:accept": HookFn<[]>;
"cloudflare:message": HookFn<[event: any]>;
"cloudflare:error": HookFn<[event: any]>;
"cloudflare:close": HookFn<[event: any]>;
// Deno
"deno:open": HookFn<[]>;
"deno:message": HookFn<[event: any]>;
"deno:close": HookFn<[]>;
"deno:error": HookFn<[error: any]>;
// ws (Node)
"node:open": HookFn<[]>;
"node:message": HookFn<[data: any, isBinary: boolean]>;
"node:close": HookFn<[code: number, reason: Buffer]>;
"node:error": HookFn<[error: any]>;
"node:ping": HookFn<[data: Buffer]>;
"node:pong": HookFn<[data: Buffer]>;
"node:unexpected-response": HookFn<[req: any, res: any]>;
"node:upgrade": HookFn<[req: any]>;
// uws (Node)
"uws:open": HookFn<[ws: any]>;
"uws:message": HookFn<[ws: any, message: any, isBinary: boolean]>;
"uws:close": HookFn<[ws: any, code: number, message: any]>;
"uws:ping": HookFn<[ws: any, message: any]>;
"uws:pong": HookFn<[ws: any, message: any]>;
"uws:drain": HookFn<[ws: any]>;
"uws:upgrade": HookFn<[res: any, req: any, context: any]>;
"uws:subscription": HookFn<
[ws: any, topic: any, newCount: number, oldCount: number]
>;
}

View File

@@ -1,6 +1,6 @@
// Hooks // Hooks
export { defineHooks } from "./hooks.ts"; export { defineHooks } from "./hooks.ts";
export type { Hooks, AdapterHooks, ResolveHooks } from "./hooks.ts"; export type { Hooks, ResolveHooks } from "./hooks.ts";
// Adapter // Adapter
export { defineWebSocketAdapter } from "./adapter.ts"; export { defineWebSocketAdapter } from "./adapter.ts";

View File

@@ -138,7 +138,7 @@ class WebSocketInspector extends Agent {
export function wsTestsExec( export function wsTestsExec(
cmd: string, cmd: string,
opts: Parameters<typeof wsTests>[1], opts: Parameters<typeof wsTests>[1] & { silent?: boolean },
tests = wsTests, tests = wsTests,
) { ) {
let childProc: ExecaRes; let childProc: ExecaRes;
@@ -156,9 +156,11 @@ export function wsTestsExec(
console.error(error); console.error(error);
} }
}); });
if (process.env.TEST_DEBUG || !opts.silent) {
childProc.stderr!.on("data", (chunk) => { childProc.stderr!.on("data", (chunk) => {
console.log(chunk.toString()); console.log(chunk.toString());
}); });
}
if (process.env.TEST_DEBUG) { if (process.env.TEST_DEBUG) {
childProc.stdout!.on("data", (chunk) => { childProc.stdout!.on("data", (chunk) => {
console.log(chunk.toString()); console.log(chunk.toString());

View File

@@ -4,6 +4,6 @@ import { wsTestsExec } from "../_utils";
describe("cloudflare", () => { describe("cloudflare", () => {
wsTestsExec( wsTestsExec(
"wrangler dev -c ./wrangler.toml --inspector-port 0 --port $PORT", "wrangler dev -c ./wrangler.toml --inspector-port 0 --port $PORT",
{ adapter: "cloudflare", pubsub: false }, { adapter: "cloudflare", pubsub: false, silent: true },
); );
}); });