fix(cloudflare-durable): restore peer url and id after hibernation (#71)

This commit is contained in:
Pooya Parsa
2024-08-16 02:01:13 +02:00
committed by GitHub
parent 078b51da7f
commit 02f3e824e3
6 changed files with 195 additions and 160 deletions

View File

@@ -88,7 +88,7 @@ To gracefully close the connection, use `peer.close()`.
| `publish()` / `subscribe()` | ✓ | ⨉ | ✓ [^1] | ✓ [^1] | ✓ [^1] | ✓ | ✓ [^1] |
| `close()` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| `terminate()` | ✓ | ✓ [^2] | ✓ | ✓ | ✓ | ✓ | ✓ [^2] |
| `request` | ✓ | ✓ | ✓ [^3] | ✓ | ✓ [^3] | ✓ [^3] | ✓ |
| `request` | ✓ | ✓ | ✓ [^30] | ✓ | ✓ [^31] | ✓ [^31] | ✓ |
| `remoteAddress` | ✓ | ⨉ | ⨉ | ✓ | ✓ | ✓ | ⨉ |
| `websocket.url` | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| `websocket.extensions` | ✓ [^4] | ⨉ | ⨉ | ✓ [^4] | ✓ [^4] | ✓ [^4] | ⨉ |
@@ -109,9 +109,9 @@ To gracefully close the connection, use `peer.close()`.
[^2]: `close()` will be used for compatibility.
[^1]: using a proxy for [Request](https://developer.mozilla.org/en-US/docs/Web/API/Request) compatible interface (`url`, `headers` only) wrapping Node.js requests.
[^30]: After durable object's hibernation, only `request.url` (and `peer.id`) remain available due to 2048 byte in-memory state limit.
[^3]: `request` is not always available (only in `open` hook).
[^31]: using a proxy for [Request](https://developer.mozilla.org/en-US/docs/Web/API/Request) compatible interface (`url`, `headers` only) wrapping Node.js requests.
[^4]: [`websocket.extensions`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/extensions) is polyfilled using [`sec-websocket-extensions`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism#websocket-specific_headers) request header.

View File

@@ -77,6 +77,11 @@ export default {
};
export class $DurableObject extends DurableObject {
constructor(state, env) {
super(state, env);
ws.handleDurableInit(this, state, env);
}
fetch(request) {
return ws.handleDurableUpgrade(this, request);
}

View File

@@ -9,7 +9,166 @@ import { Peer } from "../peer.ts";
import type * as CF from "@cloudflare/workers-types";
import type { DurableObject } from "cloudflare:workers";
// --- types
// https://developers.cloudflare.com/durable-objects/examples/websocket-hibernation-server/
export default defineWebSocketAdapter<
CloudflareDurableAdapter,
CloudflareOptions
>((opts) => {
const hooks = new AdapterHookable(opts);
const peers = new Set<CloudflareDurablePeer>();
return {
...adapterUtils(peers),
handleUpgrade: async (req, env, _context) => {
const bindingName = opts?.bindingName ?? "$DurableObject";
const instanceName = opts?.instanceName ?? "crossws";
const binding = (env as any)[bindingName] as CF.DurableObjectNamespace;
const id = binding.idFromName(instanceName);
const stub = binding.get(id);
return stub.fetch(req as CF.Request) as unknown as Response;
},
handleDurableInit: async (obj, state, env) => {
// placeholder
},
handleDurableUpgrade: async (obj, request) => {
const res = await hooks.callHook("upgrade", request as Request);
if (res instanceof Response) {
return res;
}
const pair = new WebSocketPair();
const client = pair[0];
const server = pair[1];
const peer = CloudflareDurablePeer._restore(
obj,
server as unknown as CF.WebSocket,
request,
);
peers.add(peer);
(obj as DurableObjectPub).ctx.acceptWebSocket(server);
await hooks.callAdapterHook("cloudflare:accept", peer);
await hooks.callHook("open", peer);
// eslint-disable-next-line unicorn/no-null
return new Response(null, {
status: 101,
webSocket: client,
headers: res?.headers,
});
},
handleDurableMessage: async (obj, ws, message) => {
const peer = CloudflareDurablePeer._restore(obj, ws as CF.WebSocket);
await hooks.callAdapterHook("cloudflare:message", peer, message);
await hooks.callHook("message", peer, new Message(message, peer));
},
handleDurableClose: async (obj, ws, code, reason, wasClean) => {
const peer = CloudflareDurablePeer._restore(obj, ws as CF.WebSocket);
peers.delete(peer);
const details = { code, reason, wasClean };
await hooks.callAdapterHook("cloudflare:close", peer, details);
await hooks.callHook("close", peer, details);
},
};
});
// --- peer ---
class CloudflareDurablePeer extends Peer<{
ws: AugmentedWebSocket;
request?: Partial<Request>;
peers?: never;
durable: DurableObjectPub;
}> {
get peers() {
return new Set(
this.#getwebsockets().map((ws) =>
CloudflareDurablePeer._restore(this._internal.durable, ws),
),
);
}
#getwebsockets() {
return this._internal.durable.ctx.getWebSockets() as unknown as (typeof this._internal.ws)[];
}
send(data: unknown) {
return this._internal.ws.send(toBufferLike(data));
}
subscribe(topic: string): void {
super.subscribe(topic);
const state = getAttachedState(this._internal.ws);
if (!state.t) {
state.t = new Set();
}
state.t.add(topic);
setAttachedState(this._internal.ws, state);
}
publish(topic: string, data: unknown): void {
const websockets = this.#getwebsockets();
if (websockets.length < 2 /* 1 is self! */) {
return;
}
const dataBuff = toBufferLike(data);
for (const ws of websockets) {
if (ws === this._internal.ws) {
continue;
}
const state = getAttachedState(ws);
if (state.t?.has(topic)) {
ws.send(dataBuff);
}
}
}
close(code?: number, reason?: string) {
this._internal.ws.close(code, reason);
}
static _restore(
durable: DurableObject,
ws: AugmentedWebSocket,
request?: Request | CF.Request,
): CloudflareDurablePeer {
let peer = ws._crosswsPeer;
if (peer) {
return peer;
}
const state = (ws.deserializeAttachment() || {}) as AttachedState;
peer = ws._crosswsPeer = new CloudflareDurablePeer({
ws: ws as CF.WebSocket,
request: (request as Request) || { url: state.u },
durable: durable as DurableObjectPub,
});
if (state.i) {
peer._id = state.i;
}
if (request?.url) {
state.u = request.url;
}
state.i = peer.id;
setAttachedState(ws, state);
return peer;
}
}
// -- attached state utils ---
function getAttachedState(ws: AugmentedWebSocket): AttachedState {
let state = ws._crosswsState;
if (state) {
return state;
}
state = (ws.deserializeAttachment() as AttachedState) || {};
ws._crosswsState = state;
return state;
}
function setAttachedState(ws: AugmentedWebSocket, state: AttachedState) {
ws._crosswsState = state;
ws.serializeAttachment(state);
}
// --- types ---
declare class DurableObjectPub extends DurableObject {
public ctx: DurableObject["ctx"];
@@ -17,12 +176,18 @@ declare class DurableObjectPub extends DurableObject {
}
type AugmentedWebSocket = CF.WebSocket & {
_crosswsState?: CrosswsState;
_crosswsPeer?: CloudflareDurablePeer;
_crosswsState?: AttachedState;
};
type CrosswsState = {
topics?: Set<string>;
/** Max serialized limit: 2048 bytes (512..2048 characters) */
type AttachedState = {
/** Subscribed topics */
t?: Set<string>;
/** Peer id */
i?: string;
/** Request url */
u?: string;
};
export interface CloudflareDurableAdapter extends AdapterInstance {
@@ -32,6 +197,12 @@ export interface CloudflareDurableAdapter extends AdapterInstance {
context: CF.ExecutionContext,
): Promise<Response>;
handleDurableInit(
obj: DurableObject,
state: DurableObjectState,
env: unknown,
): void;
handleDurableUpgrade(
obj: DurableObject,
req: Request | CF.Request,
@@ -56,147 +227,3 @@ export interface CloudflareOptions extends AdapterOptions {
bindingName?: string;
instanceName?: string;
}
// --- adapter ---
// https://developers.cloudflare.com/durable-objects/examples/websocket-hibernation-server/
export default defineWebSocketAdapter<
CloudflareDurableAdapter,
CloudflareOptions
>((opts) => {
const hooks = new AdapterHookable(opts);
const peers = new Set<CloudflareDurablePeer>();
return {
...adapterUtils(peers),
handleUpgrade: async (req, env, _context) => {
const bindingName = opts?.bindingName ?? "$DurableObject";
const instanceName = opts?.instanceName ?? "crossws";
const binding = (env as any)[bindingName] as CF.DurableObjectNamespace;
const id = binding.idFromName(instanceName);
const stub = binding.get(id);
return stub.fetch(req as CF.Request) as unknown as Response;
},
handleDurableUpgrade: async (obj, request) => {
const res = await hooks.callHook("upgrade", request as Request);
if (res instanceof Response) {
return res;
}
const pair = new WebSocketPair();
const client = pair[0];
const server = pair[1];
const peer = peerFromDurableEvent(
obj,
server as unknown as CF.WebSocket,
request,
);
peers.add(peer);
(obj as DurableObjectPub).ctx.acceptWebSocket(server);
hooks.callAdapterHook("cloudflare:accept", peer);
hooks.callHook("open", peer);
// eslint-disable-next-line unicorn/no-null
return new Response(null, {
status: 101,
webSocket: client,
headers: res?.headers,
});
},
handleDurableMessage: async (obj, ws, message) => {
const peer = peerFromDurableEvent(obj, ws as CF.WebSocket);
hooks.callAdapterHook("cloudflare:message", peer, message);
hooks.callHook("message", peer, new Message(message, peer));
},
handleDurableClose: async (obj, ws, code, reason, wasClean) => {
const peer = peerFromDurableEvent(obj, ws as CF.WebSocket);
peers.delete(peer);
const details = { code, reason, wasClean };
hooks.callAdapterHook("cloudflare:close", peer, details);
hooks.callHook("close", peer, details);
},
};
});
function peerFromDurableEvent(
obj: DurableObject,
ws: AugmentedWebSocket,
request?: Request | CF.Request,
): CloudflareDurablePeer {
let peer = ws._crosswsPeer;
if (peer) {
return peer;
}
peer = ws._crosswsPeer = new CloudflareDurablePeer({
ws: ws as CF.WebSocket,
request: request as Request,
cfEnv: (obj as DurableObjectPub).env,
cfCtx: (obj as DurableObjectPub).ctx,
});
return peer;
}
// --- peer ---
class CloudflareDurablePeer extends Peer<{
ws: AugmentedWebSocket;
request?: Request;
peers?: never;
cfEnv: unknown;
cfCtx: DurableObject["ctx"];
}> {
get peers() {
const clients =
this._internal.cfCtx.getWebSockets() as unknown as (typeof this._internal.ws)[];
return new Set(
clients.map((client) => {
let peer = client._crosswsPeer;
if (!peer) {
peer = client._crosswsPeer = new CloudflareDurablePeer({
ws: client,
request: undefined,
cfEnv: this._internal.cfEnv,
cfCtx: this._internal.cfCtx,
});
}
return peer;
}),
);
}
send(data: unknown) {
return this._internal.ws.send(toBufferLike(data));
}
subscribe(topic: string): void {
super.subscribe(topic);
const state: CrosswsState = {
// Max limit: 2,048 bytes
...(this._internal.ws.deserializeAttachment() as CrosswsState),
topics: this._topics,
};
this._internal.ws._crosswsState = state;
this._internal.ws.serializeAttachment(state);
}
publish(topic: string, data: unknown): void {
const clients = (
this._internal.cfCtx.getWebSockets() as unknown as (typeof this._internal.ws)[]
).filter((c) => c !== this._internal.ws);
if (clients.length === 0) {
return;
}
const dataBuff = toBufferLike(data);
for (const client of clients) {
let state = client._crosswsState;
if (!state) {
state = client._crosswsState =
client.deserializeAttachment() as CrosswsState;
}
if (state.topics?.has(topic)) {
client.send(dataBuff);
}
}
}
close(code?: number, reason?: string) {
this._internal.ws.close(code, reason);
}
}

View File

@@ -10,7 +10,8 @@ export interface AdapterInternal {
export abstract class Peer<Internal extends AdapterInternal = AdapterInternal> {
protected _internal: Internal;
protected _topics: Set<string>;
#id?: string;
protected _id?: string;
#ws?: Partial<web.WebSocket>;
constructor(internal: Internal) {
@@ -22,10 +23,10 @@ export abstract class Peer<Internal extends AdapterInternal = AdapterInternal> {
* Unique random [uuid v4](https://developer.mozilla.org/en-US/docs/Glossary/UUID) identifier for the peer.
*/
get id(): string {
if (!this.#id) {
this.#id = randomUUID();
if (!this._id) {
this._id = randomUUID();
}
return this.#id;
return this._id;
}
/** IP address of the peer */

View File

@@ -77,12 +77,9 @@ export default function indexTemplate(opts: { sse?: boolean } = {}) {
ws = new WebSocket(url);
ws.addEventListener("message", async (event) => {
const data = typeof event.data === "string" ? event.data : await event.data.text();
const { user = "system", message = "" } = data.startsWith("{")
? JSON.parse(data)
: { message: data };
const message = typeof event.data === "string" ? event.data : await event.data.text();
log(
user,
"",
typeof message === "string" ? message : JSON.stringify(message),
);
});

View File

@@ -27,6 +27,11 @@ export default {
};
export class $DurableObject extends DurableObject {
constructor(state: DurableObjectState, env: Record<string, any>) {
super(state, env);
ws.handleDurableInit(this, state, env);
}
fetch(request: Request) {
return ws.handleDurableUpgrade(this, request);
}