feat!: overhaul peer and message interface (#70)

This commit is contained in:
Pooya Parsa
2024-08-15 21:04:26 +02:00
committed by GitHub
parent b4071b7371
commit c8518ff8f3
17 changed files with 722 additions and 468 deletions

1
.gitignore vendored
View File

@@ -9,4 +9,5 @@ dist
.wrangler .wrangler
/adapters /adapters
/websocket
websocket.d.ts websocket.d.ts

View File

@@ -6,38 +6,38 @@ icon: mynaui:api
> Peer object allows easily interacting with connected clients. > Peer object allows easily interacting with connected clients.
Websocket [hooks](/guide/hooks) accept a peer instance as their first argument. You can use peer object to get information about each connected client or send a message to them. When a new client connects to the server, crossws creates a peer instance that allows getting information from clients and sending messages to them.
> [!TIP] ## Instance properties
> You can safely log a peer instance to the console using `console.log` it will be automatically stringified with useful information including the remote address and connection status!
## Properties
### `peer.url`
Request http url during upgrade. You can use it to do actions based on path and search params.
### `peer.headers`
Request http headers during upgrade. Youb can use it to do authentication and access upgrade headers.
### `peer.addr`
The IP address of the client.
### `peer.id` ### `peer.id`
A unique id assigned to the peer. Unique random identifier ([uuid v4](https://developer.mozilla.org/en-US/docs/Glossary/UUID)) for the peer.
### `peer.readyState` ### `peer.request?`
Client connection status (might be `undefined`) Access to the upgrade request info. You can use it to do authentication and access users headers and cookies.
:read-more{to="https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState" title="readyState in MDN"} > [!NOTE]
> This property is compatible with web [Request](https://developer.mozilla.org/en-US/docs/Web/API/Request) interface, However interface is emulated for Node.js and sometimes unavailable. Refer to the [compatibility table](#compatibility) for more info.
## Methods ### `peer.remoteAddress?`
### `peer.send(message, compress)` The IP address of the client.
> [!NOTE]
> Not all adapters provide this. Refer to the [compatibility table](#compatibility) for more info.
### `peer.websocket`
Direct access to the [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) instance.
> [!NOTE]
> WebSocket properties vary across runtimes. When accessing `peer.websocket`, a lightweight proxy increases stablity. Refer to the [compatibility table](#compatibility) for more info.
## Instance methods
### `peer.send(message, { compress? })`
Send a message to the connected client. Send a message to the connected client.
@@ -79,3 +79,44 @@ To close the connection abruptly, use `peer.terminate()`.
Abruptly close the connection. Abruptly close the connection.
To gracefully close the connection, use `peer.close()`. To gracefully close the connection, use `peer.close()`.
## Compatibility
| | [Bun][bun] | [Cloudflare][cfw] | [Cloudflare (durable)][cfd] | [Deno][deno] | [Node (ws)][nodews] | [Node (μWebSockets)][nodeuws] | [SSE][sse] |
| --------------------------- | ---------- | ----------------- | --------------------------- | ------------ | ------------------- | ----------------------------- | ---------- |
| `send()` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| `publish()` / `subscribe()` | ✓ | ⨉ | ✓ [^1] | ✓ [^1] | ✓ [^1] | ✓ | ✓ [^1] |
| `close()` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| `terminate()` | ✓ | ✓ [^2] | ✓ | ✓ | ✓ | ✓ | ✓ [^2] |
| `request` | ✓ | ✓ | ✓ [^3] | ✓ | ✓ [^3] | ✓ [^3] | ✓ |
| `remoteAddress` | ✓ | ⨉ | ⨉ | ✓ | ✓ | ✓ | ⨉ |
| `websocket.url` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| `websocket.extensions` | ✓ [^4] | ⨉ | ⨉ | ✓ [^4] | ✓ [^4] | ✓ [^4] | ⨉ |
| `websocket.protocol` | ✓ [^5] | ✓ [^5] | ✓ [^5] | [^5] ✓ | ✓ [^5] | ✓ [^5] | ⨉ |
| `websocket.readyState` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ [^6] | ✓ [^6] |
| `websocket.binaryType` | ✓ [^7] | ⨉ | ⨉ | ✓ | ✓ [^7] | ✓ | ⨉ |
| `websocket.bufferedAmount` | ⨉ | ⨉ | ⨉ | ✓ | ✓ | ✓ | ⨉ |
[bun]: /adapters/bun
[cfw]: /adapters/cloudflare
[cfd]: /adapters/cloudflare#durable-objects
[deno]: /adapters/deno
[nodews]: /adapters/node
[nodeuws]: /adapters/node#uwebsockets
[sse]: adapters/sse
[^1]: pubsub is not natively handled by runtime. peers are internally tracked.
[^2]: `close()` will be used for compatibility.
[^1]: using a proxy for [Request](https://developer.mozilla.org/en-US/docs/Web/API/Request) compatible interface (`url`, `headers` only) wrapping Node.js requests.
[^3]: `request` is not always available (only in `open` hook).
[^4]: [`websocket.extensions`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/extensions) is polyfilled using [`sec-websocket-extensions`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism#websocket-specific_headers) request header.
[^5]: [`websocket.protocol`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/protocol) is polyfilled using [`sec-websocket-protocol`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism#websocket-specific_headers) request header.
[^6]: [`websocket.readyState`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState) is polyfilled by tracking open/close events.
[^7]: Some runtimes have non standard values including `"nodebuffer"` and `"uint8array"`. crossws auto converts them for [`message.data`](/guide/message#messagedata).

View File

@@ -4,21 +4,73 @@ icon: solar:letter-line-duotone
# Message # Message
On `message` [hook](/guide/hooks), you receive a message object containing an incoming message from the client. On `message` [hook](/guide/hooks), you receive a message object containing data from the client.
> [!TIP] > [!NOTE]
> You can safely log `message` object to the console using `console.log` it will be automatically stringified! > Message object is API-compatible with standard Websocket [`MessageEvent`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event) with convenient superset of utils.
## API ## Instance properties
### `message.text()` ### `message.id`
Get stringified text version of the message Unique random identifier ([uuid v4](https://developer.mozilla.org/en-US/docs/Glossary/UUID)) for the message.
### `message.event`
Access to the original [message event](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event) if available.
### `message.peer`
Access to the [peer instance](/guide/peer) that emitted the message.
### `message.rawData` ### `message.rawData`
Raw message data Raw message data (can be of any type).
### `message.isBinary` ### `message.data`
Indicates if the message is binary (might be `undefined`) Message data (value varies based on [`peer.binaryType`](/guide/peer#peerbinarytype)).
## Instance methods
### `message.text()`
Get stringified text version of the message.
If raw data is in any other format, it will be automatically converted or decoded.
### `message.json()`
Get parsed version of the message text with [`JSON.parse()`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/parse).
### `message.uint8Array()`
Get data as [`Uint8Array`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array) value.
If raw data is in any other format or string, it will be automatically converted or encoded.
### `message.arrayBuffer()`
Get data as [`ArrayBuffer`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/ArrayBuffer) or [`SharedArrayBuffer`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer) value.
If raw data is in any other format or string, it will be automatically converted or encoded.
### `message.blob()`
Get data as [`Blob`](https://developer.mozilla.org/en-US/docs/Web/API/Blob) value.
If raw data is in any other format or string, it will be automatically converted or encoded.
## Adapter support
| | [Bun][bun] | [Cloudflare][cfw] | [Cloudflare (durable)][cfd] | [Deno][deno] | [Node (ws)][nodews] | [Node (μWebSockets)][nodeuws] | [SSE][sse] |
| ------- | ---------- | ----------------- | --------------------------- | ------------ | ------------------- | ----------------------------- | ---------- |
| `event` | ⨉ | ✓ | ⨉ | ✓ | ⨉ | ⨉ | ⨉ |
[bun]: /adapters/bun
[cfw]: /adapters/cloudflare
[cfd]: /adapters/cloudflare#durable-objects
[deno]: /adapters/deno
[nodews]: /adapters/node
[nodeuws]: /adapters/node#uwebsockets
[sse]: adapters/sse

View File

@@ -1,5 +1,4 @@
import type { WebSocketHandler, ServerWebSocket, Server } from "bun"; import type { WebSocketHandler, ServerWebSocket, Server } from "bun";
import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import { toBufferLike } from "../utils.ts"; import { toBufferLike } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
@@ -17,9 +16,8 @@ export interface BunAdapter extends AdapterInstance {
export interface BunOptions extends AdapterOptions {} export interface BunOptions extends AdapterOptions {}
type ContextData = { type ContextData = {
_peer?: BunPeer; peer?: BunPeer;
request?: Request; request: Request;
requestUrl?: string;
server?: Server; server?: Server;
}; };
@@ -41,7 +39,6 @@ export default defineWebSocketAdapter<BunAdapter, BunOptions>(
data: { data: {
server, server,
request, request,
requestUrl: request.url,
} satisfies ContextData, } satisfies ContextData,
headers: res?.headers, headers: res?.headers,
}); });
@@ -52,7 +49,7 @@ export default defineWebSocketAdapter<BunAdapter, BunOptions>(
websocket: { websocket: {
message: (ws, message) => { message: (ws, message) => {
const peer = getPeer(ws, peers); const peer = getPeer(ws, peers);
hooks.callHook("message", peer, new Message(message)); hooks.callHook("message", peer, new Message(message, peer));
}, },
open: (ws) => { open: (ws) => {
const peer = getPeer(ws, peers); const peer = getPeer(ws, peers);
@@ -89,66 +86,51 @@ function getPeer(
ws: ServerWebSocket<ContextData>, ws: ServerWebSocket<ContextData>,
peers: Set<BunPeer>, peers: Set<BunPeer>,
): BunPeer { ): BunPeer {
if (ws.data?._peer) { if (ws.data?.peer) {
return ws.data._peer; return ws.data.peer;
} }
const peer = new BunPeer({ peers, bun: { ws } }); const peer = new BunPeer({ ws, request: ws.data.request, peers });
ws.data = { ws.data = {
...ws.data, ...ws.data,
_peer: peer, peer,
}; };
return peer; return peer;
} }
class BunPeer extends Peer<{ class BunPeer extends Peer<{
ws: ServerWebSocket<ContextData>;
request: Request;
peers: Set<BunPeer>; peers: Set<BunPeer>;
bun: { ws: ServerWebSocket<ContextData> };
}> { }> {
get addr() { get remoteAddress() {
let addr = this._internal.bun.ws.remoteAddress; return this._internal.ws.remoteAddress;
if (addr.includes(":")) {
addr = `[${addr}]`;
}
return addr;
} }
get readyState() { send(data: unknown, options?: { compress?: boolean }) {
return this._internal.bun.ws.readyState as any; return this._internal.ws.send(toBufferLike(data), options?.compress);
} }
get url() { publish(topic: string, data: unknown, options?: { compress?: boolean }) {
return this._internal.bun.ws.data.requestUrl || "/"; return this._internal.ws.publish(
}
get headers() {
return this._internal.bun.ws.data.request?.headers;
}
send(message: any, options?: { compress?: boolean }) {
return this._internal.bun.ws.send(toBufferLike(message), options?.compress);
}
publish(topic: string, message: any, options?: { compress?: boolean }) {
return this._internal.bun.ws.publish(
topic, topic,
toBufferLike(message), toBufferLike(data),
options?.compress, options?.compress,
); );
} }
subscribe(topic: string): void { subscribe(topic: string): void {
this._internal.bun.ws.subscribe(topic); this._internal.ws.subscribe(topic);
} }
unsubscribe(topic: string): void { unsubscribe(topic: string): void {
this._internal.bun.ws.unsubscribe(topic); this._internal.ws.unsubscribe(topic);
} }
close(code?: number, reason?: string) { close(code?: number, reason?: string) {
this._internal.bun.ws.close(code, reason); this._internal.ws.close(code, reason);
} }
terminate() { terminate() {
this._internal.bun.ws.terminate(); this._internal.ws.terminate();
} }
} }

View File

@@ -1,4 +1,5 @@
import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import type * as web from "../../types/web.ts";
import { toBufferLike } from "../utils.ts"; import { toBufferLike } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts"; import { AdapterHookable } from "../hooks.ts";
@@ -38,13 +39,13 @@ export interface CloudflareDurableAdapter extends AdapterInstance {
handleDurableMessage( handleDurableMessage(
obj: DurableObject, obj: DurableObject,
ws: WebSocket | CF.WebSocket, ws: WebSocket | CF.WebSocket | web.WebSocket,
message: ArrayBuffer | string, message: ArrayBuffer | string,
): Promise<void>; ): Promise<void>;
handleDurableClose( handleDurableClose(
obj: DurableObject, obj: DurableObject,
ws: WebSocket | CF.WebSocket, ws: WebSocket | CF.WebSocket | web.WebSocket,
code: number, code: number,
reason: string, reason: string,
wasClean: boolean, wasClean: boolean,
@@ -102,7 +103,7 @@ export default defineWebSocketAdapter<
handleDurableMessage: async (obj, ws, message) => { handleDurableMessage: async (obj, ws, message) => {
const peer = peerFromDurableEvent(obj, ws as CF.WebSocket); const peer = peerFromDurableEvent(obj, ws as CF.WebSocket);
hooks.callAdapterHook("cloudflare:message", peer, message); hooks.callAdapterHook("cloudflare:message", peer, message);
hooks.callHook("message", peer, new Message(message)); hooks.callHook("message", peer, new Message(message, peer));
}, },
handleDurableClose: async (obj, ws, code, reason, wasClean) => { handleDurableClose: async (obj, ws, code, reason, wasClean) => {
const peer = peerFromDurableEvent(obj, ws as CF.WebSocket); const peer = peerFromDurableEvent(obj, ws as CF.WebSocket);
@@ -124,12 +125,10 @@ function peerFromDurableEvent(
return peer; return peer;
} }
peer = ws._crosswsPeer = new CloudflareDurablePeer({ peer = ws._crosswsPeer = new CloudflareDurablePeer({
cloudflare: {
ws: ws as CF.WebSocket, ws: ws as CF.WebSocket,
request, request: request as Request,
env: (obj as DurableObjectPub).env, cfEnv: (obj as DurableObjectPub).env,
context: (obj as DurableObjectPub).ctx, cfCtx: (obj as DurableObjectPub).ctx,
},
}); });
return peer; return peer;
} }
@@ -137,60 +136,24 @@ function peerFromDurableEvent(
// --- peer --- // --- peer ---
class CloudflareDurablePeer extends Peer<{ class CloudflareDurablePeer extends Peer<{
peers?: never;
cloudflare: {
ws: AugmentedWebSocket; ws: AugmentedWebSocket;
request?: Request | CF.Request; request?: Request;
env: unknown; peers?: never;
context: DurableObject["ctx"]; cfEnv: unknown;
}; cfCtx: DurableObject["ctx"];
}> { }> {
get url() {
return (
this._internal.cloudflare.request?.url ||
this._internal.cloudflare.ws.url ||
""
);
}
get headers() {
return this._internal.cloudflare.request?.headers as Headers;
}
get readyState() {
return this._internal.cloudflare.ws.readyState as -1 | 0 | 1 | 2 | 3;
}
send(message: any) {
this._internal.cloudflare.ws.send(toBufferLike(message));
return 0;
}
subscribe(topic: string): void {
super.subscribe(topic);
const state: CrosswsState = {
// Max limit: 2,048 bytes
...(this._internal.cloudflare.ws.deserializeAttachment() as CrosswsState),
topics: this._topics,
};
this._internal.cloudflare.ws._crosswsState = state;
this._internal.cloudflare.ws.serializeAttachment(state);
}
get peers() { get peers() {
const clients = const clients =
this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[]; this._internal.cfCtx.getWebSockets() as unknown as (typeof this._internal.ws)[];
return new Set( return new Set(
clients.map((client) => { clients.map((client) => {
let peer = client._crosswsPeer; let peer = client._crosswsPeer;
if (!peer) { if (!peer) {
peer = client._crosswsPeer = new CloudflareDurablePeer({ peer = client._crosswsPeer = new CloudflareDurablePeer({
cloudflare: {
ws: client, ws: client,
request: undefined, request: undefined,
env: this._internal.cloudflare.env, cfEnv: this._internal.cfEnv,
context: this._internal.cloudflare.context, cfCtx: this._internal.cfCtx,
},
}); });
} }
return peer; return peer;
@@ -198,14 +161,29 @@ class CloudflareDurablePeer extends Peer<{
); );
} }
publish(topic: string, message: any): void { send(data: unknown) {
return this._internal.ws.send(toBufferLike(data));
}
subscribe(topic: string): void {
super.subscribe(topic);
const state: CrosswsState = {
// Max limit: 2,048 bytes
...(this._internal.ws.deserializeAttachment() as CrosswsState),
topics: this._topics,
};
this._internal.ws._crosswsState = state;
this._internal.ws.serializeAttachment(state);
}
publish(topic: string, data: unknown): void {
const clients = ( const clients = (
this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[] this._internal.cfCtx.getWebSockets() as unknown as (typeof this._internal.ws)[]
).filter((c) => c !== this._internal.cloudflare.ws); ).filter((c) => c !== this._internal.ws);
if (clients.length === 0) { if (clients.length === 0) {
return; return;
} }
const data = toBufferLike(message); const dataBuff = toBufferLike(data);
for (const client of clients) { for (const client of clients) {
let state = client._crosswsState; let state = client._crosswsState;
if (!state) { if (!state) {
@@ -213,16 +191,12 @@ class CloudflareDurablePeer extends Peer<{
client.deserializeAttachment() as CrosswsState; client.deserializeAttachment() as CrosswsState;
} }
if (state.topics?.has(topic)) { if (state.topics?.has(topic)) {
client.send(data); client.send(dataBuff);
} }
} }
} }
close(code?: number, reason?: string) { close(code?: number, reason?: string) {
this._internal.cloudflare.ws.close(code, reason); this._internal.ws.close(code, reason);
}
terminate(): void {
this.close();
} }
} }

View File

@@ -44,8 +44,12 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
const client = pair[0]; const client = pair[0];
const server = pair[1]; const server = pair[1];
const peer = new CloudflarePeer({ const peer = new CloudflarePeer({
ws: client,
peers, peers,
cloudflare: { client, server, request, env, context }, wsServer: server,
request: request as unknown as Request,
cfEnv: env,
cfCtx: context,
}); });
peers.add(peer); peers.add(peer);
server.accept(); server.accept();
@@ -53,7 +57,11 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
hooks.callHook("open", peer); hooks.callHook("open", peer);
server.addEventListener("message", (event) => { server.addEventListener("message", (event) => {
hooks.callAdapterHook("cloudflare:message", peer, event); hooks.callAdapterHook("cloudflare:message", peer, event);
hooks.callHook("message", peer, new Message(event.data)); hooks.callHook(
"message",
peer,
new Message(event.data, peer, event as MessageEvent),
);
}); });
server.addEventListener("error", (event) => { server.addEventListener("error", (event) => {
peers.delete(peer); peers.delete(peer);
@@ -79,51 +87,23 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
// --- peer --- // --- peer ---
class CloudflarePeer extends Peer<{ class CloudflarePeer extends Peer<{
ws: _cf.WebSocket;
request: Request;
peers: Set<CloudflarePeer>; peers: Set<CloudflarePeer>;
cloudflare: { wsServer: _cf.WebSocket;
client: _cf.WebSocket; cfEnv: unknown;
server: _cf.WebSocket; cfCtx: _cf.ExecutionContext;
request: _cf.Request;
env: unknown;
context: _cf.ExecutionContext;
};
}> { }> {
get addr() { send(data: unknown) {
return undefined; this._internal.wsServer.send(toBufferLike(data));
}
get url() {
return this._internal.cloudflare.request.url;
}
get headers() {
return this._internal.cloudflare.request.headers as unknown as Headers;
}
get readyState() {
return this._internal.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3;
}
send(message: any) {
this._internal.cloudflare.server.send(toBufferLike(message));
return 0; return 0;
} }
publish(_topic: string, _message: any): void { publish(_topic: string, _message: any): void {
// Not supported // not supported
// Throws: A hanging Promise was canceled
// for (const peer of this._internal.peers) {
// if (peer !== this && peer._topics.has(_topic)) {
// peer.publish(_topic, _message);
// }
// }
} }
close(code?: number, reason?: string) { close(code?: number, reason?: string) {
this._internal.cloudflare.client.close(code, reason); this._internal.ws.close(code, reason);
}
terminate(): void {
this.close();
} }
} }

View File

@@ -19,7 +19,9 @@ declare global {
} }
type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade; type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade;
type ServeHandlerInfo = unknown; // TODO type ServeHandlerInfo = {
remoteAddr?: { transport: string; hostname: string; port: number };
};
// --- adapter --- // --- adapter ---
@@ -42,8 +44,10 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
headers: res?.headers, headers: res?.headers,
}); });
const peer = new DenoPeer({ const peer = new DenoPeer({
ws: upgrade.socket,
request,
peers, peers,
deno: { ws: upgrade.socket, request, info }, denoInfo: info,
}); });
peers.add(peer); peers.add(peer);
upgrade.socket.addEventListener("open", () => { upgrade.socket.addEventListener("open", () => {
@@ -52,7 +56,7 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
}); });
upgrade.socket.addEventListener("message", (event) => { upgrade.socket.addEventListener("message", (event) => {
hooks.callAdapterHook("deno:message", peer, event); hooks.callAdapterHook("deno:message", peer, event);
hooks.callHook("message", peer, new Message(event.data)); hooks.callHook("message", peer, new Message(event.data, peer, event));
}); });
upgrade.socket.addEventListener("close", () => { upgrade.socket.addEventListener("close", () => {
peers.delete(peer); peers.delete(peer);
@@ -73,50 +77,33 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
// --- peer --- // --- peer ---
class DenoPeer extends Peer<{ class DenoPeer extends Peer<{
peers: Set<DenoPeer>;
deno: {
ws: WebSocketUpgrade["socket"]; ws: WebSocketUpgrade["socket"];
request: Request; request: Request;
info: ServeHandlerInfo; peers: Set<DenoPeer>;
}; denoInfo: ServeHandlerInfo;
}> { }> {
get addr() { get remoteAddress() {
// @ts-expect-error types missing return this._internal.denoInfo.remoteAddr?.hostname;
return this._internal.deno.ws.remoteAddress;
} }
get readyState() { send(data: unknown) {
return this._internal.deno.ws.readyState as -1 | 0 | 1 | 2 | 3; return this._internal.ws.send(toBufferLike(data));
} }
get url() { publish(topic: string, data: unknown) {
return this._internal.deno.request.url; const dataBuff = toBufferLike(data);
}
get headers() {
return this._internal.deno.request.headers || new Headers();
}
send(message: any) {
this._internal.deno.ws.send(toBufferLike(message));
return 0;
}
publish(topic: string, message: any) {
const data = toBufferLike(message);
for (const peer of this._internal.peers) { for (const peer of this._internal.peers) {
if (peer !== this && peer._topics.has(topic)) { if (peer !== this && peer._topics.has(topic)) {
peer._internal.deno.ws.send(data); peer._internal.ws.send(dataBuff);
} }
} }
} }
close(code?: number, reason?: string) { close(code?: number, reason?: string) {
this._internal.deno.ws.close(code, reason); this._internal.ws.close(code, reason);
} }
terminate(): void { terminate(): void {
// @ts-ignore (terminate is Deno-only api) (this._internal.ws as any).terminate();
this._internal.deno.ws.terminate();
} }
} }

View File

@@ -1,4 +1,5 @@
import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import type { WebSocket } from "../../types/web.ts";
import { toBufferLike } from "../utils.ts"; import { toBufferLike } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts"; import { AdapterHookable } from "../hooks.ts";
@@ -11,14 +12,16 @@ import type { Duplex } from "node:stream";
import { WebSocketServer as _WebSocketServer } from "ws"; import { WebSocketServer as _WebSocketServer } from "ws";
import type { import type {
ServerOptions, ServerOptions,
RawData,
WebSocketServer, WebSocketServer,
WebSocket as WebSocketT, WebSocket as WebSocketT,
} from "../../types/ws"; } from "../../types/ws";
// --- types --- // --- types ---
type AugmentedReq = IncomingMessage & { _upgradeHeaders?: HeadersInit }; type AugmentedReq = IncomingMessage & {
_request: NodeReqProxy;
_upgradeHeaders?: HeadersInit;
};
export interface NodeAdapter extends AdapterInstance { export interface NodeAdapter extends AdapterInstance {
handleUpgrade(req: IncomingMessage, socket: Duplex, head: Buffer): void; handleUpgrade(req: IncomingMessage, socket: Duplex, head: Buffer): void;
@@ -46,18 +49,19 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
...(options.serverOptions as any), ...(options.serverOptions as any),
}) as WebSocketServer); }) as WebSocketServer);
wss.on("connection", (ws, req) => { wss.on("connection", (ws, nodeReq) => {
const peer = new NodePeer({ peers, node: { ws, req, server: wss } }); const request = new NodeReqProxy(nodeReq);
const peer = new NodePeer({ ws, request, peers, nodeReq });
peers.add(peer); peers.add(peer);
hooks.callHook("open", peer); hooks.callHook("open", peer);
// Managed socket-level events // Managed socket-level events
ws.on("message", (data: RawData, isBinary: boolean) => { ws.on("message", (data: unknown, isBinary: boolean) => {
hooks.callAdapterHook("node:message", peer, data, isBinary); hooks.callAdapterHook("node:message", peer, data, isBinary);
if (Array.isArray(data)) { if (Array.isArray(data)) {
data = Buffer.concat(data); data = Buffer.concat(data);
} }
hooks.callHook("message", peer, new Message(data, isBinary)); hooks.callHook("message", peer, new Message(data, peer));
}); });
ws.on("error", (error: Error) => { ws.on("error", (error: Error) => {
peers.delete(peer); peers.delete(peer);
@@ -94,7 +98,7 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
}); });
}); });
wss.on("headers", function (outgoingHeaders, req) { wss.on("headers", (outgoingHeaders, req) => {
const upgradeHeaders = (req as AugmentedReq)._upgradeHeaders; const upgradeHeaders = (req as AugmentedReq)._upgradeHeaders;
if (upgradeHeaders) { if (upgradeHeaders) {
for (const [key, value] of new Headers(upgradeHeaders)) { for (const [key, value] of new Headers(upgradeHeaders)) {
@@ -105,14 +109,16 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
return { return {
...adapterUtils(peers), ...adapterUtils(peers),
handleUpgrade: async (req, socket, head) => { handleUpgrade: async (nodeReq, socket, head) => {
const res = await hooks.callHook("upgrade", new NodeReqProxy(req)); const request = new NodeReqProxy(nodeReq);
const res = await hooks.callHook("upgrade", request);
if (res instanceof Response) { if (res instanceof Response) {
return sendResponse(socket, res); return sendResponse(socket, res);
} }
(req as AugmentedReq)._upgradeHeaders = res?.headers; (nodeReq as AugmentedReq)._request = request;
wss.handleUpgrade(req, socket, head, (ws) => { (nodeReq as AugmentedReq)._upgradeHeaders = res?.headers;
wss.emit("connection", ws, req); wss.handleUpgrade(nodeReq, socket, head, (ws) => {
wss.emit("connection", ws, nodeReq);
}); });
}, },
closeAll: (code, data) => { closeAll: (code, data) => {
@@ -128,49 +134,18 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
class NodePeer extends Peer<{ class NodePeer extends Peer<{
peers: Set<NodePeer>; peers: Set<NodePeer>;
node: { request: NodeReqProxy;
server: WebSocketServer; nodeReq: IncomingMessage;
req: IncomingMessage;
ws: WebSocketT & { _peer?: NodePeer }; ws: WebSocketT & { _peer?: NodePeer };
};
}> { }> {
_req: NodeReqProxy; get remoteAddress() {
constructor(ctx: NodePeer["_internal"]) { return this._internal.nodeReq.socket?.remoteAddress;
super(ctx);
this._req = new NodeReqProxy(ctx.node.req);
ctx.node.ws._peer = this;
} }
get addr() { send(data: unknown, options?: { compress?: boolean }) {
const socket = this._internal.node.req.socket; const dataBuff = toBufferLike(data);
if (!socket) {
return undefined;
}
const headers = this._internal.node.req.headers;
let addr = headers["x-forwarded-for"] || socket.remoteAddress || "??";
if (addr.includes(":")) {
addr = `[${addr}]`;
}
const port = headers["x-forwarded-port"] || socket.remotePort || "??";
return `${addr}:${port}`;
}
get url() {
return this._req.url;
}
get headers() {
return this._req.headers;
}
get readyState() {
return this._internal.node.ws.readyState;
}
send(message: any, options?: { compress?: boolean }) {
const data = toBufferLike(message);
const isBinary = typeof data !== "string"; const isBinary = typeof data !== "string";
this._internal.node.ws.send(data, { this._internal.ws.send(dataBuff, {
compress: options?.compress, compress: options?.compress,
binary: isBinary, binary: isBinary,
...options, ...options,
@@ -178,8 +153,12 @@ class NodePeer extends Peer<{
return 0; return 0;
} }
publish(topic: string, message: any, options?: { compress?: boolean }): void { publish(
const data = toBufferLike(message); topic: string,
data: unknown,
options?: { compress?: boolean },
): void {
const dataBuff = toBufferLike(data);
const isBinary = typeof data !== "string"; const isBinary = typeof data !== "string";
const sendOptions = { const sendOptions = {
compress: options?.compress, compress: options?.compress,
@@ -188,17 +167,17 @@ class NodePeer extends Peer<{
}; };
for (const peer of this._internal.peers) { for (const peer of this._internal.peers) {
if (peer !== this && peer._topics.has(topic)) { if (peer !== this && peer._topics.has(topic)) {
peer._internal.node.ws.send(data, sendOptions); peer._internal.ws.send(dataBuff, sendOptions);
} }
} }
} }
close(code?: number, data?: string | Buffer) { close(code?: number, data?: string | Buffer) {
this._internal.node.ws.close(code, data); this._internal.ws.close(code, data);
} }
terminate() { terminate() {
this._internal.node.ws.terminate(); this._internal.ws.terminate();
} }
} }

View File

@@ -1,4 +1,5 @@
import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import type * as web from "../../types/web.ts";
import { toString } from "../utils.ts"; import { toString } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts"; import { AdapterHookable } from "../hooks.ts";
@@ -43,7 +44,7 @@ export default defineWebSocketAdapter<SSEAdapter, SSEOptions>((opts = {}) => {
const stream = request.body.pipeThrough(new TextDecoderStream()); const stream = request.body.pipeThrough(new TextDecoderStream());
try { try {
for await (const chunk of stream) { for await (const chunk of stream) {
hooks.callHook("message", peer, new Message(chunk)); hooks.callHook("message", peer, new Message(chunk, peer));
} }
} catch { } catch {
await stream.cancel().catch(() => {}); await stream.cancel().catch(() => {});
@@ -52,18 +53,13 @@ export default defineWebSocketAdapter<SSEAdapter, SSEOptions>((opts = {}) => {
return new Response(null, {}); return new Response(null, {});
} else { } else {
// Add a new peer // Add a new peer
const ws = new SSEWebSocketStub();
peer = new SSEPeer({ peer = new SSEPeer({
peers, peers,
sse: { peersMap,
request, request,
hooks, hooks,
onClose: () => { ws,
peers.delete(peer);
if (opts.bidir) {
peersMap!.delete(peer.id);
}
},
},
}); });
peers.add(peer); peers.add(peer);
if (opts.bidir) { if (opts.bidir) {
@@ -96,37 +92,36 @@ export default defineWebSocketAdapter<SSEAdapter, SSEOptions>((opts = {}) => {
class SSEPeer extends Peer<{ class SSEPeer extends Peer<{
peers: Set<SSEPeer>; peers: Set<SSEPeer>;
sse: { peersMap?: Map<string, SSEPeer>;
request: Request; request: Request;
ws: SSEWebSocketStub;
hooks: AdapterHookable; hooks: AdapterHookable;
onClose: (peer: SSEPeer) => void;
};
}> { }> {
_sseStream: ReadableStream; _sseStream: ReadableStream; // server -> client
_sseStreamController?: ReadableStreamDefaultController; _sseStreamController?: ReadableStreamDefaultController;
constructor(internal: SSEPeer["_internal"]) { constructor(_internal: SSEPeer["_internal"]) {
super(internal); super(_internal);
_internal.ws.readyState = 0 /* CONNECTING */;
this._sseStream = new ReadableStream({ this._sseStream = new ReadableStream({
start: (controller) => { start: (controller) => {
_internal.ws.readyState = 1 /* OPEN */;
this._sseStreamController = controller; this._sseStreamController = controller;
this._internal.sse.hooks.callHook("open", this); _internal.hooks.callHook("open", this);
}, },
cancel: () => { cancel: () => {
this._internal.sse.onClose(this); _internal.ws.readyState = 2 /* CLOSING */;
this._internal.sse.hooks.callHook("close", this); _internal.peers.delete(this);
_internal.peersMap?.delete(this.id);
Promise.resolve(this._internal.hooks.callHook("close", this)).finally(
() => {
_internal.ws.readyState = 3 /* CLOSED */;
},
);
}, },
}).pipeThrough(new TextEncoderStream()); }).pipeThrough(new TextEncoderStream());
} }
get url() {
return this._internal.sse.request.url;
}
get headers() {
return this._internal.sse.request.headers;
}
_sendEvent(event: string, data: string) { _sendEvent(event: string, data: string) {
const lines = data.split("\n"); const lines = data.split("\n");
this._sseStreamController?.enqueue( this._sseStreamController?.enqueue(
@@ -134,16 +129,16 @@ class SSEPeer extends Peer<{
); );
} }
send(message: any) { send(data: unknown) {
this._sendEvent("message", toString(message)); this._sendEvent("message", toString(data));
return 0; return 0;
} }
publish(topic: string, message: any) { publish(topic: string, data: unknown) {
const data = toString(message); const dataBuff = toString(data);
for (const peer of this._internal.peers) { for (const peer of this._internal.peers) {
if (peer !== this && peer._topics.has(topic)) { if (peer !== this && peer._topics.has(topic)) {
peer._sendEvent("message", data); peer._sendEvent("message", dataBuff);
} }
} }
} }
@@ -151,8 +146,10 @@ class SSEPeer extends Peer<{
close() { close() {
this._sseStreamController?.close(); this._sseStreamController?.close();
} }
}
terminate() {
this.close(); // --- web compat ---
}
class SSEWebSocketStub implements Partial<web.WebSocket> {
readyState?: number | undefined;
} }

View File

@@ -1,28 +1,23 @@
import type { AdapterOptions, AdapterInstance } from "../adapter.ts"; import type { AdapterOptions, AdapterInstance } from "../adapter.ts";
import type { WebSocket } from "../../types/web.ts";
import type uws from "uWebSockets.js";
import { toBufferLike } from "../utils.ts"; import { toBufferLike } from "../utils.ts";
import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts"; import { defineWebSocketAdapter, adapterUtils } from "../adapter.ts";
import { AdapterHookable } from "../hooks.ts"; import { AdapterHookable } from "../hooks.ts";
import { Message } from "../message.ts"; import { Message } from "../message.ts";
import { Peer } from "../peer.ts"; import { Peer } from "../peer.ts";
import type {
WebSocketBehavior,
WebSocket,
HttpRequest,
HttpResponse,
RecognizedString,
} from "uWebSockets.js";
// --- types --- // --- types ---
type UserData = { type UserData = {
_peer?: any; peer?: UWSPeer;
req: HttpRequest; req: uws.HttpRequest;
res: HttpResponse; res: uws.HttpResponse;
context: any; protocol: string;
extensions: string;
}; };
type WebSocketHandler = WebSocketBehavior<UserData>; type WebSocketHandler = uws.WebSocketBehavior<UserData>;
export interface UWSAdapter extends AdapterInstance { export interface UWSAdapter extends AdapterInstance {
websocket: WebSocketHandler; websocket: WebSocketHandler;
@@ -30,7 +25,7 @@ export interface UWSAdapter extends AdapterInstance {
export interface UWSOptions extends AdapterOptions { export interface UWSOptions extends AdapterOptions {
uws?: Exclude< uws?: Exclude<
WebSocketBehavior<any>, uws.WebSocketBehavior<any>,
| "close" | "close"
| "drain" | "drain"
| "message" | "message"
@@ -56,12 +51,16 @@ export default defineWebSocketAdapter<UWSAdapter, UWSOptions>(
...options.uws, ...options.uws,
close(ws, code, message) { close(ws, code, message) {
const peer = getPeer(ws, peers); const peer = getPeer(ws, peers);
((peer as any)._internal.ws as UwsWebSocketProxy).readyState =
2 /* CLOSING */;
peers.delete(peer); peers.delete(peer);
hooks.callAdapterHook("uws:close", peer, ws, code, message); hooks.callAdapterHook("uws:close", peer, ws, code, message);
hooks.callHook("close", peer, { hooks.callHook("close", peer, {
code, code,
reason: message?.toString(), reason: message?.toString(),
}); });
((peer as any)._internal.ws as UwsWebSocketProxy).readyState =
3 /* CLOSED */;
}, },
drain(ws) { drain(ws) {
const peer = getPeer(ws, peers); const peer = getPeer(ws, peers);
@@ -70,8 +69,7 @@ export default defineWebSocketAdapter<UWSAdapter, UWSOptions>(
message(ws, message, isBinary) { message(ws, message, isBinary) {
const peer = getPeer(ws, peers); const peer = getPeer(ws, peers);
hooks.callAdapterHook("uws:message", peer, ws, message, isBinary); hooks.callAdapterHook("uws:message", peer, ws, message, isBinary);
const msg = new Message(message, isBinary); hooks.callHook("message", peer, new Message(message, peer));
hooks.callHook("message", peer, msg);
}, },
open(ws) { open(ws) {
const peer = getPeer(ws, peers); const peer = getPeer(ws, peers);
@@ -131,16 +129,21 @@ export default defineWebSocketAdapter<UWSAdapter, UWSOptions>(
res.writeHeader(key, value); res.writeHeader(key, value);
} }
} }
res.cork(() => { res.cork(() => {
const key = req.getHeader("sec-websocket-key");
const protocol = req.getHeader("sec-websocket-protocol");
const extensions = req.getHeader("sec-websocket-extensions");
res.upgrade( res.upgrade(
{ {
req, req,
res, res,
context, protocol,
extensions,
}, },
req.getHeader("sec-websocket-key"), key,
req.getHeader("sec-websocket-protocol"), protocol,
req.getHeader("sec-websocket-extensions"), extensions,
context, context,
); );
}); });
@@ -152,73 +155,63 @@ export default defineWebSocketAdapter<UWSAdapter, UWSOptions>(
// --- peer --- // --- peer ---
function getPeer(ws: WebSocket<UserData>, peers: Set<UWSPeer>): UWSPeer { function getPeer(uws: uws.WebSocket<UserData>, peers: Set<UWSPeer>): UWSPeer {
const userData = ws.getUserData(); const uwsData = uws.getUserData();
if (userData._peer) { if (uwsData.peer) {
return userData._peer as UWSPeer; return uwsData.peer;
} }
const peer = new UWSPeer({ peers, uws: { ws, userData } }); const peer = new UWSPeer({
userData._peer = peer; peers,
uws,
ws: new UwsWebSocketProxy(uws),
request: new UWSReqProxy(uwsData.req),
uwsData,
});
uwsData.peer = peer;
return peer; return peer;
} }
class UWSPeer extends Peer<{ class UWSPeer extends Peer<{
peers: Set<UWSPeer>; peers: Set<UWSPeer>;
uws: { request: UWSReqProxy;
ws: WebSocket<UserData>; uws: uws.WebSocket<UserData>;
userData: UserData; ws: UwsWebSocketProxy;
}; uwsData: UserData;
}> { }> {
_decoder = new TextDecoder(); get remoteAddress() {
_req: UWSReqProxy;
constructor(ctx: UWSPeer["_internal"]) {
super(ctx);
this._req = new UWSReqProxy(ctx.uws.userData.req);
}
get addr() {
try { try {
const addr = this._decoder.decode( const addr = new TextDecoder().decode(
this._internal.uws.ws?.getRemoteAddressAsText(), this._internal.uws.getRemoteAddressAsText(),
); );
return addr.replace(/(0000:)+/, ""); return addr;
} catch { } catch {
// Error: Invalid access of closed uWS.WebSocket/SSLWebSocket. // Error: Invalid access of closed uWS.WebSocket/SSLWebSocket.
} }
} }
get url() { send(data: unknown, options?: { compress?: boolean }) {
return this._req.url; const dataBuff = toBufferLike(data);
}
get headers() {
return this._req.headers;
}
send(message: any, options?: { compress?: boolean }) {
const data = toBufferLike(message);
const isBinary = typeof data !== "string"; const isBinary = typeof data !== "string";
return this._internal.uws.ws.send(data, isBinary, options?.compress); return this._internal.uws.send(dataBuff, isBinary, options?.compress);
} }
subscribe(topic: string): void { subscribe(topic: string): void {
this._internal.uws.ws.subscribe(topic); this._internal.uws.subscribe(topic);
} }
publish(topic: string, message: string, options?: { compress?: boolean }) { publish(topic: string, message: string, options?: { compress?: boolean }) {
const data = toBufferLike(message); const data = toBufferLike(message);
const isBinary = typeof data !== "string"; const isBinary = typeof data !== "string";
this._internal.uws.ws.publish(topic, data, isBinary, options?.compress); this._internal.uws.publish(topic, data, isBinary, options?.compress);
return 0; return 0;
} }
close(code?: number, reason?: RecognizedString) { close(code?: number, reason?: uws.RecognizedString) {
this._internal.uws.ws.end(code, reason); this._internal.uws.end(code, reason);
} }
terminate(): void { terminate(): void {
this._internal.uws.ws.close(); this._internal.uws.close();
} }
} }
@@ -227,11 +220,10 @@ class UWSPeer extends Peer<{
class UWSReqProxy { class UWSReqProxy {
private _headers?: Headers; private _headers?: Headers;
private _rawHeaders: [string, string][] = []; private _rawHeaders: [string, string][] = [];
url: string; url: string;
constructor(_req: HttpRequest) { constructor(_req: uws.HttpRequest) {
// We need to precompute values since uws doesn't provide them after handler.
// Headers // Headers
let host = "localhost"; let host = "localhost";
let proto = "http"; let proto = "http";
@@ -244,7 +236,6 @@ class UWSReqProxy {
} }
this._rawHeaders.push([key, value]); this._rawHeaders.push([key, value]);
}); });
// URL // URL
const query = _req.getQuery(); const query = _req.getQuery();
const pathname = _req.getUrl(); const pathname = _req.getUrl();
@@ -258,3 +249,21 @@ class UWSReqProxy {
return this._headers; return this._headers;
} }
} }
class UwsWebSocketProxy implements Partial<WebSocket> {
readyState?: number = 1 /* OPEN */;
constructor(private _uws: uws.WebSocket<UserData>) {}
get bufferedAmount() {
return this._uws?.getBufferedAmount();
}
get protocol() {
return this._uws?.getUserData().protocol;
}
get extensions() {
return this._uws?.getUserData().extensions;
}
}

View File

@@ -1,27 +1,202 @@
import { toBufferLike } from "./utils.ts"; import type { Peer } from "./peer.ts";
import { randomUUID } from "uncrypto";
export class Message { export class Message implements Partial<MessageEvent> {
constructor( /** Access to the original [message event](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/message_event) if available. */
public readonly rawData: any, readonly event?: MessageEvent;
public readonly isBinary?: boolean,
) {}
/** Access to the Peer that emitted the message. */
readonly peer?: Peer;
/** Raw message data (can be of any type). */
readonly rawData: unknown;
#id?: string;
#uint8Array?: Uint8Array;
#arrayBuffer?: ArrayBuffer | SharedArrayBuffer;
#blob?: Blob;
#text?: string;
#json?: unknown;
constructor(rawData: unknown, peer: Peer, event?: MessageEvent) {
this.rawData = rawData || "";
this.peer = peer;
this.event = event;
}
/**
* Unique random [uuid v4](https://developer.mozilla.org/en-US/docs/Glossary/UUID) identifier for the message.
*/
get id(): string {
if (!this.#id) {
this.#id = randomUUID();
}
return this.#id;
}
// --- data views ---
/**
* Get data as [Uint8Array](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array) value.
*
* If raw data is in any other format or string, it will be automatically converted and encoded.
*/
uint8Array() {
// Cached
const _uint8Array = this.#uint8Array;
if (_uint8Array) {
return _uint8Array;
}
const rawData = this.rawData;
// Uint8Array
if (rawData instanceof Uint8Array) {
return (this.#uint8Array = rawData);
}
// ArrayBuffer
if (
rawData instanceof ArrayBuffer ||
rawData instanceof SharedArrayBuffer
) {
this.#arrayBuffer = rawData;
return (this.#uint8Array = new Uint8Array(rawData));
}
// String
if (typeof rawData === "string") {
this.#text = rawData;
return (this.#uint8Array = new TextEncoder().encode(this.#text));
}
// Iterable and ArrayLike
if (Symbol.iterator in (rawData as Iterable<number>)) {
return (this.#uint8Array = new Uint8Array(rawData as Iterable<number>));
}
if (typeof (rawData as ArrayLike<number>)?.length === "number") {
return (this.#uint8Array = new Uint8Array(rawData as ArrayLike<number>));
}
// DataView
if (rawData instanceof DataView) {
return (this.#uint8Array = new Uint8Array(
rawData.buffer,
rawData.byteOffset,
rawData.byteLength,
));
}
throw new TypeError(
`Unsupported message type: ${Object.prototype.toString.call(rawData)}`,
);
}
/**
* Get data as [ArrayBuffer](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/ArrayBuffer) or [SharedArrayBuffer](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer) value.
*
* If raw data is in any other format or string, it will be automatically converted and encoded.
*/
arrayBuffer(): ArrayBuffer | SharedArrayBuffer {
// Cached
const _arrayBuffer = this.#arrayBuffer;
if (_arrayBuffer) {
return _arrayBuffer;
}
const rawData = this.rawData;
// Use as-is
if (
rawData instanceof ArrayBuffer ||
rawData instanceof SharedArrayBuffer
) {
return (this.#arrayBuffer = rawData);
}
// Fallback to UInt8Array
return (this.#arrayBuffer = this.uint8Array().buffer);
}
/**
* Get data as [Blob](https://developer.mozilla.org/en-US/docs/Web/API/Blob) value.
*
* If raw data is in any other format or string, it will be automatically converted and encoded. */
blob(): Blob {
// Cached
const _blob = this.#blob;
if (_blob) {
return _blob;
}
const rawData = this.rawData;
// Use as-is
if (rawData instanceof Blob) {
return (this.#blob = rawData);
}
// Fallback to UInt8Array
return (this.#blob = new Blob([this.uint8Array()]));
}
/**
* Get stringified text version of the message.
*
* If raw data is in any other format, it will be automatically converted and decoded.
*/
text(): string { text(): string {
if (typeof this.rawData === "string") { // Cached
const _text = this.#text;
if (_text) {
return _text;
}
const rawData = this.rawData;
// Use as-is
if (typeof rawData === "string") {
return (this.#text = rawData);
}
// Fallback to UInt8Array
return (this.#text = new TextDecoder().decode(this.uint8Array()));
}
/**
* Get parsed version of the message text with [`JSON.parse()`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/parse).
*/
json<T = unknown>(): T {
const _json = this.#json;
if (_json) {
return _json as T;
}
return (this.#json = JSON.parse(this.text()));
}
/**
* Message data (value varies based on `peer.websocket.binaryType`).
*/
get data() {
switch (this.peer?.websocket?.binaryType as string) {
case "arraybuffer": {
return this.arrayBuffer();
}
case "blob": {
return this.blob();
}
case "nodebuffer": {
return globalThis.Buffer
? Buffer.from(this.uint8Array())
: this.uint8Array();
}
case "uint8array": {
return this.uint8Array();
}
case "text": {
return this.text();
}
default: {
return this.rawData; return this.rawData;
} }
const buff = toBufferLike(this.rawData);
if (typeof buff === "string") {
return buff;
} }
return new TextDecoder().decode(buff);
} }
// --- inspect ---
toString() { toString() {
return this.text(); return this.text();
} }
[Symbol.for("nodejs.util.inspect.custom")]() { [Symbol.toPrimitive]() {
return this.text(); return this.text();
} }
[Symbol.for("nodejs.util.inspect.custom")]() {
return { data: this.rawData };
}
} }

View File

@@ -1,111 +1,142 @@
import type * as web from "../types/web.ts";
import { randomUUID } from "uncrypto"; import { randomUUID } from "uncrypto";
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState
type ReadyState = 0 | 1 | 2 | 3;
const ReadyStateMap = {
"-1": "unknown",
0: "connecting",
1: "open",
2: "closing",
3: "closed",
} as const;
export interface AdapterInternal { export interface AdapterInternal {
ws: unknown;
request?: Request | Partial<Request>;
peers?: Set<Peer>; peers?: Set<Peer>;
} }
export abstract class Peer<Internal extends AdapterInternal = AdapterInternal> { export abstract class Peer<Internal extends AdapterInternal = AdapterInternal> {
protected _internal: Internal; protected _internal: Internal;
protected _topics: Set<string>; protected _topics: Set<string>;
#id?: string;
private _id?: string; #ws?: Partial<web.WebSocket>;
constructor(internal: Internal) { constructor(internal: Internal) {
this._topics = new Set(); this._topics = new Set();
this._internal = internal; this._internal = internal;
} }
/**
* Unique random [uuid v4](https://developer.mozilla.org/en-US/docs/Glossary/UUID) identifier for the peer.
*/
get id(): string { get id(): string {
if (!this._id) { if (!this.#id) {
this._id = randomUUID(); this.#id = randomUUID();
} }
return this._id; return this.#id;
} }
get addr(): string | undefined { /** IP address of the peer */
get remoteAddress(): string | undefined {
return undefined; return undefined;
} }
get url(): string { /** upgrade request */
return ""; get request(): Request | Partial<Request> | undefined {
return this._internal.request;
} }
get headers(): Headers | undefined { /**
return undefined; * Get the [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) instance.
} *
* **Note:** crossws adds polyfill for the following properties if native values are not available:
get readyState(): ReadyState | -1 { * - `protocol`: Extracted from the `sec-websocket-protocol` header.
return -1; * - `extensions`: Extracted from the `sec-websocket-extensions` header.
* - `url`: Extracted from the request URL (http -> ws).
* */
get websocket(): Partial<web.WebSocket> {
if (!this.#ws) {
const _ws = this._internal.ws as Partial<web.WebSocket>;
const _request = this._internal.request;
this.#ws = _request ? createWsProxy(_ws, _request) : _ws;
}
return this.#ws;
} }
/** All connected peers to the server */
get peers(): Set<Peer> { get peers(): Set<Peer> {
return this._internal.peers || new Set(); return this._internal.peers || new Set();
} }
abstract send(message: any, options?: { compress?: boolean }): number; abstract close(code?: number, reason?: string): void;
abstract publish( /** Abruptly close the connection */
topic: string, terminate() {
message: any, this.close();
options?: { compress?: boolean }, }
): void;
/** Subscribe to a topic */
subscribe(topic: string) { subscribe(topic: string) {
this._topics.add(topic); this._topics.add(topic);
} }
/** Unsubscribe from a topic */
unsubscribe(topic: string) { unsubscribe(topic: string) {
this._topics.delete(topic); this._topics.delete(topic);
} }
/** Send a message to the peer. */
abstract send(
data: unknown,
options?: { compress?: boolean },
): number | void | undefined;
/** Send message to subscribes of topic */
abstract publish(
topic: string,
data: unknown,
options?: { compress?: boolean },
): void;
// --- inspect ---
toString() { toString() {
return this.id; return this.id;
} }
[Symbol.for("nodejs.util.inspect.custom")]() { [Symbol.toPrimitive]() {
const _id = this.toString(); return this.id;
const _addr = this.addr ? ` (${this.addr})` : "";
const _state =
this.readyState === 1 || this.readyState === -1
? ""
: ` [${ReadyStateMap[this.readyState]}]`;
return `${_id}${_addr}${_state}`;
} }
/** [Symbol.toStringTag]() {
* Closes the connection. return "WebSocket";
* }
* Here is a list of close codes:
*
* - `1000` means "normal closure" (default)
* - `1009` means a message was too big and was rejected
* - `1011` means the server encountered an error
* - `1012` means the server is restarting
* - `1013` means the server is too busy or the client is rate-limited
* - `4000` through `4999` are reserved for applications (you can use it!)
*
* To close the connection abruptly, use `terminate()`.
*
* @param code The close code to send
* @param reason The close reason to send
*/
abstract close(code?: number, reason?: string): void;
/** [Symbol.for("nodejs.util.inspect.custom")]() {
* Abruptly close the connection. return Object.fromEntries(
* [
* To gracefully close the connection, use `close()`. ["id", this.id],
*/ ["remoteAddress", this.remoteAddress],
abstract terminate(): void; ["peers", this.peers],
["webSocket", this.websocket],
].filter((p) => p[1]),
);
}
}
function createWsProxy(
ws: Partial<web.WebSocket>,
request: Partial<Request>,
): Partial<web.WebSocket> {
return new Proxy(ws, {
get: (target, prop) => {
const value = Reflect.get(target, prop);
if (!value) {
switch (prop) {
case "protocol": {
return request?.headers?.get("sec-websocket-protocol") || "";
}
case "extensions": {
return request?.headers?.get("sec-websocket-extensions") || "";
}
case "url": {
return request?.url?.replace(/^http/, "ws") || undefined;
}
}
}
return value;
},
});
} }

View File

@@ -26,6 +26,8 @@ export function wsConnect(
headers: opts?.headers, headers: opts?.headers,
dispatcher: inspector, dispatcher: inspector,
}); });
ws.binaryType = "arraybuffer";
websockets.add(ws); websockets.add(ws);
const send = async (data: any): Promise<any> => { const send = async (data: any): Promise<any> => {
@@ -51,13 +53,22 @@ export function wsConnect(
nextIndex += count; nextIndex += count;
}; };
ws.addEventListener("message", (event) => { ws.addEventListener("message", async (event) => {
const str = let text: string;
typeof event.data === "string" if (typeof event.data === "string") {
? event.data text = event.data;
: new TextDecoder().decode(event.data); } else {
const payload = str[0] === "{" ? JSON.parse(str) : str; let rawData = event.data;
if (rawData instanceof Blob) {
rawData = await event.data.arrayBuffer();
} else if (rawData instanceof Uint8Array) {
rawData = rawData.buffer;
}
text = new TextDecoder().decode(rawData);
}
const payload = text[0] === "{" ? JSON.parse(text) : text;
messages.push(payload); messages.push(payload);
const index = messages.length - 1; const index = messages.length - 1;
if (waitCallbacks[index]) { if (waitCallbacks[index]) {
waitCallbacks[index](payload); waitCallbacks[index](payload);

View File

@@ -27,9 +27,19 @@ export function createDemo<T extends Adapter<any, any>>(
case "debug": { case "debug": {
peer.send({ peer.send({
id: peer.id, id: peer.id,
ip: peer.addr, remoteAddress: peer.remoteAddress,
url: peer.url, request: {
headers: Object.fromEntries(peer.headers || []), url: peer.request?.url,
headers: Object.fromEntries(peer.request?.headers || []),
},
websocket: {
readyState: peer.websocket.readyState,
protocol: peer.websocket.protocol,
extensions: peer.websocket.extensions,
url: peer.websocket.url,
binaryType: peer.websocket.binaryType,
bufferedAmount: peer.websocket.bufferedAmount,
},
}); });
break; break;
} }

View File

@@ -1,4 +1,4 @@
// You can run this demo using `deno run -A ./deno.ts` or `npm run play:deno` in repo // You can run this demo using `npm run play:deno` in repo
import denoAdapter from "../../src/adapters/deno.ts"; import denoAdapter from "../../src/adapters/deno.ts";

View File

@@ -1,10 +1,13 @@
import { expect, test } from "vitest"; import { expect, test } from "vitest";
import { wsConnect } from "./_utils"; import { wsConnect } from "./_utils";
export function wsTests( export interface WSTestOpts {
getURL: () => string, adapter: string;
opts: { adapter: string; pubsub?: boolean; resHeaders?: boolean }, pubsub?: boolean;
) { resHeaders?: boolean;
}
export function wsTests(getURL: () => string, opts: WSTestOpts) {
test("http works", async () => { test("http works", async () => {
const response = await fetch(getURL().replace("ws", "http")); const response = await fetch(getURL().replace("ws", "http"));
expect(response.status).toBe(200); expect(response.status).toBe(200);
@@ -60,28 +63,50 @@ export function wsTests(
}, },
); );
test("upgrade request headers", async () => { test("peer.request (headers, url, remoteAddress)", async () => {
const ws = await wsConnect(getURL(), { const ws = await wsConnect(getURL() + "?foo=bar", {
skip: 1, skip: 1,
headers: { "x-test": "1" }, headers: { "x-test": "1" },
}); });
await ws.send("debug"); await ws.send("debug");
const { headers } = await ws.next(); const { request, remoteAddress } = await ws.next();
// Headers
if (opts.adapter === "sse") { if (opts.adapter === "sse") {
expect(headers["connection"]).toBe("keep-alive"); expect(request.headers["connection"]).toBe("keep-alive");
} else { } else {
expect(headers["connection"]).toMatch(/^upgrade$/i); expect(request.headers["connection"]).toMatch(/^upgrade$/i);
expect(headers["x-test"]).toBe("1"); expect(request.headers["x-test"]).toBe("1");
}
// URL
expect(request.url).toMatch(/^http:\/\/localhost:\d+\/\?foo=bar$/);
const url = new URL(request.url);
expect(url.search).toBe("?foo=bar");
// Remote address
if (!/sse|cloudflare/.test(opts.adapter)) {
expect(remoteAddress).toMatch(/:{2}1|(?:0{4}:){7}0{3}1|127\.0\.\0\.1/);
} }
}); });
test("upgrade request url", async () => { test("peer.websocket", async () => {
const ws = await wsConnect(getURL() + "?foo=bar", { skip: 1 }); const ws = await wsConnect(getURL() + "?foo=bar", {
skip: 1,
headers: {
"Sec-WebSocket-Protocol": "crossws",
},
});
await ws.send("debug"); await ws.send("debug");
const info = await ws.next(); const { websocket } = await ws.next();
expect(info.url).toMatch(/^http:\/\/localhost:\d+\/\?foo=bar$/); expect(websocket).toMatchObject({
const url = new URL(info.url); readyState: 1,
expect(url.search).toBe("?foo=bar"); protocol: /ss/.test(opts.adapter) ? "" : "crossws",
extensions: /sse|cloudflare/.test(opts.adapter)
? ""
: "permessage-deflate; client_max_window_bits",
url: getURL() + "?foo=bar",
});
}); });
test.skipIf(opts.adapter === "sse")("upgrade fail response", async () => { test.skipIf(opts.adapter === "sse")("upgrade fail response", async () => {

View File

@@ -255,7 +255,7 @@ export interface WebSocket extends EventTarget {
* *
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebSocket/binaryType) * [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebSocket/binaryType)
*/ */
binaryType: BinaryType; binaryType: BinaryType | (string & {});
/** /**
* Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network. * Returns the number of bytes of application data (UTF-8 text and binary data) that have been queued using send() but not yet been transmitted to the network.
* *