Skip to main content

WebSocket

@carno.js/websocket adds native, decorator-based WebSocket support to Carno.js. It is built on top of Bun's built-in WebSocket server — no external dependencies, no wrappers, no overhead.

It brings a Socket.io-inspired API to Bun: rooms, namespaces, server-side broadcasting, and a clean event-handler model via TypeScript decorators.

Installation

bun install @carno.js/websocket
info

@carno.js/websocket requires @carno.js/core and Bun >=1.0.0.

Quick Start

import { Carno } from '@carno.js/core';
import {
WebSocketPlugin,
Gateway,
OnOpen,
OnClose,
SubscribeMessage,
CarnoSocket,
} from '@carno.js/websocket';

@Gateway('/chat')
class ChatGateway {
@OnOpen()
onOpen(socket: CarnoSocket) {
socket.join('general');
socket.emit('welcome', { id: socket.id });
}

@SubscribeMessage('send')
onSend(socket: CarnoSocket, payload: { message: string }) {
socket.to('general').emit('message', {
from: socket.id,
text: payload.message,
});
}

@OnClose()
onClose(socket: CarnoSocket, code: number, reason: string) {
console.log(`${socket.id} disconnected (${code})`);
}
}

const app = new Carno();
app.use(WebSocketPlugin.create([ChatGateway]));
app.listen(3000);

// Clients connect to: ws://localhost:3000/chat

Registering the Plugin

WebSocketPlugin.create() wraps your gateway classes into a Carno plugin. Pass it to app.use() before calling app.listen().

import { WebSocketPlugin } from '@carno.js/websocket';

app.use(WebSocketPlugin.create([ChatGateway, NotificationsGateway]));

The second argument is an optional configuration object.

app.use(WebSocketPlugin.create([ChatGateway], {
idleTimeout: 60,
sendPings: true,
perMessageDeflate: true,
}));

Gateways

A gateway is a class decorated with @Gateway(path). Each gateway is responsible for WebSocket connections made to a specific HTTP upgrade path, which also acts as its namespace.

import { Gateway } from '@carno.js/websocket';

@Gateway('/chat') // Clients connect to ws://host/chat
class ChatGateway { }

@Gateway('/notifications') // Clients connect to ws://host/notifications
class NotificationsGateway { }
  • The path must start with /. If it doesn't, a leading / is added automatically.
  • Gateways are resolved through Dependency Injection, so you can inject services into their constructors.
import { Gateway, OnOpen, CarnoSocket } from '@carno.js/websocket';
import { UserService } from './user.service';

@Gateway('/chat')
class ChatGateway {
constructor(private readonly users: UserService) {}

@OnOpen()
async onOpen(socket: CarnoSocket) {
const user = await this.users.findById(socket.id);
socket.emit('welcome', { name: user.name });
}
}

Event Handlers

Decorate methods on your gateway to handle WebSocket lifecycle events.

@OnOpen()

Invoked when a new connection is established.

@OnOpen()
onOpen(socket: CarnoSocket) {
socket.join('lobby');
socket.emit('connected', { id: socket.id });
}

@OnClose()

Invoked when a connection is closed. Receives the close code and reason string.

@OnClose()
onClose(socket: CarnoSocket, code: number, reason: string) {
console.log(`Socket ${socket.id} closed with code ${code}: ${reason}`);
}

@OnMessage()

Invoked for every incoming message in its raw form, regardless of content type. Receives the raw payload — use this for binary data or when you want to handle raw strings yourself.

@OnMessage()
onMessage(socket: CarnoSocket, message: string | Buffer) {
console.log('Raw message:', message);
}
tip

For JSON-based event dispatch, prefer @SubscribeMessage() over @OnMessage(). When a JSON message arrives, both @OnMessage and any matching @SubscribeMessage handler are called.

@OnError()

Invoked when a WebSocket error occurs.

@OnError()
onError(socket: CarnoSocket, error: Error) {
console.error(`Socket ${socket.id} error:`, error.message);
}

@OnDrain()

Invoked when the send buffer has drained and the socket is ready to accept more data. Useful for implementing backpressure-aware senders.

@OnDrain()
onDrain(socket: CarnoSocket) {
console.log(`Socket ${socket.id} ready to receive more data`);
}

@SubscribeMessage(event)

Subscribes a method to a specific named event. The client must send messages following the JSON protocol:

{ "event": "send", "data": { "message": "Hello!" } }

The second argument to the handler is the data payload, already parsed.

@SubscribeMessage('send')
onSend(socket: CarnoSocket, payload: { message: string }) {
socket.to('general').emit('message', {
from: socket.id,
text: payload.message,
});
}

A single gateway can have multiple @SubscribeMessage handlers for different events:

@Gateway('/game')
class GameGateway {
@SubscribeMessage('move')
onMove(socket: CarnoSocket, payload: { x: number; y: number }) { ... }

@SubscribeMessage('attack')
onAttack(socket: CarnoSocket, payload: { targetId: string }) { ... }

@SubscribeMessage('chat')
onChat(socket: CarnoSocket, payload: { text: string }) { ... }
}

Multiple handlers for the same event are all called, in declaration order:

@SubscribeMessage('message')
logMessage(socket: CarnoSocket, data: any) {
this.logger.log(data);
}

@SubscribeMessage('message')
handleMessage(socket: CarnoSocket, data: any) {
// also called
}

CarnoSocket

Every handler receives a CarnoSocket instance — a thin, Socket.io-inspired wrapper around Bun's native ServerWebSocket.

Sending Messages

MethodDescription
socket.emit(event, data?)Send a named event as JSON: { event, data }
socket.send(message, compress?)Send a raw string, ArrayBuffer, or Uint8Array
socket.emit('notification', { title: 'New message', body: 'Hello!' });

socket.send('ping');
socket.send(new Uint8Array([1, 2, 3]));

Rooms

Rooms use Bun's built-in pub/sub. Subscribing a socket to a room allows broadcasting to all members.

MethodDescription
socket.join(room)Subscribe this socket to a room
socket.leave(room)Unsubscribe this socket from a room
socket.isSubscribed(room)Returns true if the socket is in the room
socket.roomsArray of rooms this socket currently belongs to
@OnOpen()
onOpen(socket: CarnoSocket) {
socket.join('general');
socket.join(`user:${socket.id}`);
}

Broadcasting to a Room

Use socket.to(room) to get a RoomBroadcaster that sends to all subscribers of that room except the sender, following the same semantics as Socket.io's to().

// Emit a named event to everyone in 'general'
socket.to('general').emit('message', { from: socket.id, text: 'Hi!' });

// Send a raw message to everyone in 'alerts'
socket.to('alerts').send('SYSTEM ALERT');

Publishing to a Topic

socket.publish() sends to all subscribers of a topic, including the sender (depending on publishToSelf config).

socket.publish('global', 'broadcast', { text: 'Hello, everyone!' });

Closing the Connection

socket.close();             // Clean close
socket.close(1000, 'Done'); // With code and reason

Socket Properties

PropertyTypeDescription
socket.idstringUnique UUID assigned at connection time
socket.namespacestringThe gateway path this socket connected to
socket.remoteAddressstringClient's remote IP address
socket.roomsstring[]List of rooms this socket is subscribed to
socket.readyStatenumberWebSocket ready state (03)
socket.rawServerWebSocketThe underlying Bun ServerWebSocket instance

RoomBroadcaster

RoomBroadcaster is returned by socket.to(room) and targets all subscribers of a room.

MethodDescription
broadcaster.emit(event, data?)Send a named JSON event to all room subscribers
broadcaster.send(message)Send a raw string/binary to all room subscribers
const broadcaster = socket.to('general');

broadcaster.emit('typing', { user: socket.id });
broadcaster.send('RAW_BROADCAST');

Rooms in Practice

Here is the typical pattern for a chat application:

@Gateway('/chat')
class ChatGateway {
@OnOpen()
onOpen(socket: CarnoSocket) {
socket.join('general');

// Notify others in the room
socket.to('general').emit('user:joined', { id: socket.id });

// Send the room list to the new connection
socket.emit('rooms', socket.rooms);
}

@SubscribeMessage('join')
onJoinRoom(socket: CarnoSocket, payload: { room: string }) {
socket.join(payload.room);
socket.to(payload.room).emit('user:joined', { id: socket.id });
}

@SubscribeMessage('leave')
onLeaveRoom(socket: CarnoSocket, payload: { room: string }) {
socket.leave(payload.room);
socket.to(payload.room).emit('user:left', { id: socket.id });
}

@SubscribeMessage('message')
onMessage(socket: CarnoSocket, payload: { room: string; text: string }) {
socket.to(payload.room).emit('message', {
from: socket.id,
text: payload.text,
});
}

@OnClose()
onClose(socket: CarnoSocket) {
socket.to('general').emit('user:left', { id: socket.id });
}
}

Namespaces

Each gateway path is its own namespace. Different gateways are completely isolated — they have separate event handlers and separate room scopes. A @SubscribeMessage('ping') in /chat will never be invoked for a message sent to /notifications.

@Gateway('/chat')
class ChatGateway {
@OnOpen()
onOpen(socket: CarnoSocket) {
// socket.namespace === '/chat'
}
}

@Gateway('/notifications')
class NotificationsGateway {
@OnOpen()
onOpen(socket: CarnoSocket) {
// socket.namespace === '/notifications'
}
}

// Register both with the plugin:
app.use(WebSocketPlugin.create([ChatGateway, NotificationsGateway]));

Server-Side Broadcasting

To broadcast from outside a WebSocket handler — in a controller, a service, a queue job — inject RoomManager.

import { Service } from '@carno.js/core';
import { RoomManager } from '@carno.js/websocket';

@Service()
class AlertService {
constructor(private readonly rooms: RoomManager) {}

sendAlert(message: string) {
// Broadcast a named event to all subscribers of 'alerts'
this.rooms.broadcast('alerts', 'alert', { message });
}

sendRaw(payload: string) {
this.rooms.broadcastRaw('alerts', payload);
}
}

RoomManager is automatically registered in the DI container when you call WebSocketPlugin.create(). You can inject it into any service or controller.

warning

RoomManager.broadcast() and broadcastRaw() require the server to be running. Call them after app.listen() — for example, inside HTTP handlers or lifecycle hooks, not during bootstrap.

Broadcasting from a Controller

import { Controller, Post, Body } from '@carno.js/core';
import { RoomManager } from '@carno.js/websocket';

@Controller('/api/notifications')
class NotificationsController {
constructor(private readonly rooms: RoomManager) {}

@Post('/broadcast')
send(@Body() body: { room: string; message: string }) {
this.rooms.broadcast(body.room, 'notification', { text: body.message });
return { sent: true };
}
}

RoomManager API

Method / PropertyDescription
broadcast(room, event, data?)Broadcast a named event to all subscribers of room
broadcastRaw(room, message)Broadcast a raw string/binary to all subscribers of room
pendingWebSocketsNumber of buffered (pending) WebSocket connections

Connection Statistics

NamespaceRegistry tracks the count of active connections per namespace. It is automatically updated by the plugin on every open/close event.

Inject it into any service to monitor connection state:

import { Service } from '@carno.js/core';
import { NamespaceRegistry } from '@carno.js/websocket';

@Service()
class StatsService {
constructor(private readonly registry: NamespaceRegistry) {}

getStats() {
return {
total: this.registry.getTotalConnections(),
namespaces: this.registry.getNamespaces(),
chatConnections: this.registry.getCount('/chat'),
};
}
}

NamespaceRegistry API

MethodReturnsDescription
getCount(namespace)numberActive connections for a specific namespace
getNamespaces()string[]All namespaces with at least one active connection
getTotalConnections()numberSum of active connections across all namespaces

Configuration

Pass a WebSocketPluginConfig object as the second argument to WebSocketPlugin.create().

app.use(WebSocketPlugin.create([ChatGateway], {
idleTimeout: 120,
maxPayloadLength: 16 * 1024 * 1024, // 16 MB
sendPings: true,
perMessageDeflate: true,
publishToSelf: false,
}));
OptionTypeDefaultDescription
perMessageDeflateboolean | objectfalseEnable per-message deflate compression. Pass { compress, decompress } for fine-grained control.
maxPayloadLengthnumber16777216Maximum allowed message size in bytes (16 MB).
idleTimeoutnumber120Idle connection timeout in seconds.
backpressureLimitnumber1048576Backpressure limit in bytes (1 MB).
closeOnBackpressureLimitbooleanfalseClose the connection when the backpressure limit is reached.
sendPingsbooleantrueSend periodic ping frames to keep connections alive.
publishToSelfbooleanfalseInclude the sender when using pub/sub.

Complete Example

A fully-featured chat server with rooms, authentication via DI, and server broadcasting:

// chat.gateway.ts
import { Gateway, OnOpen, OnClose, SubscribeMessage, CarnoSocket } from '@carno.js/websocket';
import { AuthService } from './auth.service';

@Gateway('/chat')
export class ChatGateway {
constructor(private readonly auth: AuthService) {}

@OnOpen()
async onOpen(socket: CarnoSocket) {
const user = await this.auth.resolveSocket(socket);

if (!user) {
socket.close(4001, 'Unauthorized');
return;
}

socket.join('general');
socket.to('general').emit('user:joined', { username: user.name });
socket.emit('welcome', { id: socket.id, rooms: socket.rooms });
}

@SubscribeMessage('join')
onJoin(socket: CarnoSocket, payload: { room: string }) {
socket.join(payload.room);
socket.to(payload.room).emit('user:joined', { id: socket.id });
}

@SubscribeMessage('leave')
onLeave(socket: CarnoSocket, payload: { room: string }) {
socket.to(payload.room).emit('user:left', { id: socket.id });
socket.leave(payload.room);
}

@SubscribeMessage('message')
onMessage(socket: CarnoSocket, payload: { room: string; text: string }) {
socket.to(payload.room).emit('message', {
from: socket.id,
room: payload.room,
text: payload.text,
at: new Date().toISOString(),
});
}

@OnClose()
onClose(socket: CarnoSocket) {
socket.to('general').emit('user:left', { id: socket.id });
}
}
// notifications.controller.ts
import { Controller, Post, Body } from '@carno.js/core';
import { RoomManager } from '@carno.js/websocket';

@Controller('/api/notifications')
export class NotificationsController {
constructor(private readonly rooms: RoomManager) {}

@Post('/announce')
announce(@Body() body: { room: string; message: string }) {
this.rooms.broadcast(body.room, 'announcement', { text: body.message });
return { ok: true };
}
}
// main.ts
import { Carno } from '@carno.js/core';
import { WebSocketPlugin } from '@carno.js/websocket';
import { ChatGateway } from './chat.gateway';
import { NotificationsController } from './notifications.controller';
import { AuthService } from './auth.service';

const app = new Carno();

app.use(WebSocketPlugin.create([ChatGateway], {
idleTimeout: 120,
sendPings: true,
}));

app.services([AuthService]);
app.controllers([NotificationsController]);

app.listen(3000);
// WS: ws://localhost:3000/chat
// HTTP: POST http://localhost:3000/api/notifications/announce

Client-Side Protocol

The client uses a simple JSON protocol to send named events:

const ws = new WebSocket('ws://localhost:3000/chat');

// Send a named event
ws.send(JSON.stringify({ event: 'message', data: { room: 'general', text: 'Hello!' } }));

// Receive events
ws.onmessage = (e) => {
const { event, data } = JSON.parse(e.data);
console.log(event, data);
};

For raw messages (non-JSON), use @OnMessage() on the server side — anything that is not valid JSON or lacks an event field is handled exclusively by @OnMessage() handlers.