feat: experimental sse adapter (#62)

This commit is contained in:
Pooya Parsa
2024-08-06 20:04:46 +02:00
committed by GitHub
parent 31f759f49d
commit d662e48d4a
9 changed files with 444 additions and 175 deletions

62
docs/2.adapters/sse.md Normal file
View File

@@ -0,0 +1,62 @@
---
icon: oui:token-event
---
# SSE
> Integrate CrossWS with [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).
If your deployment server is incapable of of handling WebSocket upgrades but support standard web API ([`Request`](https://developer.mozilla.org/en-US/docs/Web/API/Request) and [`Response`](https://developer.mozilla.org/en-US/docs/Web/API/Response)) you can integrate crossws to act as a one way (server to client) handler using [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).
> [!IMPORTANT]
> This is an experimental adapter and works only with a limited subset of CrossWS functionalities.
> [!IMPORTANT]
> Instead of [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) client you need to use [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) as client to connect such server.
```ts
import sseAdapter from "crossws/adapters/sse";
const sse = sseAdapter({
hooks: {
upgrade(request) {
// Handle upgrade logic
// You can return a custom response to abort
// You can return { headers } to override default headers
},
open(peer) {
// Use this hook to send messages to peer
peer.send("hello!");
},
},
});
```
Inside your Web compatible server handler:
```js
async fetch(request) {
const url = new URL(request.url)
// Handle SSE
if (url.pathname === "/sse" && request.headers.get("accept") === "text/event-stream") {
return sse.fetch(request);
}
return new Response("server is up!")
}
```
In order to connect to the server, you need to use [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) as client:
```js
const ev = new EventSource("http://<server>/sse");
ev.addEventListener("message", (event) => {
console.log(event.data); // hello!
});
```
::read-more
See [`playground/sse.ts`](https://github.com/unjs/crossws/tree/main/playground/sse.ts) for demo and [`src/adapters/sse.ts`](https://github.com/unjs/crossws/tree/main/src/adapters/sse.ts) for implementation.
::

View File

@@ -75,6 +75,7 @@
"play:cf-durable": "wrangler dev --port 3001 -c test/fixture/wrangler-durable.toml", "play:cf-durable": "wrangler dev --port 3001 -c test/fixture/wrangler-durable.toml",
"play:deno": "deno run -A test/fixture/deno.ts", "play:deno": "deno run -A test/fixture/deno.ts",
"play:node": "jiti test/fixture/node.ts", "play:node": "jiti test/fixture/node.ts",
"play:sse": "bun test/fixture/sse.ts",
"play:uws": "jiti test/fixture/uws.ts", "play:uws": "jiti test/fixture/uws.ts",
"release": "pnpm test && pnpm build && changelogen --release && npm publish && git push --follow-tags", "release": "pnpm test && pnpm build && changelogen --release && npm publish && git push --follow-tags",
"test": "pnpm lint && pnpm test:types && vitest run", "test": "pnpm lint && pnpm test:types && vitest run",
@@ -87,6 +88,7 @@
"@cloudflare/workers-types": "^4.20240729.0", "@cloudflare/workers-types": "^4.20240729.0",
"@deno/types": "^0.0.1", "@deno/types": "^0.0.1",
"@types/bun": "^1.1.6", "@types/bun": "^1.1.6",
"@types/eventsource": "^1.1.15",
"@types/node": "^22.1.0", "@types/node": "^22.1.0",
"@types/web": "^0.0.153", "@types/web": "^0.0.153",
"@types/ws": "^8.5.12", "@types/ws": "^8.5.12",
@@ -96,6 +98,7 @@
"consola": "^3.2.3", "consola": "^3.2.3",
"eslint": "^9.8.0", "eslint": "^9.8.0",
"eslint-config-unjs": "^0.3.2", "eslint-config-unjs": "^0.3.2",
"eventsource": "^2.0.2",
"execa": "^9.3.0", "execa": "^9.3.0",
"get-port-please": "^3.1.2", "get-port-please": "^3.1.2",
"h3": "^1.12.0", "h3": "^1.12.0",

17
pnpm-lock.yaml generated
View File

@@ -20,6 +20,9 @@ importers:
'@types/bun': '@types/bun':
specifier: ^1.1.6 specifier: ^1.1.6
version: 1.1.6 version: 1.1.6
'@types/eventsource':
specifier: ^1.1.15
version: 1.1.15
'@types/node': '@types/node':
specifier: ^22.1.0 specifier: ^22.1.0
version: 22.1.0 version: 22.1.0
@@ -47,6 +50,9 @@ importers:
eslint-config-unjs: eslint-config-unjs:
specifier: ^0.3.2 specifier: ^0.3.2
version: 0.3.2(eslint@9.8.0)(typescript@5.5.4) version: 0.3.2(eslint@9.8.0)(typescript@5.5.4)
eventsource:
specifier: ^2.0.2
version: 2.0.2
execa: execa:
specifier: ^9.3.0 specifier: ^9.3.0
version: 9.3.0 version: 9.3.0
@@ -1110,6 +1116,9 @@ packages:
'@types/estree@1.0.5': '@types/estree@1.0.5':
resolution: {integrity: sha512-/kYRxGDLWzHOB7q+wtSUQlFrtcdUccpfy+X+9iMBpHK8QLLhx2wIPYuS5DYtR9Wa/YlZAbIovy7qVdB1Aq6Lyw==} resolution: {integrity: sha512-/kYRxGDLWzHOB7q+wtSUQlFrtcdUccpfy+X+9iMBpHK8QLLhx2wIPYuS5DYtR9Wa/YlZAbIovy7qVdB1Aq6Lyw==}
'@types/eventsource@1.1.15':
resolution: {integrity: sha512-XQmGcbnxUNa06HR3VBVkc9+A2Vpi9ZyLJcdS5dwaQQ/4ZMWFO+5c90FnMUpbtMZwB/FChoYHwuVg8TvkECacTA==}
'@types/mdast@3.0.15': '@types/mdast@3.0.15':
resolution: {integrity: sha512-LnwD+mUEfxWMa1QpDraczIn6k0Ee3SMicuYSSzS6ZYl2gKS09EClnJYGd8Du6rfc5r/GZEk5o1mRb8TaTj03sQ==} resolution: {integrity: sha512-LnwD+mUEfxWMa1QpDraczIn6k0Ee3SMicuYSSzS6ZYl2gKS09EClnJYGd8Du6rfc5r/GZEk5o1mRb8TaTj03sQ==}
@@ -1713,6 +1722,10 @@ packages:
resolution: {integrity: sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==} resolution: {integrity: sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==}
engines: {node: '>=0.10.0'} engines: {node: '>=0.10.0'}
eventsource@2.0.2:
resolution: {integrity: sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==}
engines: {node: '>=12.0.0'}
execa@5.1.1: execa@5.1.1:
resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==} resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==}
engines: {node: '>=10'} engines: {node: '>=10'}
@@ -3874,6 +3887,8 @@ snapshots:
'@types/estree@1.0.5': {} '@types/estree@1.0.5': {}
'@types/eventsource@1.1.15': {}
'@types/mdast@3.0.15': '@types/mdast@3.0.15':
dependencies: dependencies:
'@types/unist': 2.0.10 '@types/unist': 2.0.10
@@ -4712,6 +4727,8 @@ snapshots:
esutils@2.0.3: {} esutils@2.0.3: {}
eventsource@2.0.2: {}
execa@5.1.1: execa@5.1.1:
dependencies: dependencies:
cross-spawn: 7.0.3 cross-spawn: 7.0.3

108
src/adapters/sse.ts Normal file
View File

@@ -0,0 +1,108 @@
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
import { WebSocketServer as _WebSocketServer } from "ws";
import { Peer } from "../peer";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types";
import { AdapterHookable } from "../hooks";
import { adapterUtils, toBufferLike } from "../_utils";
export interface SSEAdapter extends AdapterInstance {
fetch(req: Request): Promise<Response>;
}
export interface SSEOptions extends AdapterOptions {}
export default defineWebSocketAdapter<SSEAdapter, SSEOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
const peers = new Set<SSEPeer>();
return {
...adapterUtils(peers),
fetch: async (request: Request) => {
const _res = await hooks.callHook("upgrade", request);
if (_res instanceof Response) {
return _res;
}
const peer = new SSEPeer({ peers, sse: { request, hooks } });
let headers: HeadersInit = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
};
if (_res?.headers) {
headers = new Headers(headers);
for (const [key, value] of new Headers(_res.headers)) {
headers.set(key, value);
}
}
return new Response(peer._sseStream, { ..._res, headers });
},
};
},
);
class SSEPeer extends Peer<{
peers: Set<SSEPeer>;
sse: {
request: Request;
hooks: AdapterHookable;
};
}> {
_sseStream: ReadableStream;
_sseStreamController?: ReadableStreamDefaultController;
constructor(internal: SSEPeer["_internal"]) {
super(internal);
this._sseStream = new ReadableStream({
start: (controller) => {
this._sseStreamController = controller;
this._internal.sse.hooks.callHook("open", this);
},
cancel: () => {
this._internal.sse.hooks.callHook("close", this);
},
});
}
get url() {
return this._internal.sse.request.url;
}
get headers() {
return this._internal.sse.request.headers;
}
send(message: any) {
let data = toBufferLike(message);
if (typeof data !== "string") {
// eslint-disable-next-line unicorn/prefer-code-point
data = btoa(String.fromCharCode(...new Uint8Array(data)));
}
this._sseStreamController?.enqueue(`event: message\ndata: ${data}\n\n`);
return 0;
}
publish(topic: string, message: any) {
const data = toBufferLike(message);
for (const peer of this._internal.peers) {
if (peer !== this && peer._topics.has(topic)) {
peer._sseStreamController?.enqueue(data);
}
}
}
close() {
this._sseStreamController?.close();
}
terminate() {
this.close();
}
}

View File

@@ -103,7 +103,11 @@ export function wsConnect(
return Object.assign(connectPromise, res) as Promise<typeof res>; return Object.assign(connectPromise, res) as Promise<typeof res>;
} }
export function wsTestsExec(cmd: string, opts: Parameters<typeof wsTests>[1]) { export function wsTestsExec(
cmd: string,
opts: Parameters<typeof wsTests>[1],
tests = wsTests,
) {
let childProc: ExecaRes; let childProc: ExecaRes;
let url: string; let url: string;
beforeAll(async () => { beforeAll(async () => {
@@ -132,5 +136,5 @@ export function wsTestsExec(cmd: string, opts: Parameters<typeof wsTests>[1]) {
afterAll(async () => { afterAll(async () => {
await childProc.kill(); await childProc.kill();
}); });
wsTests(() => url, opts); tests(() => url, opts);
} }

19
test/adapters/sse.test.ts Normal file
View File

@@ -0,0 +1,19 @@
import { describe, test, expect } from "vitest";
import { wsTestsExec } from "../_utils";
import EventSource from "eventsource";
describe("sse", () => {
wsTestsExec("bun run ./sse.ts", { adapter: "sse" }, (getURL, opts) => {
test("connects to the server", async () => {
const url = getURL().replace("ws", "http");
const ev = new EventSource(url);
const messages: string[] = [];
ev.addEventListener("message", (event) => {
messages.push(event.data);
});
await new Promise((resolve) => ev.addEventListener("open", resolve));
ev.close();
expect(messages).toMatchObject(["Welcome to the server #1!"]);
});
});
});

View File

@@ -1,6 +1,7 @@
export default /* html */ ` export default function indexTemplate(opts: { sse?: boolean } = {}) {
<!doctype html> return /* html */ `
<html lang="en" data-theme="dark"> <!doctype html>
<html lang="en" data-theme="dark">
<head> <head>
<title>CrossWS Test Page</title> <title>CrossWS Test Page</title>
<script src="https://cdn.tailwindcss.com"></script> <script src="https://cdn.tailwindcss.com"></script>
@@ -17,8 +18,6 @@ export default /* html */ `
nextTick, nextTick,
} from "https://esm.sh/petite-vue@0.4.1"; } from "https://esm.sh/petite-vue@0.4.1";
let ws;
const store = reactive({ const store = reactive({
message: "", message: "",
messages: [], messages: [],
@@ -61,7 +60,8 @@ export default /* html */ `
format(); format();
}; };
const connect = async () => { let ws;
const connectWS = async () => {
const isSecure = location.protocol === "https:"; const isSecure = location.protocol === "https:";
const url = (isSecure ? "wss://" : "ws://") + location.host + "/_ws"; const url = (isSecure ? "wss://" : "ws://") + location.host + "/_ws";
if (ws) { if (ws) {
@@ -88,6 +88,35 @@ export default /* html */ `
log("ws", "Connected!"); log("ws", "Connected!");
}; };
let sse;
const connectSSE = async () => {
const url = "/sse";
if (sse) {
log("sse", "Closing previous connection before reconnecting...");
sse.close();
clear();
}
log("sse", "Connecting to", url, "...");
sse = new EventSource(url);
sse.addEventListener("message", async (event) => {
console.log(event)
const data = typeof event.data === "string" ? event.data : await event.data.text();
const { user = "system", message = "" } = data.startsWith("{")
? JSON.parse(data)
: { message: data };
log(
user,
typeof message === "string" ? message : JSON.stringify(message),
);
});
log("sse", "Connected!");
}
const connect = ${opts.sse ? "connectSSE" : "connectWS"};
const clear = () => { const clear = () => {
store.messages.splice(0, store.messages.length); store.messages.splice(0, store.messages.length);
log("system", "previous messages cleared"); log("system", "previous messages cleared");
@@ -187,5 +216,6 @@ export default /* html */ `
</div> </div>
</main> </main>
</body> </body>
</html>\ </html>
`.trim(); `.trim();
}

View File

@@ -1,7 +1,7 @@
import { Adapter, AdapterInstance, defineHooks } from "../../src/index.ts"; import { Adapter, AdapterInstance, defineHooks } from "../../src/index.ts";
export const getIndexHTML = () => export const getIndexHTML = (opts?: { sse?: boolean }) =>
import("./_index.html.ts").then((r) => r.default); import("./_index.html.ts").then((r) => r.default(opts));
export function createDemo<T extends Adapter<any, any>>( export function createDemo<T extends Adapter<any, any>>(
adapter: T, adapter: T,

26
test/fixture/sse.ts Normal file
View File

@@ -0,0 +1,26 @@
// You can run this demo using `npm run play:sse` in repo
import sseAdapter from "../../src/adapters/sse";
import { createDemo, getIndexHTML, handleDemoRoutes } from "./_shared";
const ws = createDemo(sseAdapter);
Bun.serve({
port: process.env.PORT || 3001,
hostname: "localhost",
async fetch(request) {
const response = handleDemoRoutes(ws, request);
if (response) {
return response;
}
// Handle SSE
if (request.headers.get("accept") === "text/event-stream") {
return ws.fetch(request);
}
return new Response(await getIndexHTML({ sse: true }), {
headers: { "Content-Type": "text/html" },
});
},
});