mirror of
https://github.com/LukeHagar/crossws.git
synced 2025-12-10 04:19:37 +00:00
feat: pub/sub support for bun and uws
This commit is contained in:
@@ -18,7 +18,9 @@ export function createDemo<T extends WebSocketAdapter>(
|
||||
);
|
||||
},
|
||||
open(peer) {
|
||||
peer.send(`Hello ${peer}`);
|
||||
peer.send("Welcome to the server!");
|
||||
peer.subscribe("welcome");
|
||||
peer.publish("welcome", `New user joined! ${peer}`);
|
||||
},
|
||||
message(peer, message) {
|
||||
if (message.text() === "ping") {
|
||||
@@ -38,17 +40,11 @@ export function createDemo<T extends WebSocketAdapter>(
|
||||
const resolve: CrossWSOptions["resolve"] = (info) => {
|
||||
return {
|
||||
open: (peer) => {
|
||||
peer.send(
|
||||
JSON.stringify(
|
||||
{
|
||||
url: info.url,
|
||||
headers:
|
||||
info.headers && Object.fromEntries(new Headers(info.headers)),
|
||||
},
|
||||
undefined,
|
||||
2,
|
||||
),
|
||||
);
|
||||
peer.send({
|
||||
url: info.url,
|
||||
headers:
|
||||
info.headers && Object.fromEntries(new Headers(info.headers)),
|
||||
});
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
45
src/_utils.ts
Normal file
45
src/_utils.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
type BufferLike = string | Buffer | Uint8Array | ArrayBuffer;
|
||||
|
||||
export function toBufferLike(val: any): BufferLike {
|
||||
if (val === undefined || val === null) {
|
||||
return "";
|
||||
}
|
||||
|
||||
if (typeof val === "string") {
|
||||
return val;
|
||||
}
|
||||
|
||||
if (isPlainObject(val)) {
|
||||
return JSON.stringify(val);
|
||||
}
|
||||
|
||||
return val;
|
||||
}
|
||||
|
||||
// Forked from sindresorhus/is-plain-obj (MIT)
|
||||
// Copyright (c) Sindre Sorhus <sindresorhus@gmail.com> (https://sindresorhus.com)
|
||||
// From https://github.com/unjs/defu/blob/main/src/_utils.ts
|
||||
export function isPlainObject(value: unknown): boolean {
|
||||
if (value === null || typeof value !== "object") {
|
||||
return false;
|
||||
}
|
||||
const prototype = Object.getPrototypeOf(value);
|
||||
|
||||
if (
|
||||
prototype !== null &&
|
||||
prototype !== Object.prototype &&
|
||||
Object.getPrototypeOf(prototype) !== null
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (Symbol.iterator in value) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (Symbol.toStringTag in value) {
|
||||
return Object.prototype.toString.call(value) === "[object Module]";
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
@@ -2,11 +2,11 @@
|
||||
|
||||
import type { WebSocketHandler, ServerWebSocket, Server } from "bun";
|
||||
|
||||
import { WebSocketMessage } from "../message";
|
||||
import { WebSocketError } from "../error";
|
||||
import { WSMessage } from "../message";
|
||||
import { WSPeer } from "../peer";
|
||||
import { defineWebSocketAdapter } from "../adapter";
|
||||
import { CrossWSOptions, createCrossWS } from "../crossws";
|
||||
import { toBufferLike } from "../_utils";
|
||||
|
||||
export interface AdapterOptions extends CrossWSOptions {}
|
||||
|
||||
@@ -50,7 +50,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
|
||||
message: (ws, message) => {
|
||||
const peer = getWSPeer(ws);
|
||||
crossws.$("bun:message", peer, ws, message);
|
||||
crossws.message(peer, new WebSocketMessage(message));
|
||||
crossws.message(peer, new WSMessage(message));
|
||||
},
|
||||
open: (ws) => {
|
||||
const peer = getWSPeer(ws);
|
||||
@@ -102,8 +102,23 @@ class BunWSPeer extends WSPeer<{
|
||||
return this.ctx.bun.ws.data.req?.headers || new Headers();
|
||||
}
|
||||
|
||||
send(message: string | ArrayBuffer) {
|
||||
this.ctx.bun.ws.send(message);
|
||||
return 0;
|
||||
send(message: any, options?: { compress?: boolean }) {
|
||||
return this.ctx.bun.ws.send(toBufferLike(message), options?.compress);
|
||||
}
|
||||
|
||||
publish(topic: string, message: any, options?: { compress?: boolean }) {
|
||||
return this.ctx.bun.ws.publish(
|
||||
topic,
|
||||
toBufferLike(message),
|
||||
options?.compress,
|
||||
);
|
||||
}
|
||||
|
||||
subscribe(topic: string): void {
|
||||
this.ctx.bun.ws.subscribe(topic);
|
||||
}
|
||||
|
||||
unsubscribe(topic: string): void {
|
||||
this.ctx.bun.ws.unsubscribe(topic);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,9 +4,10 @@ import type * as _cf from "@cloudflare/workers-types";
|
||||
|
||||
import { WSPeer } from "../peer";
|
||||
import { defineWebSocketAdapter } from "../adapter.js";
|
||||
import { WebSocketMessage } from "../message";
|
||||
import { WSMessage } from "../message";
|
||||
import { WebSocketError } from "../error";
|
||||
import { CrossWSOptions, createCrossWS } from "../crossws";
|
||||
import { toBufferLike } from "../_utils";
|
||||
|
||||
type Env = Record<string, any>;
|
||||
|
||||
@@ -48,7 +49,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
|
||||
|
||||
server.addEventListener("message", (event) => {
|
||||
crossws.$("cloudflare:message", peer, event);
|
||||
crossws.message(peer, new WebSocketMessage(event.data));
|
||||
crossws.message(peer, new WSMessage(event.data));
|
||||
});
|
||||
|
||||
server.addEventListener("error", (event) => {
|
||||
@@ -100,8 +101,8 @@ class CloudflarePeer extends WSPeer<{
|
||||
return this.ctx.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3;
|
||||
}
|
||||
|
||||
send(message: string | ArrayBuffer) {
|
||||
this.ctx.cloudflare.server.send(message);
|
||||
send(message: any) {
|
||||
this.ctx.cloudflare.server.send(toBufferLike(message));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,11 +2,12 @@
|
||||
// https://deno.land/api?s=Deno.upgradeWebSocket
|
||||
// https://examples.deno.land/http-server-websocket
|
||||
|
||||
import { WebSocketMessage } from "../message";
|
||||
import { WSMessage } from "../message";
|
||||
import { WebSocketError } from "../error";
|
||||
import { WSPeer } from "../peer";
|
||||
import { defineWebSocketAdapter } from "../adapter.js";
|
||||
import { CrossWSOptions, createCrossWS } from "../crossws";
|
||||
import { toBufferLike } from "../_utils";
|
||||
|
||||
export interface AdapterOptions extends CrossWSOptions {}
|
||||
|
||||
@@ -18,6 +19,8 @@ declare global {
|
||||
const Deno: typeof import("@deno/types").Deno;
|
||||
}
|
||||
|
||||
type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade;
|
||||
|
||||
export default defineWebSocketAdapter<Adapter, AdapterOptions>(
|
||||
(hooks, options = {}) => {
|
||||
const crossws = createCrossWS(hooks, options);
|
||||
@@ -43,7 +46,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
|
||||
});
|
||||
upgrade.socket.addEventListener("message", (event) => {
|
||||
crossws.$("deno:message", peer, event);
|
||||
crossws.message(peer, new WebSocketMessage(event.data));
|
||||
crossws.message(peer, new WSMessage(event.data));
|
||||
});
|
||||
upgrade.socket.addEventListener("close", () => {
|
||||
crossws.$("deno:close", peer);
|
||||
@@ -63,9 +66,10 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
|
||||
);
|
||||
|
||||
class DenoWSPeer extends WSPeer<{
|
||||
deno: { ws: any; req: Request };
|
||||
deno: { ws: WebSocketUpgrade["socket"]; req: Request };
|
||||
}> {
|
||||
get id() {
|
||||
// @ts-expect-error types missing
|
||||
return this.ctx.deno.ws.remoteAddress;
|
||||
}
|
||||
|
||||
@@ -81,8 +85,8 @@ class DenoWSPeer extends WSPeer<{
|
||||
return this.ctx.deno.req.headers || new Headers();
|
||||
}
|
||||
|
||||
send(message: string | ArrayBuffer) {
|
||||
this.ctx.deno.ws.send(message);
|
||||
send(message: any) {
|
||||
this.ctx.deno.ws.send(toBufferLike(message));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,10 +11,11 @@ import type {
|
||||
WebSocket as WebSocketT,
|
||||
} from "../../types/ws";
|
||||
import { WSPeer } from "../peer";
|
||||
import { WebSocketMessage } from "../message";
|
||||
import { WSMessage } from "../message";
|
||||
import { WebSocketError } from "../error";
|
||||
import { defineWebSocketAdapter } from "../adapter";
|
||||
import { CrossWSOptions, createCrossWS } from "../crossws";
|
||||
import { toBufferLike } from "../_utils";
|
||||
|
||||
export interface AdapterOptions extends CrossWSOptions {
|
||||
wss?: WebSocketServer;
|
||||
@@ -46,7 +47,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
|
||||
if (Array.isArray(data)) {
|
||||
data = Buffer.concat(data);
|
||||
}
|
||||
crossws.message(peer, new WebSocketMessage(data, isBinary));
|
||||
crossws.message(peer, new WSMessage(data, isBinary));
|
||||
});
|
||||
ws.on("error", (error: Error) => {
|
||||
crossws.$("node:error", peer, error);
|
||||
@@ -137,8 +138,12 @@ class NodeWSPeer extends WSPeer<{
|
||||
return this.ctx.node.ws.readyState;
|
||||
}
|
||||
|
||||
send(message: string, compress?: boolean) {
|
||||
this.ctx.node.ws.send(message, { compress });
|
||||
send(message: any, options?: { compress?: boolean; binary?: boolean }) {
|
||||
this.ctx.node.ws.send(toBufferLike(message), {
|
||||
compress: options?.compress,
|
||||
binary: options?.binary,
|
||||
...options,
|
||||
});
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,9 +8,10 @@ import type {
|
||||
HttpResponse,
|
||||
} from "uWebSockets.js";
|
||||
import { WSPeer } from "../peer";
|
||||
import { WebSocketMessage } from "../message";
|
||||
import { WSMessage } from "../message";
|
||||
import { defineWebSocketAdapter } from "../adapter";
|
||||
import { CrossWSOptions, createCrossWS } from "../crossws";
|
||||
import { toBufferLike } from "../_utils";
|
||||
|
||||
type UserData = {
|
||||
_peer?: any;
|
||||
@@ -67,7 +68,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
|
||||
message(ws, message, isBinary) {
|
||||
const peer = getWSPeer(ws);
|
||||
crossws.$("uws:message", peer, ws, message, isBinary);
|
||||
const msg = new WebSocketMessage(message, isBinary);
|
||||
const msg = new WSMessage(message, isBinary);
|
||||
crossws.message(peer, msg);
|
||||
},
|
||||
open(ws) {
|
||||
@@ -166,8 +167,24 @@ class UWSWSPeer extends WSPeer<{
|
||||
return this._headers;
|
||||
}
|
||||
|
||||
send(message: string, compress?: boolean) {
|
||||
this.ctx.uws.ws.send(message, false, compress);
|
||||
send(message: any, options?: { compress?: boolean; binary?: boolean }) {
|
||||
return this.ctx.uws.ws.send(
|
||||
toBufferLike(message),
|
||||
options?.binary,
|
||||
options?.compress,
|
||||
);
|
||||
}
|
||||
|
||||
subscribe(topic: string): void {
|
||||
this.ctx.uws.ws.subscribe(topic);
|
||||
}
|
||||
|
||||
publish(
|
||||
topic: string,
|
||||
message: string,
|
||||
options?: { compress?: boolean; binary?: boolean },
|
||||
) {
|
||||
this.ctx.uws.ws.publish(topic, message, options?.binary, options?.compress);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { WebSocketError } from "./error";
|
||||
import type { WebSocketMessage } from "./message";
|
||||
import type { WSMessage } from "./message";
|
||||
import type { WSPeer, WSRequest } from "./peer";
|
||||
|
||||
type MaybePromise<T> = T | Promise<T>;
|
||||
@@ -28,7 +28,7 @@ export interface WebSocketHooks {
|
||||
upgrade: (req: WSRequest) => MaybePromise<void | { headers?: HeadersInit }>;
|
||||
|
||||
/** A message is received */
|
||||
message: (peer: WSPeer, message: WebSocketMessage) => MaybePromise<void>;
|
||||
message: (peer: WSPeer, message: WSMessage) => MaybePromise<void>;
|
||||
|
||||
/** A socket is opened */
|
||||
open: (peer: WSPeer) => MaybePromise<void>;
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
export class WebSocketMessage {
|
||||
import { toBufferLike } from "./_utils";
|
||||
|
||||
export class WSMessage {
|
||||
constructor(
|
||||
public readonly rawData: string | ArrayBuffer | Uint8Array,
|
||||
public readonly rawData: any,
|
||||
public readonly isBinary?: boolean,
|
||||
) {}
|
||||
|
||||
@@ -8,14 +10,18 @@ export class WebSocketMessage {
|
||||
if (typeof this.rawData === "string") {
|
||||
return this.rawData;
|
||||
}
|
||||
return new TextDecoder().decode(this.rawData);
|
||||
const buff = toBufferLike(this.rawData);
|
||||
if (typeof buff === "string") {
|
||||
return buff;
|
||||
}
|
||||
return new TextDecoder().decode(buff);
|
||||
}
|
||||
|
||||
toString() {
|
||||
return `<WebSocketMessage: ${this.text()}>`;
|
||||
return this.text();
|
||||
}
|
||||
|
||||
[Symbol.for("nodejs.util.inspect.custom")]() {
|
||||
return this.toString();
|
||||
return this.text();
|
||||
}
|
||||
}
|
||||
|
||||
25
src/peer.ts
25
src/peer.ts
@@ -1,7 +1,9 @@
|
||||
import { WSMessage } from "./message";
|
||||
|
||||
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState
|
||||
type ReadyState = 0 | 1 | 2 | 3;
|
||||
const ReadyStateMap = {
|
||||
"-1": "unkown",
|
||||
"-1": "unknown",
|
||||
0: "connecting",
|
||||
1: "open",
|
||||
2: "closing",
|
||||
@@ -14,6 +16,8 @@ export interface WSRequest {
|
||||
}
|
||||
|
||||
export abstract class WSPeer<AdapterContext = any> implements WSRequest {
|
||||
_subscriptions: Set<string> = new Set();
|
||||
|
||||
constructor(public ctx: AdapterContext) {}
|
||||
|
||||
get id(): string | undefined {
|
||||
@@ -32,13 +36,22 @@ export abstract class WSPeer<AdapterContext = any> implements WSRequest {
|
||||
return -1;
|
||||
}
|
||||
|
||||
abstract send(
|
||||
message: string | ArrayBuffer | Uint8Array,
|
||||
compress?: boolean,
|
||||
): number;
|
||||
abstract send(message: any, options?: { compress?: boolean }): number;
|
||||
|
||||
publish(topic: string, message: any, options?: { compress?: boolean }) {
|
||||
// noop
|
||||
}
|
||||
|
||||
subscribe(topic: string) {
|
||||
this._subscriptions.add(topic);
|
||||
}
|
||||
|
||||
unsubscribe(topic: string) {
|
||||
this._subscriptions.delete(topic);
|
||||
}
|
||||
|
||||
toString() {
|
||||
return `${this.id || ""}${this.readyState === 1 ? "" : ` [${ReadyStateMap[this.readyState]}]`}`;
|
||||
return `${this.id || ""}${this.readyState === 1 || this.readyState === -1 ? "" : ` [${ReadyStateMap[this.readyState]}]`}`;
|
||||
}
|
||||
|
||||
[Symbol.for("nodejs.util.inspect.custom")]() {
|
||||
|
||||
Reference in New Issue
Block a user