rembrembdocs

Using Subscriptions

tip

Adding a subscription procedure

server/router.ts

tsx

import { EventEmitter } from 'events';

import { initTRPC } from '@trpc/server';

import { observable } from '@trpc/server/observable';

import { z } from 'zod';

// create a global event emitter (could be replaced by redis, etc)
const ee = new EventEmitter();

const t = initTRPC.create();

// Input schema for `add`. Declared once so the `Post` type used by the
// subscription is derived from the same definition the mutation validates.
const postInput = z.object({
  id: z.string().uuid().optional(),
  text: z.string().min(1),
});

/** A post as accepted by `add` and pushed to `onAdd` subscribers. */
type Post = z.infer<typeof postInput>;

export const appRouter = t.router({
  /**
   * Subscription: forwards every `add` event from the event emitter to the
   * subscribed client until the returned teardown function runs.
   */
  onAdd: t.procedure.subscription(() => {
    // return an `observable` with a callback which is triggered immediately
    return observable<Post>((emit) => {
      const onAdd = (data: Post) => {
        // emit data to client
        emit.next(data);
      };

      // trigger `onAdd()` when `add` is triggered in our event emitter
      ee.on('add', onAdd);

      // unsubscribe function when client disconnects or stops subscribing
      return () => {
        ee.off('add', onAdd);
      };
    });
  }),

  /**
   * Mutation: validates the input, emits it on the `add` event so that
   * `onAdd` subscribers receive it, and returns the post to the caller.
   */
  add: t.procedure.input(postInput).mutation(async (opts) => {
    const post = { ...opts.input }; /* [..] add to db */
    ee.emit('add', post);
    return post;
  }),
});

// Export only the router's *type*; the client snippet imports `AppRouter`
// with `import type`, so no server code is pulled into the client.
export type AppRouter = typeof appRouter;

Creating a WebSocket-server

bash

yarn add ws

server/wsServer.ts

ts

import { applyWSSHandler } from '@trpc/server/adapters/ws';

import ws from 'ws';

import { appRouter } from './routers/app';

import { createContext } from './trpc';

// WebSocket server that carries the tRPC traffic; listens on port 3001.
const wss = new ws.Server({
  port: 3001,
});

// Attach the tRPC router to the WebSocket server. The returned handler is
// kept so the SIGTERM hook below can notify clients.
const handler = applyWSSHandler({ wss, router: appRouter, createContext });

// Log the number of open connections whenever a client connects or leaves.
// The parameter is named `socket` so it does not shadow the imported `ws` module.
wss.on('connection', (socket) => {
  console.log(`➕➕ Connection (${wss.clients.size})`);
  socket.once('close', () => {
    console.log(`➖➖ Connection (${wss.clients.size})`);
  });
});

console.log('✅ WebSocket Server listening on ws://localhost:3001');

// On SIGTERM: broadcast the reconnect notification to clients first, then
// close the WebSocket server.
process.on('SIGTERM', () => {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
});

Setting TRPCClient to use WebSockets

tip

You can use Links to route queries and/or mutations to HTTP transport and subscriptions over WebSockets.

client.ts

tsx

import { createTRPCProxyClient, createWSClient, wsLink } from '@trpc/client';

import type { AppRouter } from '../path/to/server/trpc';

// create persistent WebSocket connection
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

// configure TRPCClient to use WebSockets transport
const client = createTRPCProxyClient<AppRouter>({
  links: [
    // every operation issued through this client goes over `wsClient`
    wsLink({
      client: wsClient,
    }),
  ],
});

Using React

See /examples/next-prisma-starter-websockets.

WebSockets RPC Specification

You can read more details by drilling into the TypeScript definitions:

query / mutation

Request

ts

{
  id: number | string;
  jsonrpc?: '2.0'; // optional
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

Response

The response has the shape below, or is an error.

ts

{
  id: number | string;
  jsonrpc?: '2.0'; // only defined if included in request
  result: {
    type: 'data'; // always 'data' for mutation / queries
    data: TOutput; // output from procedure
  }
}

subscription / subscription.stop

Start a subscription

ts

{
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

To cancel a subscription, send a `subscription.stop` message that reuses the `id` of the subscription you started:

ts

{

id: number | string; // <-- id of your created subscription

jsonrpc?: '2.0';

method: 'subscription.stop';

}

Subscription response shape

Each message has the shape below, or is an error.

ts

{
  id: number | string;
  jsonrpc?: '2.0';
  result: (
    | {
        type: 'data';
        data: TData; // subscription emitted data
      }
    | {
        type: 'started'; // subscription started
      }
    | {
        type: 'stopped'; // subscription stopped
      }
  )
}

Errors

See https://www.jsonrpc.org/specification#error_object or Error Formatting.

Notifications from Server to Client

{ id: null, method: 'reconnect' }

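Tells clients to reconnect before the server shuts down. It is sent by calling `broadcastReconnectNotification()` on the handler returned by `applyWSSHandler` (the `handler` variable in the WebSocket-server example).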