feat: global publish (#61)

This commit is contained in:
Pooya Parsa
2024-08-06 18:12:12 +02:00
committed by GitHub
parent 7fa9976584
commit 31f759f49d
17 changed files with 76 additions and 20 deletions

View File

@@ -1,3 +1,6 @@
import type { Peer } from "./peer.ts";
import { AdapterInstance } from "./types.ts";
type BufferLike = string | Buffer | Uint8Array | ArrayBuffer;
export function toBufferLike(val: any): BufferLike {
@@ -57,3 +60,16 @@ export function isPlainObject(value: unknown): boolean {
return true;
}
export function adapterUtils(peers: Set<Peer>) {
return {
peers,
publish(topic: string, message: any, options) {
const firstPeer = peers.values().next().value as Peer;
if (firstPeer) {
firstPeer.send(message, options);
firstPeer.publish(topic, message, options);
}
},
} satisfies AdapterInstance;
}

View File

@@ -9,7 +9,7 @@ import {
defineWebSocketAdapter,
} from "../types";
import { AdapterHookable } from "../hooks";
import { toBufferLike } from "../_utils";
import { adapterUtils, toBufferLike } from "../_utils";
export interface BunAdapter extends AdapterInstance {
websocket: WebSocketHandler<ContextData>;
@@ -30,7 +30,7 @@ export default defineWebSocketAdapter<BunAdapter, BunOptions>(
const hooks = new AdapterHookable(options);
const peers = new Set<BunPeer>();
return {
peers,
...adapterUtils(peers),
async handleUpgrade(request, server) {
const res = await hooks.callHook("upgrade", request);
if (res instanceof Response) {

View File

@@ -10,7 +10,7 @@ import {
import { Peer } from "../peer";
import { Message } from "../message";
import { AdapterHookable } from "../hooks";
import { toBufferLike } from "../_utils";
import { adapterUtils, toBufferLike } from "../_utils";
declare class DurableObjectPub extends DurableObject {
public ctx: DurableObject["ctx"];
@@ -65,7 +65,7 @@ export default defineWebSocketAdapter<
const hooks = new AdapterHookable(opts);
const peers = new Set<CloudflareDurablePeer>();
return {
peers,
...adapterUtils(peers),
handleUpgrade: async (req, env, _context) => {
const bindingName = opts?.bindingName ?? "$DurableObject";
const instanceName = opts?.instanceName ?? "crossws";

View File

@@ -11,7 +11,7 @@ import {
import { Message } from "../message";
import { WSError } from "../error";
import { AdapterHookable } from "../hooks.js";
import { toBufferLike } from "../_utils";
import { adapterUtils, toBufferLike } from "../_utils";
declare const WebSocketPair: typeof _cf.WebSocketPair;
declare const Response: typeof _cf.Response;
@@ -31,7 +31,7 @@ export default defineWebSocketAdapter<CloudflareAdapter, CloudflareOptions>(
const hooks = new AdapterHookable(options);
const peers = new Set<CloudflarePeer>();
return {
peers,
...adapterUtils(peers),
handleUpgrade: async (request, env, context) => {
const res = await hooks.callHook(
"upgrade",

View File

@@ -11,7 +11,7 @@ import {
defineWebSocketAdapter,
} from "../types.ts";
import { AdapterHookable } from "../hooks.ts";
import { toBufferLike } from "../_utils.ts";
import { adapterUtils, toBufferLike } from "../_utils.ts";
export interface DenoAdapter extends AdapterInstance {
handleUpgrade(req: Request, info: ServeHandlerInfo): Promise<Response>;
@@ -31,7 +31,7 @@ export default defineWebSocketAdapter<DenoAdapter, DenoOptions>(
const hooks = new AdapterHookable(options);
const peers = new Set<DenoPeer>();
return {
peers,
...adapterUtils(peers),
handleUpgrade: async (request, info) => {
const res = await hooks.callHook("upgrade", request);
if (res instanceof Response) {

View File

@@ -19,7 +19,7 @@ import {
defineWebSocketAdapter,
} from "../types";
import { AdapterHookable } from "../hooks";
import { toBufferLike } from "../_utils";
import { adapterUtils, toBufferLike } from "../_utils";
type AugmentedReq = IncomingMessage & { _upgradeHeaders?: HeadersInit };
@@ -103,7 +103,7 @@ export default defineWebSocketAdapter<NodeAdapter, NodeOptions>(
});
return {
peers,
...adapterUtils(peers),
handleUpgrade: async (req, socket, head) => {
const res = await hooks.callHook("upgrade", new NodeReqProxy(req));
if (res instanceof Response) {

View File

@@ -16,7 +16,7 @@ import {
defineWebSocketAdapter,
} from "../types";
import { AdapterHookable } from "../hooks";
import { toBufferLike } from "../_utils";
import { adapterUtils, toBufferLike } from "../_utils";
type UserData = {
_peer?: any;
@@ -50,7 +50,7 @@ export default defineWebSocketAdapter<UWSAdapter, UWSOptions>(
const hooks = new AdapterHookable(options);
const peers = new Set<UWSPeer>();
return {
peers,
...adapterUtils(peers),
websocket: {
...options.uws,
close(ws, code, message) {

View File

@@ -6,6 +6,7 @@ import type { Peer } from "./peer.ts";
export interface AdapterInstance {
readonly peers: Set<Peer>;
readonly publish: Peer["publish"];
}
export interface AdapterOptions {

View File

@@ -103,7 +103,7 @@ export function wsConnect(
return Object.assign(connectPromise, res) as Promise<typeof res>;
}
export function wsTestsExec(cmd: string, opts?: Parameters<typeof wsTests>[1]) {
export function wsTestsExec(cmd: string, opts: Parameters<typeof wsTests>[1]) {
let childProc: ExecaRes;
let url: string;
beforeAll(async () => {

View File

@@ -2,5 +2,5 @@ import { describe } from "vitest";
import { wsTestsExec } from "../_utils";
describe("bun", () => {
wsTestsExec("bun run ./bun.ts", {});
wsTestsExec("bun run ./bun.ts", { adapter: "bun" });
});

View File

@@ -4,6 +4,6 @@ import { wsTestsExec } from "../_utils";
describe("cloudflare-durable", () => {
wsTestsExec(
"wrangler dev -c ./wrangler-durable.toml --inspector-port 0 --port $PORT",
{},
{ adapter: "cloudflare-durable" },
);
});

View File

@@ -4,6 +4,6 @@ import { wsTestsExec } from "../_utils";
describe("cloudflare", () => {
wsTestsExec(
"wrangler dev -c ./wrangler.toml --inspector-port 0 --port $PORT",
{ pubsub: false },
{ adapter: "cloudflare", pubsub: false },
);
});

View File

@@ -2,5 +2,5 @@ import { describe } from "vitest";
import { wsTestsExec } from "../_utils";
describe("deno", () => {
wsTestsExec("deno run -A ./deno.ts", { resHeaders: false });
wsTestsExec("deno run -A ./deno.ts", { resHeaders: false, adapter: "deno" });
});

View File

@@ -19,6 +19,14 @@ describe("node", () => {
peers: [...ws.peers].map((p) => p.id),
}),
);
} else if (req.url!.startsWith("/publish")) {
const q = new URLSearchParams(req.url!.split("?")[1]);
const topic = q.get("topic") || "";
const message = q.get("message") || "";
if (topic && message) {
ws.publish(topic, message);
return res.end("published");
}
}
res.end("ok");
});
@@ -34,5 +42,7 @@ describe("node", () => {
await new Promise<void>((resolve) => server.close(() => resolve()));
});
wsTests(() => url, {});
wsTests(() => url, {
adapter: "node",
});
});

View File

@@ -24,6 +24,14 @@ describe("uws", () => {
const url = req.getUrl();
if (url === "/peers") {
resBody = JSON.stringify({ peers: [...ws.peers].map((p) => p.id) });
} else if (url === "/publish") {
const q = new URLSearchParams(req.getQuery());
const topic = q.get("topic") || "";
const message = q.get("message") || "";
if (topic && message) {
ws.publish(topic, message);
resBody = "published";
}
}
if (aborted) {
@@ -45,5 +53,7 @@ describe("uws", () => {
app.close();
});
wsTests(() => url, {});
wsTests(() => url, {
adapter: "uws",
});
});

View File

@@ -79,5 +79,10 @@ export function handleDemoRoutes(
peers: [...ws.peers].map((p) => p.id),
}),
);
} else if (url.pathname === "/publish") {
const topic = url.searchParams.get("topic") || "";
const message = url.searchParams.get("message") || "";
ws.publish(topic, message);
return new Response("published");
}
}

View File

@@ -3,7 +3,7 @@ import { wsConnect } from "./_utils";
export function wsTests(
getURL: () => string,
opts: { pubsub?: boolean; resHeaders?: boolean } = {},
opts: { adapter: string; pubsub?: boolean; resHeaders?: boolean },
) {
test("http works", async () => {
const response = await fetch(getURL().replace("ws", "http"));
@@ -110,4 +110,18 @@ export function wsTests(
expect(peers1.length).toBe(2);
expect(peers1).toMatchObject(peers2);
});
test.skipIf(opts.adapter.startsWith("cloudflare"))(
"publish to all peers from adapter",
async () => {
const ws1 = await wsConnect(getURL(), { skip: 1 });
const ws2 = await wsConnect(getURL(), { skip: 1 });
ws1.skip(); // join message for ws2
await fetch(
getURL().replace("ws", "http") + `publish?topic=chat&message=ping`,
);
expect(await ws1.next()).toBe("ping");
expect(await ws2.next()).toBe("ping");
},
);
}