diff --git a/.gitignore b/.gitignore index d306155..289ef81 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ dist .wrangler /adapters +/websocket websocket.d.ts diff --git a/docs/1.guide/3.peer.md b/docs/1.guide/3.peer.md index a52dc66..e594c4c 100644 --- a/docs/1.guide/3.peer.md +++ b/docs/1.guide/3.peer.md @@ -6,38 +6,38 @@ icon: mynaui:api > Peer object allows easily interacting with connected clients. -Websocket [hooks](/guide/hooks) accept a peer instance as their first argument. You can use peer object to get information about each connected client or send a message to them. +When a new client connects to the server, crossws creates a peer instance that allows getting information from clients and sending messages to them. -> [!TIP] -> You can safely log a peer instance to the console using `console.log` it will be automatically stringified with useful information including the remote address and connection status! - -## Properties - -### `peer.url` - -Request http url during upgrade. You can use it to do actions based on path and search params. - -### `peer.headers` - -Request http headers during upgrade. Youb can use it to do authentication and access upgrade headers. - -### `peer.addr` - -The IP address of the client. +## Instance properties ### `peer.id` -A unique id assigned to the peer. +Unique random identifier ([uuid v4](https://developer.mozilla.org/en-US/docs/Glossary/UUID)) for the peer. -### `peer.readyState` +### `peer.request?` -Client connection status (might be `undefined`) +Access to the upgrade request info. You can use it to do authentication and access users headers and cookies. -:read-more{to="https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState" title="readyState in MDN"} +> [!NOTE] +> This property is compatible with web [Request](https://developer.mozilla.org/en-US/docs/Web/API/Request) interface, However interface is emulated for Node.js and sometimes unavailable. Refer to the [compatibility table](#compatibility) for more info. -## Methods +### `peer.remoteAddress?` -### `peer.send(message, compress)` +The IP address of the client. + +> [!NOTE] +> Not all adapters provide this. Refer to the [compatibility table](#compatibility) for more info. + +### `peer.websocket` + +Direct access to the [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) instance. + +> [!NOTE] +> WebSocket properties vary across runtimes. When accessing `peer.websocket`, a lightweight proxy increases stablity. Refer to the [compatibility table](#compatibility) for more info. + +## Instance methods + +### `peer.send(message, { compress? })` Send a message to the connected client. @@ -79,3 +79,44 @@ To close the connection abruptly, use `peer.terminate()`. Abruptly close the connection. To gracefully close the connection, use `peer.close()`. + +## Compatibility + +| | [Bun][bun] | [Cloudflare][cfw] | [Cloudflare (durable)][cfd] | [Deno][deno] | [Node (ws)][nodews] | [Node (μWebSockets)][nodeuws] | [SSE][sse] | +| --------------------------- | ---------- | ----------------- | --------------------------- | ------------ | ------------------- | ----------------------------- | ---------- | +| `send()` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +| `publish()` / `subscribe()` | ✓ | ⨉ | ✓ [^1] | ✓ [^1] | ✓ [^1] | ✓ | ✓ [^1] | +| `close()` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +| `terminate()` | ✓ | ✓ [^2] | ✓ | ✓ | ✓ | ✓ | ✓ [^2] | +| `request` | ✓ | ✓ | ✓ [^3] | ✓ | ✓ [^3] | ✓ [^3] | ✓ | +| `remoteAddress` | ✓ | ⨉ | ⨉ | ✓ | ✓ | ✓ | ⨉ | +| `websocket.url` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +| `websocket.extensions` | ✓ [^4] | ⨉ | ⨉ | ✓ [^4] | ✓ [^4] | ✓ [^4] | ⨉ | +| `websocket.protocol` | ✓ [^5] | ✓ [^5] | ✓ [^5] | [^5] ✓ | ✓ [^5] | ✓ [^5] | ⨉ | +| `websocket.readyState` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ [^6] | ✓ [^6] | +| `websocket.binaryType` | ✓ [^7] | ⨉ | ⨉ | ✓ | ✓ [^7] | ✓ | ⨉ | +| `websocket.bufferedAmount` | ⨉ | ⨉ | ⨉ | ✓ | ✓ | ✓ | ⨉ | + +[bun]: /adapters/bun +[cfw]: /adapters/cloudflare +[cfd]: /adapters/cloudflare#durable-objects +[deno]: /adapters/deno +[nodews]: /adapters/node +[nodeuws]: /adapters/node#uwebsockets +[sse]: adapters/sse + +[^1]: pubsub is not natively handled by runtime. peers are internally tracked. + +[^2]: `close()` will be used for compatibility. + +[^1]: using a proxy for [Request](https://developer.mozilla.org/en-US/docs/Web/API/Request) compatible interface (`url`, `headers` only) wrapping Node.js requests. + +[^3]: `request` is not always available (only in `open` hook). + +[^4]: [`websocket.extensions`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/extensions) is polyfilled using [`sec-websocket-extensions`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism#websocket-specific_headers) request header. + +[^5]: [`websocket.protocol`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/protocol) is polyfilled using [`sec-websocket-protocol`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism#websocket-specific_headers) request header. + +[^6]: [`websocket.readyState`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState) is polyfilled by tracking open/close events. + +[^7]: Some runtimes have non standard values including `"nodebuffer"` and `"uint8array"`. crossws auto converts them for [`message.data`](/guide/message#messagedata). diff --git a/docs/1.guide/4.message.md b/docs/1.guide/4.message.md index 058a6ab..736d64a 100644 --- a/docs/1.guide/4.message.md +++ b/docs/1.guide/4.message.md @@ -4,21 +4,73 @@ icon: solar:letter-line-duotone # Message -On `message` [hook](/guide/hooks), you receive a message object containing an incoming message from the client. +On `message` [hook](/guide/hooks), you receive a message object containing data from the client. -> [!TIP] -> You can safely log `message` object to the console using `console.log` it will be automatically stringified! +> [!NOTE] +> Message object is API-compatible with standard Websocket [`MessageEvent`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event) with convenient superset of utils. -## API +## Instance properties -### `message.text()` +### `message.id` -Get stringified text version of the message +Unique random identifier ([uuid v4](https://developer.mozilla.org/en-US/docs/Glossary/UUID)) for the message. + +### `message.event` + +Access to the original [message event](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event) if available. + +### `message.peer` + +Access to the [peer instance](/guide/peer) that emitted the message. ### `message.rawData` -Raw message data +Raw message data (can be of any type). -### `message.isBinary` +### `message.data` -Indicates if the message is binary (might be `undefined`) +Message data (value varies based on [`peer.binaryType`](/guide/peer#peerbinarytype)). + +## Instance methods + +### `message.text()` + +Get stringified text version of the message. + +If raw data is in any other format, it will be automatically converted or decoded. + +### `message.json()` + +Get parsed version of the message text with [`JSON.parse()`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/parse). + +### `message.uint8Array()` + +Get data as [`Uint8Array`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array) value. + +If raw data is in any other format or string, it will be automatically converted or encoded. + +### `message.arrayBuffer()` + +Get data as [`ArrayBuffer`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/ArrayBuffer) or [`SharedArrayBuffer`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer) value. + +If raw data is in any other format or string, it will be automatically converted or encoded. + +### `message.blob()` + +Get data as [`Blob`](https://developer.mozilla.org/en-US/docs/Web/API/Blob) value. + +If raw data is in any other format or string, it will be automatically converted or encoded. + +## Adapter support + +| | [Bun][bun] | [Cloudflare][cfw] | [Cloudflare (durable)][cfd] | [Deno][deno] | [Node (ws)][nodews] | [Node (μWebSockets)][nodeuws] | [SSE][sse] | +| ------- | ---------- | ----------------- | --------------------------- | ------------ | ------------------- | ----------------------------- | ---------- | +| `event` | ⨉ | ✓ | ⨉ | ✓ | ⨉ | ⨉ | ⨉ | + +[bun]: /adapters/bun +[cfw]: /adapters/cloudflare +[cfd]: /adapters/cloudflare#durable-objects +[deno]: /adapters/deno +[nodews]: /adapters/node +[nodeuws]: /adapters/node#uwebsockets +[sse]: adapters/sse diff --git a/src/adapters/bun.ts b/src/adapters/bun.ts index b36624a..6a84bff 100644 --- a/src/adapters/bun.ts +++ b/src/adapters/bun.ts @@ -1,5 +1,4 @@ import type { WebSocketHandler, ServerWebSocket, Server } from "bun"; - import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import { toBufferLike } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; @@ -17,9 +16,8 @@ export interface BunAdapter extends AdapterInstance { export interface BunOptions extends AdapterOptions {} type ContextData = { - _peer?: BunPeer; - request?: Request; - requestUrl?: string; + peer?: BunPeer; + request: Request; server?: Server; }; @@ -41,7 +39,6 @@ export default defineWebSocketAdapter( data: { server, request, - requestUrl: request.url, } satisfies ContextData, headers: res?.headers, }); @@ -52,7 +49,7 @@ export default defineWebSocketAdapter( websocket: { message: (ws, message) => { const peer = getPeer(ws, peers); - hooks.callHook("message", peer, new Message(message)); + hooks.callHook("message", peer, new Message(message, peer)); }, open: (ws) => { const peer = getPeer(ws, peers); @@ -89,66 +86,51 @@ function getPeer( ws: ServerWebSocket, peers: Set, ): BunPeer { - if (ws.data?._peer) { - return ws.data._peer; + if (ws.data?.peer) { + return ws.data.peer; } - const peer = new BunPeer({ peers, bun: { ws } }); + const peer = new BunPeer({ ws, request: ws.data.request, peers }); ws.data = { ...ws.data, - _peer: peer, + peer, }; return peer; } class BunPeer extends Peer<{ + ws: ServerWebSocket; + request: Request; peers: Set; - bun: { ws: ServerWebSocket }; }> { - get addr() { - let addr = this._internal.bun.ws.remoteAddress; - if (addr.includes(":")) { - addr = `[${addr}]`; - } - return addr; + get remoteAddress() { + return this._internal.ws.remoteAddress; } - get readyState() { - return this._internal.bun.ws.readyState as any; + send(data: unknown, options?: { compress?: boolean }) { + return this._internal.ws.send(toBufferLike(data), options?.compress); } - get url() { - return this._internal.bun.ws.data.requestUrl || "/"; - } - - get headers() { - return this._internal.bun.ws.data.request?.headers; - } - - send(message: any, options?: { compress?: boolean }) { - return this._internal.bun.ws.send(toBufferLike(message), options?.compress); - } - - publish(topic: string, message: any, options?: { compress?: boolean }) { - return this._internal.bun.ws.publish( + publish(topic: string, data: unknown, options?: { compress?: boolean }) { + return this._internal.ws.publish( topic, - toBufferLike(message), + toBufferLike(data), options?.compress, ); } subscribe(topic: string): void { - this._internal.bun.ws.subscribe(topic); + this._internal.ws.subscribe(topic); } unsubscribe(topic: string): void { - this._internal.bun.ws.unsubscribe(topic); + this._internal.ws.unsubscribe(topic); } close(code?: number, reason?: string) { - this._internal.bun.ws.close(code, reason); + this._internal.ws.close(code, reason); } terminate() { - this._internal.bun.ws.terminate(); + this._internal.ws.terminate(); } } diff --git a/src/adapters/cloudflare-durable.ts b/src/adapters/cloudflare-durable.ts index b3741b7..8c1e471 100644 --- a/src/adapters/cloudflare-durable.ts +++ b/src/adapters/cloudflare-durable.ts @@ -1,4 +1,5 @@ import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; +import type * as web from "../../types/web.ts"; import { toBufferLike } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; import { AdapterHookable } from "../hooks.ts"; @@ -38,13 +39,13 @@ export interface CloudflareDurableAdapter extends AdapterInstance { handleDurableMessage( obj: DurableObject, - ws: WebSocket | CF.WebSocket, + ws: WebSocket | CF.WebSocket | web.WebSocket, message: ArrayBuffer | string, ): Promise; handleDurableClose( obj: DurableObject, - ws: WebSocket | CF.WebSocket, + ws: WebSocket | CF.WebSocket | web.WebSocket, code: number, reason: string, wasClean: boolean, @@ -102,7 +103,7 @@ export default defineWebSocketAdapter< handleDurableMessage: async (obj, ws, message) => { const peer = peerFromDurableEvent(obj, ws as CF.WebSocket); hooks.callAdapterHook("cloudflare:message", peer, message); - hooks.callHook("message", peer, new Message(message)); + hooks.callHook("message", peer, new Message(message, peer)); }, handleDurableClose: async (obj, ws, code, reason, wasClean) => { const peer = peerFromDurableEvent(obj, ws as CF.WebSocket); @@ -124,12 +125,10 @@ function peerFromDurableEvent( return peer; } peer = ws._crosswsPeer = new CloudflareDurablePeer({ - cloudflare: { - ws: ws as CF.WebSocket, - request, - env: (obj as DurableObjectPub).env, - context: (obj as DurableObjectPub).ctx, - }, + ws: ws as CF.WebSocket, + request: request as Request, + cfEnv: (obj as DurableObjectPub).env, + cfCtx: (obj as DurableObjectPub).ctx, }); return peer; } @@ -137,60 +136,24 @@ function peerFromDurableEvent( // --- peer --- class CloudflareDurablePeer extends Peer<{ + ws: AugmentedWebSocket; + request?: Request; peers?: never; - cloudflare: { - ws: AugmentedWebSocket; - request?: Request | CF.Request; - env: unknown; - context: DurableObject["ctx"]; - }; + cfEnv: unknown; + cfCtx: DurableObject["ctx"]; }> { - get url() { - return ( - this._internal.cloudflare.request?.url || - this._internal.cloudflare.ws.url || - "" - ); - } - - get headers() { - return this._internal.cloudflare.request?.headers as Headers; - } - - get readyState() { - return this._internal.cloudflare.ws.readyState as -1 | 0 | 1 | 2 | 3; - } - - send(message: any) { - this._internal.cloudflare.ws.send(toBufferLike(message)); - return 0; - } - - subscribe(topic: string): void { - super.subscribe(topic); - const state: CrosswsState = { - // Max limit: 2,048 bytes - ...(this._internal.cloudflare.ws.deserializeAttachment() as CrosswsState), - topics: this._topics, - }; - this._internal.cloudflare.ws._crosswsState = state; - this._internal.cloudflare.ws.serializeAttachment(state); - } - get peers() { const clients = - this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[]; + this._internal.cfCtx.getWebSockets() as unknown as (typeof this._internal.ws)[]; return new Set( clients.map((client) => { let peer = client._crosswsPeer; if (!peer) { peer = client._crosswsPeer = new CloudflareDurablePeer({ - cloudflare: { - ws: client, - request: undefined, - env: this._internal.cloudflare.env, - context: this._internal.cloudflare.context, - }, + ws: client, + request: undefined, + cfEnv: this._internal.cfEnv, + cfCtx: this._internal.cfCtx, }); } return peer; @@ -198,14 +161,29 @@ class CloudflareDurablePeer extends Peer<{ ); } - publish(topic: string, message: any): void { + send(data: unknown) { + return this._internal.ws.send(toBufferLike(data)); + } + + subscribe(topic: string): void { + super.subscribe(topic); + const state: CrosswsState = { + // Max limit: 2,048 bytes + ...(this._internal.ws.deserializeAttachment() as CrosswsState), + topics: this._topics, + }; + this._internal.ws._crosswsState = state; + this._internal.ws.serializeAttachment(state); + } + + publish(topic: string, data: unknown): void { const clients = ( - this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[] - ).filter((c) => c !== this._internal.cloudflare.ws); + this._internal.cfCtx.getWebSockets() as unknown as (typeof this._internal.ws)[] + ).filter((c) => c !== this._internal.ws); if (clients.length === 0) { return; } - const data = toBufferLike(message); + const dataBuff = toBufferLike(data); for (const client of clients) { let state = client._crosswsState; if (!state) { @@ -213,16 +191,12 @@ class CloudflareDurablePeer extends Peer<{ client.deserializeAttachment() as CrosswsState; } if (state.topics?.has(topic)) { - client.send(data); + client.send(dataBuff); } } } close(code?: number, reason?: string) { - this._internal.cloudflare.ws.close(code, reason); - } - - terminate(): void { - this.close(); + this._internal.ws.close(code, reason); } } diff --git a/src/adapters/cloudflare.ts b/src/adapters/cloudflare.ts index acf8146..60f2f50 100644 --- a/src/adapters/cloudflare.ts +++ b/src/adapters/cloudflare.ts @@ -44,8 +44,12 @@ export default defineWebSocketAdapter( const client = pair[0]; const server = pair[1]; const peer = new CloudflarePeer({ + ws: client, peers, - cloudflare: { client, server, request, env, context }, + wsServer: server, + request: request as unknown as Request, + cfEnv: env, + cfCtx: context, }); peers.add(peer); server.accept(); @@ -53,7 +57,11 @@ export default defineWebSocketAdapter( hooks.callHook("open", peer); server.addEventListener("message", (event) => { hooks.callAdapterHook("cloudflare:message", peer, event); - hooks.callHook("message", peer, new Message(event.data)); + hooks.callHook( + "message", + peer, + new Message(event.data, peer, event as MessageEvent), + ); }); server.addEventListener("error", (event) => { peers.delete(peer); @@ -79,51 +87,23 @@ export default defineWebSocketAdapter( // --- peer --- class CloudflarePeer extends Peer<{ + ws: _cf.WebSocket; + request: Request; peers: Set; - cloudflare: { - client: _cf.WebSocket; - server: _cf.WebSocket; - request: _cf.Request; - env: unknown; - context: _cf.ExecutionContext; - }; + wsServer: _cf.WebSocket; + cfEnv: unknown; + cfCtx: _cf.ExecutionContext; }> { - get addr() { - return undefined; - } - - get url() { - return this._internal.cloudflare.request.url; - } - - get headers() { - return this._internal.cloudflare.request.headers as unknown as Headers; - } - - get readyState() { - return this._internal.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3; - } - - send(message: any) { - this._internal.cloudflare.server.send(toBufferLike(message)); + send(data: unknown) { + this._internal.wsServer.send(toBufferLike(data)); return 0; } publish(_topic: string, _message: any): void { - // Not supported - // Throws: A hanging Promise was canceled - // for (const peer of this._internal.peers) { - // if (peer !== this && peer._topics.has(_topic)) { - // peer.publish(_topic, _message); - // } - // } + // not supported } close(code?: number, reason?: string) { - this._internal.cloudflare.client.close(code, reason); - } - - terminate(): void { - this.close(); + this._internal.ws.close(code, reason); } } diff --git a/src/adapters/deno.ts b/src/adapters/deno.ts index ace78d1..7ae94c0 100644 --- a/src/adapters/deno.ts +++ b/src/adapters/deno.ts @@ -19,7 +19,9 @@ declare global { } type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade; -type ServeHandlerInfo = unknown; // TODO +type ServeHandlerInfo = { + remoteAddr?: { transport: string; hostname: string; port: number }; +}; // --- adapter --- @@ -42,8 +44,10 @@ export default defineWebSocketAdapter( headers: res?.headers, }); const peer = new DenoPeer({ + ws: upgrade.socket, + request, peers, - deno: { ws: upgrade.socket, request, info }, + denoInfo: info, }); peers.add(peer); upgrade.socket.addEventListener("open", () => { @@ -52,7 +56,7 @@ export default defineWebSocketAdapter( }); upgrade.socket.addEventListener("message", (event) => { hooks.callAdapterHook("deno:message", peer, event); - hooks.callHook("message", peer, new Message(event.data)); + hooks.callHook("message", peer, new Message(event.data, peer, event)); }); upgrade.socket.addEventListener("close", () => { peers.delete(peer); @@ -73,50 +77,33 @@ export default defineWebSocketAdapter( // --- peer --- class DenoPeer extends Peer<{ + ws: WebSocketUpgrade["socket"]; + request: Request; peers: Set; - deno: { - ws: WebSocketUpgrade["socket"]; - request: Request; - info: ServeHandlerInfo; - }; + denoInfo: ServeHandlerInfo; }> { - get addr() { - // @ts-expect-error types missing - return this._internal.deno.ws.remoteAddress; + get remoteAddress() { + return this._internal.denoInfo.remoteAddr?.hostname; } - get readyState() { - return this._internal.deno.ws.readyState as -1 | 0 | 1 | 2 | 3; + send(data: unknown) { + return this._internal.ws.send(toBufferLike(data)); } - get url() { - return this._internal.deno.request.url; - } - - get headers() { - return this._internal.deno.request.headers || new Headers(); - } - - send(message: any) { - this._internal.deno.ws.send(toBufferLike(message)); - return 0; - } - - publish(topic: string, message: any) { - const data = toBufferLike(message); + publish(topic: string, data: unknown) { + const dataBuff = toBufferLike(data); for (const peer of this._internal.peers) { if (peer !== this && peer._topics.has(topic)) { - peer._internal.deno.ws.send(data); + peer._internal.ws.send(dataBuff); } } } close(code?: number, reason?: string) { - this._internal.deno.ws.close(code, reason); + this._internal.ws.close(code, reason); } terminate(): void { - // @ts-ignore (terminate is Deno-only api) - this._internal.deno.ws.terminate(); + (this._internal.ws as any).terminate(); } } diff --git a/src/adapters/node.ts b/src/adapters/node.ts index 9c9bd65..4c7e2af 100644 --- a/src/adapters/node.ts +++ b/src/adapters/node.ts @@ -1,4 +1,5 @@ import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; +import type { WebSocket } from "../../types/web.ts"; import { toBufferLike } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; import { AdapterHookable } from "../hooks.ts"; @@ -11,14 +12,16 @@ import type { Duplex } from "node:stream"; import { WebSocketServer as _WebSocketServer } from "ws"; import type { ServerOptions, - RawData, WebSocketServer, WebSocket as WebSocketT, } from "../../types/ws"; // --- types --- -type AugmentedReq = IncomingMessage & { _upgradeHeaders?: HeadersInit }; +type AugmentedReq = IncomingMessage & { + _request: NodeReqProxy; + _upgradeHeaders?: HeadersInit; +}; export interface NodeAdapter extends AdapterInstance { handleUpgrade(req: IncomingMessage, socket: Duplex, head: Buffer): void; @@ -46,18 +49,19 @@ export default defineWebSocketAdapter( ...(options.serverOptions as any), }) as WebSocketServer); - wss.on("connection", (ws, req) => { - const peer = new NodePeer({ peers, node: { ws, req, server: wss } }); + wss.on("connection", (ws, nodeReq) => { + const request = new NodeReqProxy(nodeReq); + const peer = new NodePeer({ ws, request, peers, nodeReq }); peers.add(peer); hooks.callHook("open", peer); // Managed socket-level events - ws.on("message", (data: RawData, isBinary: boolean) => { + ws.on("message", (data: unknown, isBinary: boolean) => { hooks.callAdapterHook("node:message", peer, data, isBinary); if (Array.isArray(data)) { data = Buffer.concat(data); } - hooks.callHook("message", peer, new Message(data, isBinary)); + hooks.callHook("message", peer, new Message(data, peer)); }); ws.on("error", (error: Error) => { peers.delete(peer); @@ -94,7 +98,7 @@ export default defineWebSocketAdapter( }); }); - wss.on("headers", function (outgoingHeaders, req) { + wss.on("headers", (outgoingHeaders, req) => { const upgradeHeaders = (req as AugmentedReq)._upgradeHeaders; if (upgradeHeaders) { for (const [key, value] of new Headers(upgradeHeaders)) { @@ -105,14 +109,16 @@ export default defineWebSocketAdapter( return { ...adapterUtils(peers), - handleUpgrade: async (req, socket, head) => { - const res = await hooks.callHook("upgrade", new NodeReqProxy(req)); + handleUpgrade: async (nodeReq, socket, head) => { + const request = new NodeReqProxy(nodeReq); + const res = await hooks.callHook("upgrade", request); if (res instanceof Response) { return sendResponse(socket, res); } - (req as AugmentedReq)._upgradeHeaders = res?.headers; - wss.handleUpgrade(req, socket, head, (ws) => { - wss.emit("connection", ws, req); + (nodeReq as AugmentedReq)._request = request; + (nodeReq as AugmentedReq)._upgradeHeaders = res?.headers; + wss.handleUpgrade(nodeReq, socket, head, (ws) => { + wss.emit("connection", ws, nodeReq); }); }, closeAll: (code, data) => { @@ -128,49 +134,18 @@ export default defineWebSocketAdapter( class NodePeer extends Peer<{ peers: Set; - node: { - server: WebSocketServer; - req: IncomingMessage; - ws: WebSocketT & { _peer?: NodePeer }; - }; + request: NodeReqProxy; + nodeReq: IncomingMessage; + ws: WebSocketT & { _peer?: NodePeer }; }> { - _req: NodeReqProxy; - constructor(ctx: NodePeer["_internal"]) { - super(ctx); - this._req = new NodeReqProxy(ctx.node.req); - ctx.node.ws._peer = this; + get remoteAddress() { + return this._internal.nodeReq.socket?.remoteAddress; } - get addr() { - const socket = this._internal.node.req.socket; - if (!socket) { - return undefined; - } - const headers = this._internal.node.req.headers; - let addr = headers["x-forwarded-for"] || socket.remoteAddress || "??"; - if (addr.includes(":")) { - addr = `[${addr}]`; - } - const port = headers["x-forwarded-port"] || socket.remotePort || "??"; - return `${addr}:${port}`; - } - - get url() { - return this._req.url; - } - - get headers() { - return this._req.headers; - } - - get readyState() { - return this._internal.node.ws.readyState; - } - - send(message: any, options?: { compress?: boolean }) { - const data = toBufferLike(message); + send(data: unknown, options?: { compress?: boolean }) { + const dataBuff = toBufferLike(data); const isBinary = typeof data !== "string"; - this._internal.node.ws.send(data, { + this._internal.ws.send(dataBuff, { compress: options?.compress, binary: isBinary, ...options, @@ -178,8 +153,12 @@ class NodePeer extends Peer<{ return 0; } - publish(topic: string, message: any, options?: { compress?: boolean }): void { - const data = toBufferLike(message); + publish( + topic: string, + data: unknown, + options?: { compress?: boolean }, + ): void { + const dataBuff = toBufferLike(data); const isBinary = typeof data !== "string"; const sendOptions = { compress: options?.compress, @@ -188,17 +167,17 @@ class NodePeer extends Peer<{ }; for (const peer of this._internal.peers) { if (peer !== this && peer._topics.has(topic)) { - peer._internal.node.ws.send(data, sendOptions); + peer._internal.ws.send(dataBuff, sendOptions); } } } close(code?: number, data?: string | Buffer) { - this._internal.node.ws.close(code, data); + this._internal.ws.close(code, data); } terminate() { - this._internal.node.ws.terminate(); + this._internal.ws.terminate(); } } diff --git a/src/adapters/sse.ts b/src/adapters/sse.ts index 70fcae9..c443f3e 100644 --- a/src/adapters/sse.ts +++ b/src/adapters/sse.ts @@ -1,4 +1,5 @@ import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; +import type * as web from "../../types/web.ts"; import { toString } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; import { AdapterHookable } from "../hooks.ts"; @@ -43,7 +44,7 @@ export default defineWebSocketAdapter((opts = {}) => { const stream = request.body.pipeThrough(new TextDecoderStream()); try { for await (const chunk of stream) { - hooks.callHook("message", peer, new Message(chunk)); + hooks.callHook("message", peer, new Message(chunk, peer)); } } catch { await stream.cancel().catch(() => {}); @@ -52,18 +53,13 @@ export default defineWebSocketAdapter((opts = {}) => { return new Response(null, {}); } else { // Add a new peer + const ws = new SSEWebSocketStub(); peer = new SSEPeer({ peers, - sse: { - request, - hooks, - onClose: () => { - peers.delete(peer); - if (opts.bidir) { - peersMap!.delete(peer.id); - } - }, - }, + peersMap, + request, + hooks, + ws, }); peers.add(peer); if (opts.bidir) { @@ -96,37 +92,36 @@ export default defineWebSocketAdapter((opts = {}) => { class SSEPeer extends Peer<{ peers: Set; - sse: { - request: Request; - hooks: AdapterHookable; - onClose: (peer: SSEPeer) => void; - }; + peersMap?: Map; + request: Request; + ws: SSEWebSocketStub; + hooks: AdapterHookable; }> { - _sseStream: ReadableStream; + _sseStream: ReadableStream; // server -> client _sseStreamController?: ReadableStreamDefaultController; - constructor(internal: SSEPeer["_internal"]) { - super(internal); + constructor(_internal: SSEPeer["_internal"]) { + super(_internal); + _internal.ws.readyState = 0 /* CONNECTING */; this._sseStream = new ReadableStream({ start: (controller) => { + _internal.ws.readyState = 1 /* OPEN */; this._sseStreamController = controller; - this._internal.sse.hooks.callHook("open", this); + _internal.hooks.callHook("open", this); }, cancel: () => { - this._internal.sse.onClose(this); - this._internal.sse.hooks.callHook("close", this); + _internal.ws.readyState = 2 /* CLOSING */; + _internal.peers.delete(this); + _internal.peersMap?.delete(this.id); + Promise.resolve(this._internal.hooks.callHook("close", this)).finally( + () => { + _internal.ws.readyState = 3 /* CLOSED */; + }, + ); }, }).pipeThrough(new TextEncoderStream()); } - get url() { - return this._internal.sse.request.url; - } - - get headers() { - return this._internal.sse.request.headers; - } - _sendEvent(event: string, data: string) { const lines = data.split("\n"); this._sseStreamController?.enqueue( @@ -134,16 +129,16 @@ class SSEPeer extends Peer<{ ); } - send(message: any) { - this._sendEvent("message", toString(message)); + send(data: unknown) { + this._sendEvent("message", toString(data)); return 0; } - publish(topic: string, message: any) { - const data = toString(message); + publish(topic: string, data: unknown) { + const dataBuff = toString(data); for (const peer of this._internal.peers) { if (peer !== this && peer._topics.has(topic)) { - peer._sendEvent("message", data); + peer._sendEvent("message", dataBuff); } } } @@ -151,8 +146,10 @@ class SSEPeer extends Peer<{ close() { this._sseStreamController?.close(); } - - terminate() { - this.close(); - } +} + +// --- web compat --- + +class SSEWebSocketStub implements Partial { + readyState?: number | undefined; } diff --git a/src/adapters/uws.ts b/src/adapters/uws.ts index 603050e..6f835c9 100644 --- a/src/adapters/uws.ts +++ b/src/adapters/uws.ts @@ -1,28 +1,23 @@ import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; +import type { WebSocket } from "../../types/web.ts"; +import type uws from "uWebSockets.js"; import { toBufferLike } from "../utils.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; import { AdapterHookable } from "../hooks.ts"; import { Message } from "../message.ts"; import { Peer } from "../peer.ts"; -import type { - WebSocketBehavior, - WebSocket, - HttpRequest, - HttpResponse, - RecognizedString, -} from "uWebSockets.js"; - // --- types --- type UserData = { - _peer?: any; - req: HttpRequest; - res: HttpResponse; - context: any; + peer?: UWSPeer; + req: uws.HttpRequest; + res: uws.HttpResponse; + protocol: string; + extensions: string; }; -type WebSocketHandler = WebSocketBehavior; +type WebSocketHandler = uws.WebSocketBehavior; export interface UWSAdapter extends AdapterInstance { websocket: WebSocketHandler; @@ -30,7 +25,7 @@ export interface UWSAdapter extends AdapterInstance { export interface UWSOptions extends AdapterOptions { uws?: Exclude< - WebSocketBehavior, + uws.WebSocketBehavior, | "close" | "drain" | "message" @@ -56,12 +51,16 @@ export default defineWebSocketAdapter( ...options.uws, close(ws, code, message) { const peer = getPeer(ws, peers); + ((peer as any)._internal.ws as UwsWebSocketProxy).readyState = + 2 /* CLOSING */; peers.delete(peer); hooks.callAdapterHook("uws:close", peer, ws, code, message); hooks.callHook("close", peer, { code, reason: message?.toString(), }); + ((peer as any)._internal.ws as UwsWebSocketProxy).readyState = + 3 /* CLOSED */; }, drain(ws) { const peer = getPeer(ws, peers); @@ -70,8 +69,7 @@ export default defineWebSocketAdapter( message(ws, message, isBinary) { const peer = getPeer(ws, peers); hooks.callAdapterHook("uws:message", peer, ws, message, isBinary); - const msg = new Message(message, isBinary); - hooks.callHook("message", peer, msg); + hooks.callHook("message", peer, new Message(message, peer)); }, open(ws) { const peer = getPeer(ws, peers); @@ -131,16 +129,21 @@ export default defineWebSocketAdapter( res.writeHeader(key, value); } } + res.cork(() => { + const key = req.getHeader("sec-websocket-key"); + const protocol = req.getHeader("sec-websocket-protocol"); + const extensions = req.getHeader("sec-websocket-extensions"); res.upgrade( { req, res, - context, + protocol, + extensions, }, - req.getHeader("sec-websocket-key"), - req.getHeader("sec-websocket-protocol"), - req.getHeader("sec-websocket-extensions"), + key, + protocol, + extensions, context, ); }); @@ -152,73 +155,63 @@ export default defineWebSocketAdapter( // --- peer --- -function getPeer(ws: WebSocket, peers: Set): UWSPeer { - const userData = ws.getUserData(); - if (userData._peer) { - return userData._peer as UWSPeer; +function getPeer(uws: uws.WebSocket, peers: Set): UWSPeer { + const uwsData = uws.getUserData(); + if (uwsData.peer) { + return uwsData.peer; } - const peer = new UWSPeer({ peers, uws: { ws, userData } }); - userData._peer = peer; + const peer = new UWSPeer({ + peers, + uws, + ws: new UwsWebSocketProxy(uws), + request: new UWSReqProxy(uwsData.req), + uwsData, + }); + uwsData.peer = peer; return peer; } class UWSPeer extends Peer<{ peers: Set; - uws: { - ws: WebSocket; - userData: UserData; - }; + request: UWSReqProxy; + uws: uws.WebSocket; + ws: UwsWebSocketProxy; + uwsData: UserData; }> { - _decoder = new TextDecoder(); - _req: UWSReqProxy; - - constructor(ctx: UWSPeer["_internal"]) { - super(ctx); - this._req = new UWSReqProxy(ctx.uws.userData.req); - } - - get addr() { + get remoteAddress() { try { - const addr = this._decoder.decode( - this._internal.uws.ws?.getRemoteAddressAsText(), + const addr = new TextDecoder().decode( + this._internal.uws.getRemoteAddressAsText(), ); - return addr.replace(/(0000:)+/, ""); + return addr; } catch { // Error: Invalid access of closed uWS.WebSocket/SSLWebSocket. } } - get url() { - return this._req.url; - } - - get headers() { - return this._req.headers; - } - - send(message: any, options?: { compress?: boolean }) { - const data = toBufferLike(message); + send(data: unknown, options?: { compress?: boolean }) { + const dataBuff = toBufferLike(data); const isBinary = typeof data !== "string"; - return this._internal.uws.ws.send(data, isBinary, options?.compress); + return this._internal.uws.send(dataBuff, isBinary, options?.compress); } subscribe(topic: string): void { - this._internal.uws.ws.subscribe(topic); + this._internal.uws.subscribe(topic); } publish(topic: string, message: string, options?: { compress?: boolean }) { const data = toBufferLike(message); const isBinary = typeof data !== "string"; - this._internal.uws.ws.publish(topic, data, isBinary, options?.compress); + this._internal.uws.publish(topic, data, isBinary, options?.compress); return 0; } - close(code?: number, reason?: RecognizedString) { - this._internal.uws.ws.end(code, reason); + close(code?: number, reason?: uws.RecognizedString) { + this._internal.uws.end(code, reason); } terminate(): void { - this._internal.uws.ws.close(); + this._internal.uws.close(); } } @@ -227,11 +220,10 @@ class UWSPeer extends Peer<{ class UWSReqProxy { private _headers?: Headers; private _rawHeaders: [string, string][] = []; + url: string; - constructor(_req: HttpRequest) { - // We need to precompute values since uws doesn't provide them after handler. - + constructor(_req: uws.HttpRequest) { // Headers let host = "localhost"; let proto = "http"; @@ -244,7 +236,6 @@ class UWSReqProxy { } this._rawHeaders.push([key, value]); }); - // URL const query = _req.getQuery(); const pathname = _req.getUrl(); @@ -258,3 +249,21 @@ class UWSReqProxy { return this._headers; } } + +class UwsWebSocketProxy implements Partial { + readyState?: number = 1 /* OPEN */; + + constructor(private _uws: uws.WebSocket) {} + + get bufferedAmount() { + return this._uws?.getBufferedAmount(); + } + + get protocol() { + return this._uws?.getUserData().protocol; + } + + get extensions() { + return this._uws?.getUserData().extensions; + } +} diff --git a/src/message.ts b/src/message.ts index 4dbe3d9..019f067 100644 --- a/src/message.ts +++ b/src/message.ts @@ -1,27 +1,202 @@ -import { toBufferLike } from "./utils.ts"; +import type { Peer } from "./peer.ts"; +import { randomUUID } from "uncrypto"; -export class Message { - constructor( - public readonly rawData: any, - public readonly isBinary?: boolean, - ) {} +export class Message implements Partial { + /** Access to the original [message event](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event) if available. */ + readonly event?: MessageEvent; - text(): string { - if (typeof this.rawData === "string") { - return this.rawData; - } - const buff = toBufferLike(this.rawData); - if (typeof buff === "string") { - return buff; - } - return new TextDecoder().decode(buff); + /** Access to the Peer that emitted the message. */ + readonly peer?: Peer; + + /** Raw message data (can be of any type). */ + readonly rawData: unknown; + + #id?: string; + #uint8Array?: Uint8Array; + #arrayBuffer?: ArrayBuffer | SharedArrayBuffer; + #blob?: Blob; + #text?: string; + #json?: unknown; + + constructor(rawData: unknown, peer: Peer, event?: MessageEvent) { + this.rawData = rawData || ""; + this.peer = peer; + this.event = event; } + /** + * Unique random [uuid v4](https://developer.mozilla.org/en-US/docs/Glossary/UUID) identifier for the message. + */ + get id(): string { + if (!this.#id) { + this.#id = randomUUID(); + } + return this.#id; + } + + // --- data views --- + + /** + * Get data as [Uint8Array](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array) value. + * + * If raw data is in any other format or string, it will be automatically converted and encoded. + */ + uint8Array() { + // Cached + const _uint8Array = this.#uint8Array; + if (_uint8Array) { + return _uint8Array; + } + const rawData = this.rawData; + // Uint8Array + if (rawData instanceof Uint8Array) { + return (this.#uint8Array = rawData); + } + // ArrayBuffer + if ( + rawData instanceof ArrayBuffer || + rawData instanceof SharedArrayBuffer + ) { + this.#arrayBuffer = rawData; + return (this.#uint8Array = new Uint8Array(rawData)); + } + // String + if (typeof rawData === "string") { + this.#text = rawData; + return (this.#uint8Array = new TextEncoder().encode(this.#text)); + } + // Iterable and ArrayLike + if (Symbol.iterator in (rawData as Iterable)) { + return (this.#uint8Array = new Uint8Array(rawData as Iterable)); + } + if (typeof (rawData as ArrayLike)?.length === "number") { + return (this.#uint8Array = new Uint8Array(rawData as ArrayLike)); + } + // DataView + if (rawData instanceof DataView) { + return (this.#uint8Array = new Uint8Array( + rawData.buffer, + rawData.byteOffset, + rawData.byteLength, + )); + } + throw new TypeError( + `Unsupported message type: ${Object.prototype.toString.call(rawData)}`, + ); + } + + /** + * Get data as [ArrayBuffer](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/ArrayBuffer) or [SharedArrayBuffer](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer) value. + * + * If raw data is in any other format or string, it will be automatically converted and encoded. + */ + arrayBuffer(): ArrayBuffer | SharedArrayBuffer { + // Cached + const _arrayBuffer = this.#arrayBuffer; + if (_arrayBuffer) { + return _arrayBuffer; + } + const rawData = this.rawData; + // Use as-is + if ( + rawData instanceof ArrayBuffer || + rawData instanceof SharedArrayBuffer + ) { + return (this.#arrayBuffer = rawData); + } + // Fallback to UInt8Array + return (this.#arrayBuffer = this.uint8Array().buffer); + } + + /** + * Get data as [Blob](https://developer.mozilla.org/en-US/docs/Web/API/Blob) value. + * + * If raw data is in any other format or string, it will be automatically converted and encoded. */ + blob(): Blob { + // Cached + const _blob = this.#blob; + if (_blob) { + return _blob; + } + const rawData = this.rawData; + // Use as-is + if (rawData instanceof Blob) { + return (this.#blob = rawData); + } + // Fallback to UInt8Array + return (this.#blob = new Blob([this.uint8Array()])); + } + + /** + * Get stringified text version of the message. + * + * If raw data is in any other format, it will be automatically converted and decoded. + */ + text(): string { + // Cached + const _text = this.#text; + if (_text) { + return _text; + } + const rawData = this.rawData; + // Use as-is + if (typeof rawData === "string") { + return (this.#text = rawData); + } + // Fallback to UInt8Array + return (this.#text = new TextDecoder().decode(this.uint8Array())); + } + + /** + * Get parsed version of the message text with [`JSON.parse()`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/parse). + */ + json(): T { + const _json = this.#json; + if (_json) { + return _json as T; + } + return (this.#json = JSON.parse(this.text())); + } + + /** + * Message data (value varies based on `peer.websocket.binaryType`). + */ + get data() { + switch (this.peer?.websocket?.binaryType as string) { + case "arraybuffer": { + return this.arrayBuffer(); + } + case "blob": { + return this.blob(); + } + case "nodebuffer": { + return globalThis.Buffer + ? Buffer.from(this.uint8Array()) + : this.uint8Array(); + } + case "uint8array": { + return this.uint8Array(); + } + case "text": { + return this.text(); + } + default: { + return this.rawData; + } + } + } + + // --- inspect --- + toString() { return this.text(); } - [Symbol.for("nodejs.util.inspect.custom")]() { + [Symbol.toPrimitive]() { return this.text(); } + + [Symbol.for("nodejs.util.inspect.custom")]() { + return { data: this.rawData }; + } } diff --git a/src/peer.ts b/src/peer.ts index f554d41..b7a7737 100644 --- a/src/peer.ts +++ b/src/peer.ts @@ -1,111 +1,142 @@ +import type * as web from "../types/web.ts"; import { randomUUID } from "uncrypto"; -// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState -type ReadyState = 0 | 1 | 2 | 3; -const ReadyStateMap = { - "-1": "unknown", - 0: "connecting", - 1: "open", - 2: "closing", - 3: "closed", -} as const; - export interface AdapterInternal { + ws: unknown; + request?: Request | Partial; peers?: Set; } export abstract class Peer { protected _internal: Internal; protected _topics: Set; - - private _id?: string; + #id?: string; + #ws?: Partial; constructor(internal: Internal) { this._topics = new Set(); this._internal = internal; } + /** + * Unique random [uuid v4](https://developer.mozilla.org/en-US/docs/Glossary/UUID) identifier for the peer. + */ get id(): string { - if (!this._id) { - this._id = randomUUID(); + if (!this.#id) { + this.#id = randomUUID(); } - return this._id; + return this.#id; } - get addr(): string | undefined { + /** IP address of the peer */ + get remoteAddress(): string | undefined { return undefined; } - get url(): string { - return ""; + /** upgrade request */ + get request(): Request | Partial | undefined { + return this._internal.request; } - get headers(): Headers | undefined { - return undefined; - } - - get readyState(): ReadyState | -1 { - return -1; + /** + * Get the [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) instance. + * + * **Note:** crossws adds polyfill for the following properties if native values are not available: + * - `protocol`: Extracted from the `sec-websocket-protocol` header. + * - `extensions`: Extracted from the `sec-websocket-extensions` header. + * - `url`: Extracted from the request URL (http -> ws). + * */ + get websocket(): Partial { + if (!this.#ws) { + const _ws = this._internal.ws as Partial; + const _request = this._internal.request; + this.#ws = _request ? createWsProxy(_ws, _request) : _ws; + } + return this.#ws; } + /** All connected peers to the server */ get peers(): Set { return this._internal.peers || new Set(); } - abstract send(message: any, options?: { compress?: boolean }): number; + abstract close(code?: number, reason?: string): void; - abstract publish( - topic: string, - message: any, - options?: { compress?: boolean }, - ): void; + /** Abruptly close the connection */ + terminate() { + this.close(); + } + /** Subscribe to a topic */ subscribe(topic: string) { this._topics.add(topic); } + /** Unsubscribe from a topic */ unsubscribe(topic: string) { this._topics.delete(topic); } + /** Send a message to the peer. */ + abstract send( + data: unknown, + options?: { compress?: boolean }, + ): number | void | undefined; + + /** Send message to subscribes of topic */ + abstract publish( + topic: string, + data: unknown, + options?: { compress?: boolean }, + ): void; + + // --- inspect --- + toString() { return this.id; } - [Symbol.for("nodejs.util.inspect.custom")]() { - const _id = this.toString(); - const _addr = this.addr ? ` (${this.addr})` : ""; - const _state = - this.readyState === 1 || this.readyState === -1 - ? "" - : ` [${ReadyStateMap[this.readyState]}]`; - - return `${_id}${_addr}${_state}`; + [Symbol.toPrimitive]() { + return this.id; } - /** - * Closes the connection. - * - * Here is a list of close codes: - * - * - `1000` means "normal closure" (default) - * - `1009` means a message was too big and was rejected - * - `1011` means the server encountered an error - * - `1012` means the server is restarting - * - `1013` means the server is too busy or the client is rate-limited - * - `4000` through `4999` are reserved for applications (you can use it!) - * - * To close the connection abruptly, use `terminate()`. - * - * @param code The close code to send - * @param reason The close reason to send - */ - abstract close(code?: number, reason?: string): void; + [Symbol.toStringTag]() { + return "WebSocket"; + } - /** - * Abruptly close the connection. - * - * To gracefully close the connection, use `close()`. - */ - abstract terminate(): void; + [Symbol.for("nodejs.util.inspect.custom")]() { + return Object.fromEntries( + [ + ["id", this.id], + ["remoteAddress", this.remoteAddress], + ["peers", this.peers], + ["webSocket", this.websocket], + ].filter((p) => p[1]), + ); + } +} + +function createWsProxy( + ws: Partial, + request: Partial, +): Partial { + return new Proxy(ws, { + get: (target, prop) => { + const value = Reflect.get(target, prop); + if (!value) { + switch (prop) { + case "protocol": { + return request?.headers?.get("sec-websocket-protocol") || ""; + } + case "extensions": { + return request?.headers?.get("sec-websocket-extensions") || ""; + } + case "url": { + return request?.url?.replace(/^http/, "ws") || undefined; + } + } + } + return value; + }, + }); } diff --git a/test/_utils.ts b/test/_utils.ts index 0b0c3f7..3d39298 100644 --- a/test/_utils.ts +++ b/test/_utils.ts @@ -26,6 +26,8 @@ export function wsConnect( headers: opts?.headers, dispatcher: inspector, }); + ws.binaryType = "arraybuffer"; + websockets.add(ws); const send = async (data: any): Promise => { @@ -51,13 +53,22 @@ export function wsConnect( nextIndex += count; }; - ws.addEventListener("message", (event) => { - const str = - typeof event.data === "string" - ? event.data - : new TextDecoder().decode(event.data); - const payload = str[0] === "{" ? JSON.parse(str) : str; + ws.addEventListener("message", async (event) => { + let text: string; + if (typeof event.data === "string") { + text = event.data; + } else { + let rawData = event.data; + if (rawData instanceof Blob) { + rawData = await event.data.arrayBuffer(); + } else if (rawData instanceof Uint8Array) { + rawData = rawData.buffer; + } + text = new TextDecoder().decode(rawData); + } + const payload = text[0] === "{" ? JSON.parse(text) : text; messages.push(payload); + const index = messages.length - 1; if (waitCallbacks[index]) { waitCallbacks[index](payload); diff --git a/test/fixture/_shared.ts b/test/fixture/_shared.ts index 3b15321..c3aac96 100644 --- a/test/fixture/_shared.ts +++ b/test/fixture/_shared.ts @@ -27,9 +27,19 @@ export function createDemo>( case "debug": { peer.send({ id: peer.id, - ip: peer.addr, - url: peer.url, - headers: Object.fromEntries(peer.headers || []), + remoteAddress: peer.remoteAddress, + request: { + url: peer.request?.url, + headers: Object.fromEntries(peer.request?.headers || []), + }, + websocket: { + readyState: peer.websocket.readyState, + protocol: peer.websocket.protocol, + extensions: peer.websocket.extensions, + url: peer.websocket.url, + binaryType: peer.websocket.binaryType, + bufferedAmount: peer.websocket.bufferedAmount, + }, }); break; } diff --git a/test/fixture/deno.ts b/test/fixture/deno.ts index 59ad329..bf18628 100644 --- a/test/fixture/deno.ts +++ b/test/fixture/deno.ts @@ -1,4 +1,4 @@ -// You can run this demo using `deno run -A ./deno.ts` or `npm run play:deno` in repo +// You can run this demo using `npm run play:deno` in repo import denoAdapter from "../../src/adapters/deno.ts"; diff --git a/test/tests.ts b/test/tests.ts index d4d850b..a2b30ad 100644 --- a/test/tests.ts +++ b/test/tests.ts @@ -1,10 +1,13 @@ import { expect, test } from "vitest"; import { wsConnect } from "./_utils"; -export function wsTests( - getURL: () => string, - opts: { adapter: string; pubsub?: boolean; resHeaders?: boolean }, -) { +export interface WSTestOpts { + adapter: string; + pubsub?: boolean; + resHeaders?: boolean; +} + +export function wsTests(getURL: () => string, opts: WSTestOpts) { test("http works", async () => { const response = await fetch(getURL().replace("ws", "http")); expect(response.status).toBe(200); @@ -60,28 +63,50 @@ export function wsTests( }, ); - test("upgrade request headers", async () => { - const ws = await wsConnect(getURL(), { + test("peer.request (headers, url, remoteAddress)", async () => { + const ws = await wsConnect(getURL() + "?foo=bar", { skip: 1, headers: { "x-test": "1" }, }); await ws.send("debug"); - const { headers } = await ws.next(); + const { request, remoteAddress } = await ws.next(); + + // Headers if (opts.adapter === "sse") { - expect(headers["connection"]).toBe("keep-alive"); + expect(request.headers["connection"]).toBe("keep-alive"); } else { - expect(headers["connection"]).toMatch(/^upgrade$/i); - expect(headers["x-test"]).toBe("1"); + expect(request.headers["connection"]).toMatch(/^upgrade$/i); + expect(request.headers["x-test"]).toBe("1"); + } + + // URL + expect(request.url).toMatch(/^http:\/\/localhost:\d+\/\?foo=bar$/); + const url = new URL(request.url); + expect(url.search).toBe("?foo=bar"); + + // Remote address + if (!/sse|cloudflare/.test(opts.adapter)) { + expect(remoteAddress).toMatch(/:{2}1|(?:0{4}:){7}0{3}1|127\.0\.\0\.1/); } }); - test("upgrade request url", async () => { - const ws = await wsConnect(getURL() + "?foo=bar", { skip: 1 }); + test("peer.websocket", async () => { + const ws = await wsConnect(getURL() + "?foo=bar", { + skip: 1, + headers: { + "Sec-WebSocket-Protocol": "crossws", + }, + }); await ws.send("debug"); - const info = await ws.next(); - expect(info.url).toMatch(/^http:\/\/localhost:\d+\/\?foo=bar$/); - const url = new URL(info.url); - expect(url.search).toBe("?foo=bar"); + const { websocket } = await ws.next(); + expect(websocket).toMatchObject({ + readyState: 1, + protocol: /ss/.test(opts.adapter) ? "" : "crossws", + extensions: /sse|cloudflare/.test(opts.adapter) + ? "" + : "permessage-deflate; client_max_window_bits", + url: getURL() + "?foo=bar", + }); }); test.skipIf(opts.adapter === "sse")("upgrade fail response", async () => { diff --git a/types/web.ts b/types/web.ts index aa12672..b546e9b 100644 --- a/types/web.ts +++ b/types/web.ts @@ -255,7 +255,7 @@ export interface WebSocket extends EventTarget { * * [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebSocket/binaryType) */ - binaryType: BinaryType; + binaryType: BinaryType | (string & {}); /** * Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network. *