mirror of
https://github.com/LukeHagar/crossws.git
synced 2025-12-06 12:27:46 +00:00
feat: universal access to all peers (#60)
This commit is contained in:
@@ -3,11 +3,15 @@
|
||||
import type { WebSocketHandler, ServerWebSocket, Server } from "bun";
|
||||
import { Message } from "../message";
|
||||
import { Peer } from "../peer";
|
||||
import { AdapterOptions, defineWebSocketAdapter } from "../types";
|
||||
import {
|
||||
AdapterOptions,
|
||||
AdapterInstance,
|
||||
defineWebSocketAdapter,
|
||||
} from "../types";
|
||||
import { AdapterHookable } from "../hooks";
|
||||
import { toBufferLike } from "../_utils";
|
||||
|
||||
export interface BunAdapter {
|
||||
export interface BunAdapter extends AdapterInstance {
|
||||
websocket: WebSocketHandler<ContextData>;
|
||||
handleUpgrade(req: Request, server: Server): Promise<Response | undefined>;
|
||||
}
|
||||
@@ -15,7 +19,7 @@ export interface BunAdapter {
|
||||
export interface BunOptions extends AdapterOptions {}
|
||||
|
||||
type ContextData = {
|
||||
_peer?: Peer;
|
||||
_peer?: BunPeer;
|
||||
request?: Request;
|
||||
requestUrl?: string;
|
||||
server?: Server;
|
||||
@@ -24,7 +28,9 @@ type ContextData = {
|
||||
export default defineWebSocketAdapter<BunAdapter, BunOptions>(
|
||||
(options = {}) => {
|
||||
const hooks = new AdapterHookable(options);
|
||||
const peers = new Set<BunPeer>();
|
||||
return {
|
||||
peers,
|
||||
async handleUpgrade(request, server) {
|
||||
const res = await hooks.callHook("upgrade", request);
|
||||
if (res instanceof Response) {
|
||||
@@ -38,34 +44,37 @@ export default defineWebSocketAdapter<BunAdapter, BunOptions>(
|
||||
} satisfies ContextData,
|
||||
headers: res?.headers,
|
||||
});
|
||||
return upgradeOK
|
||||
? undefined
|
||||
: new Response("Upgrade failed", { status: 500 });
|
||||
if (!upgradeOK) {
|
||||
return new Response("Upgrade failed", { status: 500 });
|
||||
}
|
||||
},
|
||||
websocket: {
|
||||
message: (ws, message) => {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
hooks.callHook("message", peer, new Message(message));
|
||||
},
|
||||
open: (ws) => {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
peers.add(peer);
|
||||
hooks.callAdapterHook("bun:open", peer, ws);
|
||||
hooks.callHook("open", peer);
|
||||
},
|
||||
close: (ws) => {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
peers.delete(peer);
|
||||
hooks.callAdapterHook("bun:close", peer, ws);
|
||||
hooks.callHook("close", peer, {});
|
||||
},
|
||||
drain: (ws) => {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
hooks.callAdapterHook("bun:drain", peer);
|
||||
},
|
||||
ping(ws, data) {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
hooks.callAdapterHook("bun:ping", peer, ws, data);
|
||||
},
|
||||
pong(ws, data) {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
hooks.callAdapterHook("bun:pong", peer, ws, data);
|
||||
},
|
||||
},
|
||||
@@ -73,11 +82,14 @@ export default defineWebSocketAdapter<BunAdapter, BunOptions>(
|
||||
},
|
||||
);
|
||||
|
||||
function getPeer(ws: ServerWebSocket<ContextData>) {
|
||||
function getPeer(
|
||||
ws: ServerWebSocket<ContextData>,
|
||||
peers: Set<BunPeer>,
|
||||
): BunPeer {
|
||||
if (ws.data?._peer) {
|
||||
return ws.data._peer;
|
||||
}
|
||||
const peer = new BunPeer({ bun: { ws } });
|
||||
const peer = new BunPeer({ peers, bun: { ws } });
|
||||
ws.data = {
|
||||
...ws.data,
|
||||
_peer: peer,
|
||||
@@ -86,6 +98,7 @@ function getPeer(ws: ServerWebSocket<ContextData>) {
|
||||
}
|
||||
|
||||
class BunPeer extends Peer<{
|
||||
peers: Set<BunPeer>;
|
||||
bun: { ws: ServerWebSocket<ContextData> };
|
||||
}> {
|
||||
get addr() {
|
||||
|
||||
@@ -2,7 +2,11 @@
|
||||
|
||||
import type * as CF from "@cloudflare/workers-types";
|
||||
import type { DurableObject } from "cloudflare:workers";
|
||||
import { AdapterOptions, defineWebSocketAdapter } from "../types";
|
||||
import {
|
||||
AdapterOptions,
|
||||
AdapterInstance,
|
||||
defineWebSocketAdapter,
|
||||
} from "../types";
|
||||
import { Peer } from "../peer";
|
||||
import { Message } from "../message";
|
||||
import { AdapterHookable } from "../hooks";
|
||||
@@ -22,7 +26,7 @@ type CrosswsState = {
|
||||
topics?: Set<string>;
|
||||
};
|
||||
|
||||
export interface CloudflareDurableAdapter {
|
||||
export interface CloudflareDurableAdapter extends AdapterInstance {
|
||||
handleUpgrade(
|
||||
req: Request | CF.Request,
|
||||
env: unknown,
|
||||
@@ -59,7 +63,9 @@ export default defineWebSocketAdapter<
|
||||
CloudflareOptions
|
||||
>((opts) => {
|
||||
const hooks = new AdapterHookable(opts);
|
||||
const peers = new Set<CloudflareDurablePeer>();
|
||||
return {
|
||||
peers,
|
||||
handleUpgrade: async (req, env, _context) => {
|
||||
const bindingName = opts?.bindingName ?? "$DurableObject";
|
||||
const instanceName = opts?.instanceName ?? "crossws";
|
||||
@@ -81,6 +87,7 @@ export default defineWebSocketAdapter<
|
||||
server as unknown as CF.WebSocket,
|
||||
request,
|
||||
);
|
||||
peers.add(peer);
|
||||
(obj as DurableObjectPub).ctx.acceptWebSocket(server);
|
||||
hooks.callAdapterHook("cloudflare:accept", peer);
|
||||
hooks.callHook("open", peer);
|
||||
@@ -98,10 +105,10 @@ export default defineWebSocketAdapter<
|
||||
},
|
||||
handleDurableClose: async (obj, ws, code, reason, wasClean) => {
|
||||
const peer = peerFromDurableEvent(obj, ws as CF.WebSocket);
|
||||
peers.delete(peer);
|
||||
const details = { code, reason, wasClean };
|
||||
hooks.callAdapterHook("cloudflare:close", peer, details);
|
||||
hooks.callHook("close", peer, details);
|
||||
ws.close(code, reason);
|
||||
},
|
||||
};
|
||||
});
|
||||
@@ -127,6 +134,7 @@ function peerFromDurableEvent(
|
||||
}
|
||||
|
||||
class CloudflareDurablePeer extends Peer<{
|
||||
peers?: never;
|
||||
cloudflare: {
|
||||
ws: AugmentedWebSocket;
|
||||
request?: Request | CF.Request;
|
||||
@@ -166,6 +174,27 @@ class CloudflareDurablePeer extends Peer<{
|
||||
this._internal.cloudflare.ws.serializeAttachment(state);
|
||||
}
|
||||
|
||||
get peers() {
|
||||
const clients =
|
||||
this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[];
|
||||
return new Set(
|
||||
clients.map((client) => {
|
||||
let peer = client._crosswsPeer;
|
||||
if (!peer) {
|
||||
peer = client._crosswsPeer = new CloudflareDurablePeer({
|
||||
cloudflare: {
|
||||
ws: client,
|
||||
request: undefined,
|
||||
env: this._internal.cloudflare.env,
|
||||
context: this._internal.cloudflare.context,
|
||||
},
|
||||
});
|
||||
}
|
||||
return peer;
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
publish(topic: string, message: any): void {
|
||||
const clients = (
|
||||
this._internal.cloudflare.context.getWebSockets() as unknown as (typeof this._internal.cloudflare.ws)[]
|
||||
|
||||
@@ -3,7 +3,11 @@
|
||||
import type * as _cf from "@cloudflare/workers-types";
|
||||
|
||||
import { Peer } from "../peer";
|
||||
import { AdapterOptions, defineWebSocketAdapter } from "../types.js";
|
||||
import {
|
||||
AdapterOptions,
|
||||
AdapterInstance,
|
||||
defineWebSocketAdapter,
|
||||
} from "../types.js";
|
||||
import { Message } from "../message";
|
||||
import { WSError } from "../error";
|
||||
import { AdapterHookable } from "../hooks.js";
|
||||
@@ -12,7 +16,7 @@ import { toBufferLike } from "../_utils";
|
||||
declare const WebSocketPair: typeof _cf.WebSocketPair;
|
||||
declare const Response: typeof _cf.Response;
|
||||
|
||||
export interface CloudflareAdapter {
|
||||
export interface CloudflareAdapter extends AdapterInstance {
|
||||
handleUpgrade(
|
||||
req: _cf.Request,
|
||||
env: unknown,
|
||||
@@ -25,7 +29,9 @@ export interface CloudflareOptions extends AdapterOptions {}
|
||||
export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
|
||||
(options = {}) => {
|
||||
const hooks = new AdapterHookable(options);
|
||||
const peers = new Set<CloudflarePeer>();
|
||||
return {
|
||||
peers,
|
||||
handleUpgrade: async (request, env, context) => {
|
||||
const res = await hooks.callHook(
|
||||
"upgrade",
|
||||
@@ -38,8 +44,10 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
|
||||
const client = pair[0];
|
||||
const server = pair[1];
|
||||
const peer = new CloudflarePeer({
|
||||
peers,
|
||||
cloudflare: { client, server, request, env, context },
|
||||
});
|
||||
peers.add(peer);
|
||||
server.accept();
|
||||
hooks.callAdapterHook("cloudflare:accept", peer);
|
||||
hooks.callHook("open", peer);
|
||||
@@ -48,10 +56,12 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
|
||||
hooks.callHook("message", peer, new Message(event.data));
|
||||
});
|
||||
server.addEventListener("error", (event) => {
|
||||
peers.delete(peer);
|
||||
hooks.callAdapterHook("cloudflare:error", peer, event);
|
||||
hooks.callHook("error", peer, new WSError(event.error));
|
||||
});
|
||||
server.addEventListener("close", (event) => {
|
||||
peers.delete(peer);
|
||||
hooks.callAdapterHook("cloudflare:close", peer, event);
|
||||
hooks.callHook("close", peer, event);
|
||||
});
|
||||
@@ -67,6 +77,7 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
|
||||
);
|
||||
|
||||
class CloudflarePeer extends Peer<{
|
||||
peers: Set<CloudflarePeer>;
|
||||
cloudflare: {
|
||||
client: _cf.WebSocket;
|
||||
server: _cf.WebSocket;
|
||||
@@ -96,6 +107,16 @@ class CloudflarePeer extends Peer<{
|
||||
return 0;
|
||||
}
|
||||
|
||||
publish(_topic: string, _message: any): void {
|
||||
// Not supported
|
||||
// Throws: A hanging Promise was canceled
|
||||
// for (const peer of this._internal.peers) {
|
||||
// if (peer !== this && peer._topics.has(_topic)) {
|
||||
// peer.publish(_topic, _message);
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
close(code?: number, reason?: string) {
|
||||
this._internal.cloudflare.client.close(code, reason);
|
||||
}
|
||||
|
||||
@@ -5,11 +5,15 @@
|
||||
import { Message } from "../message.ts";
|
||||
import { WSError } from "../error.ts";
|
||||
import { Peer } from "../peer.ts";
|
||||
import { AdapterOptions, defineWebSocketAdapter } from "../types.ts";
|
||||
import {
|
||||
AdapterOptions,
|
||||
AdapterInstance,
|
||||
defineWebSocketAdapter,
|
||||
} from "../types.ts";
|
||||
import { AdapterHookable } from "../hooks.ts";
|
||||
import { toBufferLike } from "../_utils.ts";
|
||||
|
||||
export interface DenoAdapter {
|
||||
export interface DenoAdapter extends AdapterInstance {
|
||||
handleUpgrade(req: Request, info: ServeHandlerInfo): Promise<Response>;
|
||||
}
|
||||
|
||||
@@ -22,15 +26,12 @@ declare global {
|
||||
type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade;
|
||||
type ServeHandlerInfo = unknown; // TODO
|
||||
|
||||
type DenoWSSharedState = {
|
||||
peers: Set<DenoPeer>;
|
||||
};
|
||||
|
||||
export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
|
||||
(options = {}) => {
|
||||
const hooks = new AdapterHookable(options);
|
||||
const sharedState: DenoWSSharedState = { peers: new Set() };
|
||||
const peers = new Set<DenoPeer>();
|
||||
return {
|
||||
peers,
|
||||
handleUpgrade: async (request, info) => {
|
||||
const res = await hooks.callHook("upgrade", request);
|
||||
if (res instanceof Response) {
|
||||
@@ -41,9 +42,10 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
|
||||
headers: res?.headers,
|
||||
});
|
||||
const peer = new DenoPeer({
|
||||
deno: { ws: upgrade.socket, request, info, sharedState },
|
||||
peers,
|
||||
deno: { ws: upgrade.socket, request, info },
|
||||
});
|
||||
sharedState.peers.add(peer);
|
||||
peers.add(peer);
|
||||
upgrade.socket.addEventListener("open", () => {
|
||||
hooks.callAdapterHook("deno:open", peer);
|
||||
hooks.callHook("open", peer);
|
||||
@@ -53,12 +55,12 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
|
||||
hooks.callHook("message", peer, new Message(event.data));
|
||||
});
|
||||
upgrade.socket.addEventListener("close", () => {
|
||||
sharedState.peers.delete(peer);
|
||||
peers.delete(peer);
|
||||
hooks.callAdapterHook("deno:close", peer);
|
||||
hooks.callHook("close", peer, {});
|
||||
});
|
||||
upgrade.socket.addEventListener("error", (error) => {
|
||||
sharedState.peers.delete(peer);
|
||||
peers.delete(peer);
|
||||
hooks.callAdapterHook("deno:error", peer, error);
|
||||
hooks.callHook("error", peer, new WSError(error));
|
||||
});
|
||||
@@ -69,11 +71,11 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
|
||||
);
|
||||
|
||||
class DenoPeer extends Peer<{
|
||||
peers: Set<DenoPeer>;
|
||||
deno: {
|
||||
ws: WebSocketUpgrade["socket"];
|
||||
request: Request;
|
||||
info: ServeHandlerInfo;
|
||||
sharedState: DenoWSSharedState;
|
||||
};
|
||||
}> {
|
||||
get addr() {
|
||||
@@ -98,11 +100,11 @@ class DenoPeer extends Peer<{
|
||||
return 0;
|
||||
}
|
||||
|
||||
publish(topic: string, message: any): void {
|
||||
publish(topic: string, message: any) {
|
||||
const data = toBufferLike(message);
|
||||
for (const peer of this._internal.deno.sharedState.peers) {
|
||||
for (const peer of this._internal.peers) {
|
||||
if (peer !== this && peer._topics.has(topic)) {
|
||||
peer.send(data);
|
||||
peer._internal.deno.ws.send(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,13 +13,17 @@ import type {
|
||||
import { Peer } from "../peer";
|
||||
import { Message } from "../message";
|
||||
import { WSError } from "../error";
|
||||
import { AdapterOptions, defineWebSocketAdapter } from "../types";
|
||||
import {
|
||||
AdapterOptions,
|
||||
AdapterInstance,
|
||||
defineWebSocketAdapter,
|
||||
} from "../types";
|
||||
import { AdapterHookable } from "../hooks";
|
||||
import { toBufferLike } from "../_utils";
|
||||
|
||||
type AugmentedReq = IncomingMessage & { _upgradeHeaders?: HeadersInit };
|
||||
|
||||
export interface NodeAdapter {
|
||||
export interface NodeAdapter extends AdapterInstance {
|
||||
handleUpgrade(req: IncomingMessage, socket: Duplex, head: Buffer): void;
|
||||
closeAll: (code?: number, data?: string | Buffer) => void;
|
||||
}
|
||||
@@ -32,6 +36,7 @@ export interface NodeOptions extends AdapterOptions {
|
||||
export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
|
||||
(options = {}) => {
|
||||
const hooks = new AdapterHookable(options);
|
||||
const peers = new Set<NodePeer>();
|
||||
|
||||
const wss: WebSocketServer =
|
||||
options.wss ||
|
||||
@@ -41,7 +46,8 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
|
||||
}) as WebSocketServer);
|
||||
|
||||
wss.on("connection", (ws, req) => {
|
||||
const peer = new NodePeer({ node: { ws, req, server: wss } });
|
||||
const peer = new NodePeer({ peers, node: { ws, req, server: wss } });
|
||||
peers.add(peer);
|
||||
hooks.callHook("open", peer);
|
||||
|
||||
// Managed socket-level events
|
||||
@@ -53,10 +59,12 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
|
||||
hooks.callHook("message", peer, new Message(data, isBinary));
|
||||
});
|
||||
ws.on("error", (error: Error) => {
|
||||
peers.delete(peer);
|
||||
hooks.callAdapterHook("node:error", peer, error);
|
||||
hooks.callHook("error", peer, new WSError(error));
|
||||
});
|
||||
ws.on("close", (code: number, reason: Buffer) => {
|
||||
peers.delete(peer);
|
||||
hooks.callAdapterHook("node:close", peer, code, reason);
|
||||
hooks.callHook("close", peer, {
|
||||
code,
|
||||
@@ -95,6 +103,7 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
|
||||
});
|
||||
|
||||
return {
|
||||
peers,
|
||||
handleUpgrade: async (req, socket, head) => {
|
||||
const res = await hooks.callHook("upgrade", new NodeReqProxy(req));
|
||||
if (res instanceof Response) {
|
||||
@@ -163,6 +172,7 @@ async function sendResponse(socket: Duplex, res: Response) {
|
||||
}
|
||||
|
||||
class NodePeer extends Peer<{
|
||||
peers: Set<NodePeer>;
|
||||
node: {
|
||||
server: WebSocketServer;
|
||||
req: IncomingMessage;
|
||||
@@ -221,9 +231,8 @@ class NodePeer extends Peer<{
|
||||
binary: isBinary,
|
||||
...options,
|
||||
};
|
||||
for (const client of this._internal.node.server.clients) {
|
||||
const peer = (client as WebSocketT & { _peer?: NodePeer })._peer;
|
||||
if (peer && peer !== this && peer._topics.has(topic)) {
|
||||
for (const peer of this._internal.peers) {
|
||||
if (peer !== this && peer._topics.has(topic)) {
|
||||
peer._internal.node.ws.send(data, sendOptions);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,11 @@ import type {
|
||||
} from "uWebSockets.js";
|
||||
import { Peer } from "../peer";
|
||||
import { Message } from "../message";
|
||||
import { AdapterOptions, defineWebSocketAdapter } from "../types";
|
||||
import {
|
||||
AdapterOptions,
|
||||
AdapterInstance,
|
||||
defineWebSocketAdapter,
|
||||
} from "../types";
|
||||
import { AdapterHookable } from "../hooks";
|
||||
import { toBufferLike } from "../_utils";
|
||||
|
||||
@@ -23,7 +27,7 @@ type UserData = {
|
||||
|
||||
type WebSocketHandler = WebSocketBehavior<UserData>;
|
||||
|
||||
export interface UWSAdapter {
|
||||
export interface UWSAdapter extends AdapterInstance {
|
||||
websocket: WebSocketHandler;
|
||||
}
|
||||
|
||||
@@ -44,11 +48,14 @@ export interface UWSOptions extends AdapterOptions {
|
||||
export default defineWebSocketAdapter<UWSAdapter, UWSOptions>(
|
||||
(options = {}) => {
|
||||
const hooks = new AdapterHookable(options);
|
||||
const peers = new Set<UWSPeer>();
|
||||
return {
|
||||
peers,
|
||||
websocket: {
|
||||
...options.uws,
|
||||
close(ws, code, message) {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
peers.delete(peer);
|
||||
hooks.callAdapterHook("uws:close", peer, ws, code, message);
|
||||
hooks.callHook("close", peer, {
|
||||
code,
|
||||
@@ -56,30 +63,31 @@ export default defineWebSocketAdapter<UWSAdapter, UWSOptions>(
|
||||
});
|
||||
},
|
||||
drain(ws) {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
hooks.callAdapterHook("uws:drain", peer, ws);
|
||||
},
|
||||
message(ws, message, isBinary) {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
hooks.callAdapterHook("uws:message", peer, ws, message, isBinary);
|
||||
const msg = new Message(message, isBinary);
|
||||
hooks.callHook("message", peer, msg);
|
||||
},
|
||||
open(ws) {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
peers.add(peer);
|
||||
hooks.callAdapterHook("uws:open", peer, ws);
|
||||
hooks.callHook("open", peer);
|
||||
},
|
||||
ping(ws, message) {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
hooks.callAdapterHook("uws:ping", peer, ws, message);
|
||||
},
|
||||
pong(ws, message) {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
hooks.callAdapterHook("uws:pong", peer, ws, message);
|
||||
},
|
||||
subscription(ws, topic, newCount, oldCount) {
|
||||
const peer = getPeer(ws);
|
||||
const peer = getPeer(ws, peers);
|
||||
hooks.callAdapterHook(
|
||||
"uws:subscription",
|
||||
peer,
|
||||
@@ -146,14 +154,14 @@ class UWSReqProxy {
|
||||
private _rawHeaders: [string, string][] = [];
|
||||
url: string;
|
||||
|
||||
constructor(private _req: HttpRequest) {
|
||||
constructor(_req: HttpRequest) {
|
||||
// We need to precompute values since uws doesn't provide them after handler.
|
||||
|
||||
// Headers
|
||||
let host = "localhost";
|
||||
let proto = "http";
|
||||
// eslint-disable-next-line unicorn/no-array-for-each
|
||||
this._req.forEach((key, value) => {
|
||||
_req.forEach((key, value) => {
|
||||
if (key === "host") {
|
||||
host = value;
|
||||
} else if (key === "x-forwarded-proto" && value === "https") {
|
||||
@@ -176,17 +184,18 @@ class UWSReqProxy {
|
||||
}
|
||||
}
|
||||
|
||||
function getPeer(ws: WebSocket<UserData>) {
|
||||
function getPeer(ws: WebSocket<UserData>, peers: Set<UWSPeer>): UWSPeer {
|
||||
const userData = ws.getUserData();
|
||||
if (userData._peer) {
|
||||
return userData._peer as Peer;
|
||||
return userData._peer as UWSPeer;
|
||||
}
|
||||
const peer = new UWSPeer({ uws: { ws, userData } });
|
||||
const peer = new UWSPeer({ peers, uws: { ws, userData } });
|
||||
userData._peer = peer;
|
||||
return peer;
|
||||
}
|
||||
|
||||
class UWSPeer extends Peer<{
|
||||
peers: Set<UWSPeer>;
|
||||
uws: {
|
||||
ws: WebSocket<UserData>;
|
||||
userData: UserData;
|
||||
|
||||
24
src/peer.ts
24
src/peer.ts
@@ -8,17 +8,21 @@ const ReadyStateMap = {
|
||||
3: "closed",
|
||||
} as const;
|
||||
|
||||
export abstract class Peer<AdapterContext = any> {
|
||||
protected _internal: AdapterContext;
|
||||
export interface AdapterInternal {
|
||||
peers?: Set<Peer>;
|
||||
}
|
||||
|
||||
export abstract class Peer<Internal extends AdapterInternal = AdapterInternal> {
|
||||
protected _internal: Internal;
|
||||
protected _topics: Set<string>;
|
||||
|
||||
private static _idCounter = 0;
|
||||
private _id: string;
|
||||
|
||||
constructor(_internalCtx: AdapterContext) {
|
||||
constructor(internal: Internal) {
|
||||
this._id = ++Peer._idCounter + "";
|
||||
this._topics = new Set();
|
||||
this._internal = _internalCtx;
|
||||
this._internal = internal;
|
||||
}
|
||||
|
||||
get id(): string {
|
||||
@@ -41,11 +45,17 @@ export abstract class Peer<AdapterContext = any> {
|
||||
return -1;
|
||||
}
|
||||
|
||||
get peers(): Set<Peer> {
|
||||
return this._internal.peers || new Set();
|
||||
}
|
||||
|
||||
abstract send(message: any, options?: { compress?: boolean }): number;
|
||||
|
||||
publish(topic: string, message: any, options?: { compress?: boolean }) {
|
||||
// noop
|
||||
}
|
||||
abstract publish(
|
||||
topic: string,
|
||||
message: any,
|
||||
options?: { compress?: boolean },
|
||||
): void;
|
||||
|
||||
subscribe(topic: string) {
|
||||
this._topics.add(topic);
|
||||
|
||||
@@ -4,7 +4,9 @@ import type { Peer } from "./peer.ts";
|
||||
|
||||
// --- Adapter ---
|
||||
|
||||
export interface CrossWSAdapter {}
|
||||
export interface AdapterInstance {
|
||||
readonly peers: Set<Peer>;
|
||||
}
|
||||
|
||||
export interface AdapterOptions {
|
||||
resolve?: ResolveHooks;
|
||||
@@ -13,12 +15,12 @@ export interface AdapterOptions {
|
||||
}
|
||||
|
||||
export type Adapter<
|
||||
AdapterT extends CrossWSAdapter = CrossWSAdapter,
|
||||
AdapterT extends AdapterInstance = AdapterInstance,
|
||||
Options extends AdapterOptions = AdapterOptions,
|
||||
> = (options?: Options) => AdapterT;
|
||||
|
||||
export function defineWebSocketAdapter<
|
||||
AdapterT extends CrossWSAdapter = CrossWSAdapter,
|
||||
AdapterT extends AdapterInstance = AdapterInstance,
|
||||
Options extends AdapterOptions = AdapterOptions,
|
||||
>(factory: Adapter<AdapterT, Options>) {
|
||||
return factory;
|
||||
|
||||
@@ -8,15 +8,24 @@ import { wsTests } from "./tests";
|
||||
|
||||
const fixtureDir = fileURLToPath(new URL("fixture", import.meta.url));
|
||||
|
||||
const websockets = new Set<WebSocket>();
|
||||
afterEach(() => {
|
||||
for (const ws of websockets) {
|
||||
ws.close();
|
||||
}
|
||||
websockets.clear();
|
||||
});
|
||||
|
||||
export function wsConnect(
|
||||
url: string,
|
||||
opts?: { skip?: number; headers?: OutgoingHttpHeaders },
|
||||
) {
|
||||
const ws = new WebSocket(url, { headers: opts?.headers });
|
||||
websockets.add(ws);
|
||||
|
||||
const upgradeHeaders: Record<string, string> = Object.create(null);
|
||||
|
||||
const send = async (data: any) => {
|
||||
const send = async (data: any): Promise<any> => {
|
||||
ws.send(
|
||||
typeof data === "string" ? data : JSON.stringify({ message: data }),
|
||||
);
|
||||
@@ -55,11 +64,6 @@ export function wsConnect(
|
||||
}
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
ws.removeAllListeners();
|
||||
ws.close();
|
||||
});
|
||||
|
||||
const res = {
|
||||
ws,
|
||||
send,
|
||||
|
||||
@@ -12,7 +12,14 @@ describe("node", () => {
|
||||
|
||||
beforeAll(async () => {
|
||||
ws = createDemo(nodeAdapter);
|
||||
server = createServer((_req, res) => {
|
||||
server = createServer((req, res) => {
|
||||
if (req.url === "/peers") {
|
||||
return res.end(
|
||||
JSON.stringify({
|
||||
peers: [...ws.peers].map((p) => p.id),
|
||||
}),
|
||||
);
|
||||
}
|
||||
res.end("ok");
|
||||
});
|
||||
server.on("upgrade", ws.handleUpgrade);
|
||||
|
||||
@@ -19,14 +19,19 @@ describe("uws", () => {
|
||||
res.onAborted(() => {
|
||||
aborted = true;
|
||||
});
|
||||
const html = "OK";
|
||||
|
||||
let resBody = "OK";
|
||||
const url = req.getUrl();
|
||||
if (url === "/peers") {
|
||||
resBody = JSON.stringify({ peers: [...ws.peers].map((p) => p.id) });
|
||||
}
|
||||
|
||||
if (aborted) {
|
||||
return;
|
||||
}
|
||||
res.cork(() => {
|
||||
res.writeStatus("200 OK");
|
||||
res.writeHeader("Content-Type", "text/html");
|
||||
res.end(html);
|
||||
res.end(resBody);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { ResolveHooks, Adapter, defineHooks } from "../../src/index.ts";
|
||||
import { Adapter, AdapterInstance, defineHooks } from "../../src/index.ts";
|
||||
|
||||
export const getIndexHTML = () =>
|
||||
import("./_index.html.ts").then((r) => r.default);
|
||||
@@ -33,6 +33,12 @@ export function createDemo<T extends Adapter<any, any>>(
|
||||
});
|
||||
break;
|
||||
}
|
||||
case "peers": {
|
||||
peer.send({
|
||||
peers: [...peer.peers].map((p) => p.id),
|
||||
});
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
peer.send(msgText);
|
||||
peer.publish("chat", msgText);
|
||||
@@ -61,3 +67,17 @@ export function createDemo<T extends Adapter<any, any>>(
|
||||
hooks,
|
||||
});
|
||||
}
|
||||
|
||||
export function handleDemoRoutes(
|
||||
ws: AdapterInstance,
|
||||
request: Request,
|
||||
): Response | undefined {
|
||||
const url = new URL(request.url);
|
||||
if (url.pathname === "/peers") {
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
peers: [...ws.peers].map((p) => p.id),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// You can run this demo using `bun --bun ./bun.ts` or `npm run play:bun` in repo
|
||||
|
||||
import bunAdapter from "../../src/adapters/bun";
|
||||
import { createDemo, getIndexHTML } from "./_shared";
|
||||
import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared";
|
||||
|
||||
const ws = createDemo(bunAdapter);
|
||||
|
||||
@@ -10,6 +10,10 @@ Bun.serve({
|
||||
hostname: "localhost",
|
||||
websocket: ws.websocket,
|
||||
async fetch(request, server) {
|
||||
const response = handleDemoRoutes(ws, request);
|
||||
if (response) {
|
||||
return response;
|
||||
}
|
||||
if (request.headers.get("upgrade") === "websocket") {
|
||||
return ws.handleUpgrade(request, server);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// You can run this demo using `npm run play:cf-durable` in repo
|
||||
import { DurableObject } from "cloudflare:workers";
|
||||
import cloudflareAdapter from "../../src/adapters/cloudflare-durable.ts";
|
||||
import { createDemo, getIndexHTML } from "./_shared.ts";
|
||||
import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared.ts";
|
||||
|
||||
const ws = createDemo(cloudflareAdapter);
|
||||
|
||||
@@ -11,6 +11,11 @@ export default {
|
||||
env: Record<string, any>,
|
||||
context: ExecutionContext,
|
||||
): Promise<Response> {
|
||||
const response = handleDemoRoutes(ws, request);
|
||||
if (response) {
|
||||
return response;
|
||||
}
|
||||
|
||||
if (request.headers.get("upgrade") === "websocket") {
|
||||
return ws.handleUpgrade(request, env, context);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// You can run this demo using `npm run play:cf` in repo
|
||||
import type { Request, ExecutionContext } from "@cloudflare/workers-types";
|
||||
import type { ExecutionContext } from "@cloudflare/workers-types";
|
||||
import cloudflareAdapter from "../../src/adapters/cloudflare";
|
||||
import { createDemo, getIndexHTML } from "./_shared.ts";
|
||||
import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared.ts";
|
||||
|
||||
const ws = createDemo(cloudflareAdapter);
|
||||
|
||||
@@ -11,8 +11,13 @@ export default {
|
||||
env: Record<string, any>,
|
||||
context: ExecutionContext,
|
||||
) {
|
||||
const response = handleDemoRoutes(ws, request);
|
||||
if (response) {
|
||||
return response;
|
||||
}
|
||||
|
||||
if (request.headers.get("upgrade") === "websocket") {
|
||||
return ws.handleUpgrade(request, env, context);
|
||||
return ws.handleUpgrade(request as any, env, context);
|
||||
}
|
||||
|
||||
return new Response(await getIndexHTML(), {
|
||||
|
||||
@@ -5,15 +5,20 @@ import denoAdapter from "../../src/adapters/deno.ts";
|
||||
// @ts-ignore
|
||||
import type * as _Deno from "../types/lib.deno.d.ts";
|
||||
|
||||
import { createDemo, getIndexHTML } from "./_shared.ts";
|
||||
import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared.ts";
|
||||
|
||||
const ws = createDemo(denoAdapter);
|
||||
|
||||
const port = Number.parseInt(Deno.env.get("PORT") || "") || 3001;
|
||||
|
||||
Deno.serve({ hostname: "localhost", port }, async (req, info) => {
|
||||
if (req.headers.get("upgrade") === "websocket") {
|
||||
return ws.handleUpgrade(req, info);
|
||||
Deno.serve({ hostname: "localhost", port }, async (request, info) => {
|
||||
const response = handleDemoRoutes(ws, request);
|
||||
if (response) {
|
||||
return response;
|
||||
}
|
||||
|
||||
if (request.headers.get("upgrade") === "websocket") {
|
||||
return ws.handleUpgrade(request, info);
|
||||
}
|
||||
return new Response(await getIndexHTML(), {
|
||||
headers: { "Content-Type": "text/html" },
|
||||
|
||||
@@ -88,4 +88,26 @@ export function wsTests(
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
test("get peers from adapter", async () => {
|
||||
await wsConnect(getURL());
|
||||
await wsConnect(getURL());
|
||||
const response = await fetch(getURL().replace("ws", "http") + "peers");
|
||||
const { peers } = (await response.json()) as any;
|
||||
expect(peers.length).toBe(2);
|
||||
});
|
||||
|
||||
test("get peers from peer", async () => {
|
||||
const ws1 = await wsConnect(getURL(), { skip: 1 });
|
||||
const ws2 = await wsConnect(getURL(), { skip: 1 });
|
||||
if (opts.pubsub !== false) {
|
||||
ws1.skip(); // join message for ws2
|
||||
}
|
||||
await ws1.send("peers");
|
||||
await ws2.send("peers");
|
||||
const { peers: peers1 } = await ws1.next();
|
||||
const { peers: peers2 } = await ws2.next();
|
||||
expect(peers1.length).toBe(2);
|
||||
expect(peers1).toMatchObject(peers2);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user