diff --git a/docs/1.guide/3.peer.md b/docs/1.guide/3.peer.md index e594c4c..f9917ce 100644 --- a/docs/1.guide/3.peer.md +++ b/docs/1.guide/3.peer.md @@ -88,7 +88,7 @@ To gracefully close the connection, use `peer.close()`. | `publish()` / `subscribe()` | ✓ | ⨉ | ✓ [^1] | ✓ [^1] | ✓ [^1] | ✓ | ✓ [^1] | | `close()` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | | `terminate()` | ✓ | ✓ [^2] | ✓ | ✓ | ✓ | ✓ | ✓ [^2] | -| `request` | ✓ | ✓ | ✓ [^3] | ✓ | ✓ [^3] | ✓ [^3] | ✓ | +| `request` | ✓ | ✓ | ✓ [^30] | ✓ | ✓ [^31] | ✓ [^31] | ✓ | | `remoteAddress` | ✓ | ⨉ | ⨉ | ✓ | ✓ | ✓ | ⨉ | | `websocket.url` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | | `websocket.extensions` | ✓ [^4] | ⨉ | ⨉ | ✓ [^4] | ✓ [^4] | ✓ [^4] | ⨉ | @@ -109,9 +109,9 @@ To gracefully close the connection, use `peer.close()`. [^2]: `close()` will be used for compatibility. -[^1]: using a proxy for [Request](https://developer.mozilla.org/en-US/docs/Web/API/Request) compatible interface (`url`, `headers` only) wrapping Node.js requests. +[^30]: After durable object's hibernation, only `request.url` (and `peer.id`) remain available due to 2048 byte in-memory state limit. -[^3]: `request` is not always available (only in `open` hook). +[^31]: using a proxy for [Request](https://developer.mozilla.org/en-US/docs/Web/API/Request) compatible interface (`url`, `headers` only) wrapping Node.js requests. [^4]: [`websocket.extensions`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/extensions) is polyfilled using [`sec-websocket-extensions`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism#websocket-specific_headers) request header. diff --git a/docs/2.adapters/cloudflare.md b/docs/2.adapters/cloudflare.md index d1dd5b5..fceab91 100644 --- a/docs/2.adapters/cloudflare.md +++ b/docs/2.adapters/cloudflare.md @@ -77,6 +77,11 @@ export default { }; export class $DurableObject extends DurableObject { + constructor(state, env) { + super(state, env); + ws.handleDurableInit(this, state, env); + } + fetch(request) { return ws.handleDurableUpgrade(this, request); } diff --git a/src/adapters/cloudflare-durable.ts b/src/adapters/cloudflare-durable.ts index 8c1e471..6b3f58f 100644 --- a/src/adapters/cloudflare-durable.ts +++ b/src/adapters/cloudflare-durable.ts @@ -9,7 +9,166 @@ import { Peer } from "../peer.ts"; import type * as CF from "@cloudflare/workers-types"; import type { DurableObject } from "cloudflare:workers"; -// --- types +// https://developers.cloudflare.com/durable-objects/examples/websocket-hibernation-server/ + +export default defineWebSocketAdapter< + CloudflareDurableAdapter, + CloudflareOptions +>((opts) => { + const hooks = new AdapterHookable(opts); + const peers = new Set(); + return { + ...adapterUtils(peers), + handleUpgrade: async (req, env, _context) => { + const bindingName = opts?.bindingName ?? "$DurableObject"; + const instanceName = opts?.instanceName ?? "crossws"; + const binding = (env as any)[bindingName] as CF.DurableObjectNamespace; + const id = binding.idFromName(instanceName); + const stub = binding.get(id); + return stub.fetch(req as CF.Request) as unknown as Response; + }, + handleDurableInit: async (obj, state, env) => { + // placeholder + }, + handleDurableUpgrade: async (obj, request) => { + const res = await hooks.callHook("upgrade", request as Request); + if (res instanceof Response) { + return res; + } + const pair = new WebSocketPair(); + const client = pair[0]; + const server = pair[1]; + const peer = CloudflareDurablePeer._restore( + obj, + server as unknown as CF.WebSocket, + request, + ); + peers.add(peer); + (obj as DurableObjectPub).ctx.acceptWebSocket(server); + await hooks.callAdapterHook("cloudflare:accept", peer); + await hooks.callHook("open", peer); + // eslint-disable-next-line unicorn/no-null + return new Response(null, { + status: 101, + webSocket: client, + headers: res?.headers, + }); + }, + handleDurableMessage: async (obj, ws, message) => { + const peer = CloudflareDurablePeer._restore(obj, ws as CF.WebSocket); + await hooks.callAdapterHook("cloudflare:message", peer, message); + await hooks.callHook("message", peer, new Message(message, peer)); + }, + handleDurableClose: async (obj, ws, code, reason, wasClean) => { + const peer = CloudflareDurablePeer._restore(obj, ws as CF.WebSocket); + peers.delete(peer); + const details = { code, reason, wasClean }; + await hooks.callAdapterHook("cloudflare:close", peer, details); + await hooks.callHook("close", peer, details); + }, + }; +}); + +// --- peer --- + +class CloudflareDurablePeer extends Peer<{ + ws: AugmentedWebSocket; + request?: Partial; + peers?: never; + durable: DurableObjectPub; +}> { + get peers() { + return new Set( + this.#getwebsockets().map((ws) => + CloudflareDurablePeer._restore(this._internal.durable, ws), + ), + ); + } + + #getwebsockets() { + return this._internal.durable.ctx.getWebSockets() as unknown as (typeof this._internal.ws)[]; + } + + send(data: unknown) { + return this._internal.ws.send(toBufferLike(data)); + } + + subscribe(topic: string): void { + super.subscribe(topic); + const state = getAttachedState(this._internal.ws); + if (!state.t) { + state.t = new Set(); + } + state.t.add(topic); + setAttachedState(this._internal.ws, state); + } + + publish(topic: string, data: unknown): void { + const websockets = this.#getwebsockets(); + if (websockets.length < 2 /* 1 is self! */) { + return; + } + const dataBuff = toBufferLike(data); + for (const ws of websockets) { + if (ws === this._internal.ws) { + continue; + } + const state = getAttachedState(ws); + if (state.t?.has(topic)) { + ws.send(dataBuff); + } + } + } + + close(code?: number, reason?: string) { + this._internal.ws.close(code, reason); + } + + static _restore( + durable: DurableObject, + ws: AugmentedWebSocket, + request?: Request | CF.Request, + ): CloudflareDurablePeer { + let peer = ws._crosswsPeer; + if (peer) { + return peer; + } + const state = (ws.deserializeAttachment() || {}) as AttachedState; + peer = ws._crosswsPeer = new CloudflareDurablePeer({ + ws: ws as CF.WebSocket, + request: (request as Request) || { url: state.u }, + durable: durable as DurableObjectPub, + }); + if (state.i) { + peer._id = state.i; + } + if (request?.url) { + state.u = request.url; + } + state.i = peer.id; + setAttachedState(ws, state); + return peer; + } +} + +// -- attached state utils --- + +function getAttachedState(ws: AugmentedWebSocket): AttachedState { + let state = ws._crosswsState; + if (state) { + return state; + } + state = (ws.deserializeAttachment() as AttachedState) || {}; + ws._crosswsState = state; + return state; +} + +function setAttachedState(ws: AugmentedWebSocket, state: AttachedState) { + ws._crosswsState = state; + ws.serializeAttachment(state); +} + +// --- types --- declare class DurableObjectPub extends DurableObject { public ctx: DurableObject["ctx"]; @@ -17,12 +176,18 @@ declare class DurableObjectPub extends DurableObject { } type AugmentedWebSocket = CF.WebSocket & { - _crosswsState?: CrosswsState; _crosswsPeer?: CloudflareDurablePeer; + _crosswsState?: AttachedState; }; -type CrosswsState = { - topics?: Set; +/** Max serialized limit: 2048 bytes (512..2048 characters) */ +type AttachedState = { + /** Subscribed topics */ + t?: Set; + /** Peer id */ + i?: string; + /** Request url */ + u?: string; }; export interface CloudflareDurableAdapter extends AdapterInstance { @@ -32,6 +197,12 @@ export interface CloudflareDurableAdapter extends AdapterInstance { context: CF.ExecutionContext, ): Promise; + handleDurableInit( + obj: DurableObject, + state: DurableObjectState, + env: unknown, + ): void; + handleDurableUpgrade( obj: DurableObject, req: Request | CF.Request, @@ -56,147 +227,3 @@ export interface CloudflareOptions extends AdapterOptions { bindingName?: string; instanceName?: string; } - -// --- adapter --- - -// https://developers.cloudflare.com/durable-objects/examples/websocket-hibernation-server/ -export default defineWebSocketAdapter< - CloudflareDurableAdapter, - CloudflareOptions ->((opts) => { - const hooks = new AdapterHookable(opts); - const peers = new Set(); - return { - ...adapterUtils(peers), - handleUpgrade: async (req, env, _context) => { - const bindingName = opts?.bindingName ?? "$DurableObject"; - const instanceName = opts?.instanceName ?? "crossws"; - const binding = (env as any)[bindingName] as CF.DurableObjectNamespace; - const id = binding.idFromName(instanceName); - const stub = binding.get(id); - return stub.fetch(req as CF.Request) as unknown as Response; - }, - handleDurableUpgrade: async (obj, request) => { - const res = await hooks.callHook("upgrade", request as Request); - if (res instanceof Response) { - return res; - } - const pair = new WebSocketPair(); - const client = pair[0]; - const server = pair[1]; - const peer = peerFromDurableEvent( - obj, - server as unknown as CF.WebSocket, - request, - ); - peers.add(peer); - (obj as DurableObjectPub).ctx.acceptWebSocket(server); - hooks.callAdapterHook("cloudflare:accept", peer); - hooks.callHook("open", peer); - // eslint-disable-next-line unicorn/no-null - return new Response(null, { - status: 101, - webSocket: client, - headers: res?.headers, - }); - }, - handleDurableMessage: async (obj, ws, message) => { - const peer = peerFromDurableEvent(obj, ws as CF.WebSocket); - hooks.callAdapterHook("cloudflare:message", peer, message); - hooks.callHook("message", peer, new Message(message, peer)); - }, - handleDurableClose: async (obj, ws, code, reason, wasClean) => { - const peer = peerFromDurableEvent(obj, ws as CF.WebSocket); - peers.delete(peer); - const details = { code, reason, wasClean }; - hooks.callAdapterHook("cloudflare:close", peer, details); - hooks.callHook("close", peer, details); - }, - }; -}); - -function peerFromDurableEvent( - obj: DurableObject, - ws: AugmentedWebSocket, - request?: Request | CF.Request, -): CloudflareDurablePeer { - let peer = ws._crosswsPeer; - if (peer) { - return peer; - } - peer = ws._crosswsPeer = new CloudflareDurablePeer({ - ws: ws as CF.WebSocket, - request: request as Request, - cfEnv: (obj as DurableObjectPub).env, - cfCtx: (obj as DurableObjectPub).ctx, - }); - return peer; -} - -// --- peer --- - -class CloudflareDurablePeer extends Peer<{ - ws: AugmentedWebSocket; - request?: Request; - peers?: never; - cfEnv: unknown; - cfCtx: DurableObject["ctx"]; -}> { - get peers() { - const clients = - this._internal.cfCtx.getWebSockets() as unknown as (typeof this._internal.ws)[]; - return new Set( - clients.map((client) => { - let peer = client._crosswsPeer; - if (!peer) { - peer = client._crosswsPeer = new CloudflareDurablePeer({ - ws: client, - request: undefined, - cfEnv: this._internal.cfEnv, - cfCtx: this._internal.cfCtx, - }); - } - return peer; - }), - ); - } - - send(data: unknown) { - return this._internal.ws.send(toBufferLike(data)); - } - - subscribe(topic: string): void { - super.subscribe(topic); - const state: CrosswsState = { - // Max limit: 2,048 bytes - ...(this._internal.ws.deserializeAttachment() as CrosswsState), - topics: this._topics, - }; - this._internal.ws._crosswsState = state; - this._internal.ws.serializeAttachment(state); - } - - publish(topic: string, data: unknown): void { - const clients = ( - this._internal.cfCtx.getWebSockets() as unknown as (typeof this._internal.ws)[] - ).filter((c) => c !== this._internal.ws); - if (clients.length === 0) { - return; - } - const dataBuff = toBufferLike(data); - for (const client of clients) { - let state = client._crosswsState; - if (!state) { - state = client._crosswsState = - client.deserializeAttachment() as CrosswsState; - } - if (state.topics?.has(topic)) { - client.send(dataBuff); - } - } - } - - close(code?: number, reason?: string) { - this._internal.ws.close(code, reason); - } -} diff --git a/src/peer.ts b/src/peer.ts index b7a7737..5619a3e 100644 --- a/src/peer.ts +++ b/src/peer.ts @@ -10,7 +10,8 @@ export interface AdapterInternal { export abstract class Peer { protected _internal: Internal; protected _topics: Set; - #id?: string; + protected _id?: string; + #ws?: Partial; constructor(internal: Internal) { @@ -22,10 +23,10 @@ export abstract class Peer { * Unique random [uuid v4](https://developer.mozilla.org/en-US/docs/Glossary/UUID) identifier for the peer. */ get id(): string { - if (!this.#id) { - this.#id = randomUUID(); + if (!this._id) { + this._id = randomUUID(); } - return this.#id; + return this._id; } /** IP address of the peer */ diff --git a/test/fixture/_index.html.ts b/test/fixture/_index.html.ts index 8f87bbe..4530be8 100644 --- a/test/fixture/_index.html.ts +++ b/test/fixture/_index.html.ts @@ -77,12 +77,9 @@ export default function indexTemplate(opts: { sse?: boolean } = {}) { ws = new WebSocket(url); ws.addEventListener("message", async (event) => { - const data = typeof event.data === "string" ? event.data : await event.data.text(); - const { user = "system", message = "" } = data.startsWith("{") - ? JSON.parse(data) - : { message: data }; + const message = typeof event.data === "string" ? event.data : await event.data.text(); log( - user, + "", typeof message === "string" ? message : JSON.stringify(message), ); }); diff --git a/test/fixture/cloudflare-durable.ts b/test/fixture/cloudflare-durable.ts index 42c07f1..2f29c35 100644 --- a/test/fixture/cloudflare-durable.ts +++ b/test/fixture/cloudflare-durable.ts @@ -27,6 +27,11 @@ export default { }; export class $DurableObject extends DurableObject { + constructor(state: DurableObjectState, env: Record) { + super(state, env); + ws.handleDurableInit(this, state, env); + } + fetch(request: Request) { return ws.handleDurableUpgrade(this, request); }