Learn how to intercept and modify WebSocket messages using Reflux middleware.
Reflux can intercept WebSocket messages in both directions:
- Send: Outgoing messages from the browser to the server
- Receive: Incoming messages from the server to the browser
This allows you to:
- Log WebSocket traffic for debugging
- Transform message formats
- Filter or block specific messages
- Add encryption/decryption
- Implement custom protocols
Unlike plugins which use the RefluxAPI, WebSocket middleware is created directly:
// This is a conceptual example - WebSocket middleware
// is typically set up at the transport initialization level
const websocketMiddleware = {
id: "com.example.ws-logger",
modifyWebSocketMessage: async (data, direction) => {
console.log(`WebSocket ${direction}:`, data);
return data; // Return unmodified data
}
};The direction parameter tells you which way the message is flowing:
"send"- Message going from browser to server"receive"- Message coming from server to browser
modifyWebSocketMessage: async (data, direction) => {
if (direction === "send") {
console.log("Outgoing:", data);
} else if (direction === "receive") {
console.log("Incoming:", data);
}
return data;
}WebSocket messages can be different types. Handle them accordingly:
modifyWebSocketMessage: async (data, direction) => {
if (typeof data === "string") {
// Text message
console.log("Text:", data);
return data.toUpperCase();
}
else if (data instanceof Blob) {
// Binary data as Blob
const text = await data.text();
console.log("Blob as text:", text);
return data;
}
else if (data instanceof ArrayBuffer) {
// Binary data as ArrayBuffer
const view = new Uint8Array(data);
console.log("ArrayBuffer bytes:", view);
return data;
}
return data;
}const wsLogger = {
id: "com.example.ws-logger",
modifyWebSocketMessage: async (data, direction) => {
const arrow = direction === "send" ? "β" : "β";
const timestamp = new Date().toISOString();
if (typeof data === "string") {
console.log(`[${timestamp}] ${arrow} ${data}`);
} else {
console.log(`[${timestamp}] ${arrow} [Binary: ${data.byteLength || data.size} bytes]`);
}
return data;
}
};const jsonTransformer = {
id: "com.example.json-transformer",
modifyWebSocketMessage: async (data, direction) => {
if (typeof data === "string") {
try {
const json = JSON.parse(data);
if (direction === "send") {
// Add metadata to outgoing messages
json.metadata = {
timestamp: Date.now(),
client: "reflux"
};
} else {
// Transform incoming messages
if (json.type === "notification") {
console.log("π’ Notification:", json.message);
}
}
return JSON.stringify(json);
} catch (e) {
// Not JSON, return as-is
return data;
}
}
return data;
}
};const messageFilter = {
id: "com.example.message-filter",
modifyWebSocketMessage: async (data, direction) => {
if (typeof data === "string" && direction === "receive") {
try {
const json = JSON.parse(data);
// Block spam messages
if (json.type === "spam") {
console.log("π« Blocked spam message");
return ""; // Return empty to effectively block
}
// Filter sensitive data
if (json.password) {
delete json.password;
return JSON.stringify(json);
}
} catch (e) {
// Not JSON
}
}
return data;
}
};const encryptionMiddleware = {
id: "com.example.encryption",
modifyWebSocketMessage: async (data, direction) => {
if (typeof data !== "string") return data;
// Simple XOR cipher (use proper crypto in production!)
const key = "my-secret-key";
const xorCipher = (text, key) => {
return text.split("").map((char, i) => {
return String.fromCharCode(
char.charCodeAt(0) ^ key.charCodeAt(i % key.length)
);
}).join("");
};
if (direction === "send") {
// Encrypt outgoing
const encrypted = btoa(xorCipher(data, key));
console.log("π Encrypted message");
return encrypted;
} else {
// Decrypt incoming
try {
const decrypted = xorCipher(atob(data), key);
console.log("π Decrypted message");
return decrypted;
} catch (e) {
// Not encrypted, return as-is
return data;
}
}
}
};const protocolWrapper = {
id: "com.example.protocol",
modifyWebSocketMessage: async (data, direction) => {
if (typeof data !== "string") return data;
if (direction === "send") {
// Wrap outgoing messages in custom protocol
const wrapped = JSON.stringify({
version: "1.0",
timestamp: Date.now(),
payload: data
});
return wrapped;
} else {
// Unwrap incoming messages
try {
const parsed = JSON.parse(data);
if (parsed.version && parsed.payload) {
console.log("π¦ Unwrapped protocol message");
return parsed.payload;
}
} catch (e) {
// Not wrapped
}
return data;
}
}
};const binaryProcessor = {
id: "com.example.binary",
modifyWebSocketMessage: async (data, direction) => {
if (data instanceof ArrayBuffer) {
const view = new Uint8Array(data);
if (direction === "send") {
// Add header byte to outgoing binary
const newData = new Uint8Array(view.length + 1);
newData[0] = 0xFF; // Header marker
newData.set(view, 1);
return newData.buffer;
} else {
// Remove header byte from incoming binary
if (view[0] === 0xFF) {
return view.slice(1).buffer;
}
}
}
return data;
}
};const statefulMiddleware = {
id: "com.example.stateful",
_messageCount: 0,
_sessionId: null,
modifyWebSocketMessage: async function(data, direction) {
this._messageCount++;
if (typeof data === "string") {
try {
const json = JSON.parse(data);
// Track session
if (json.type === "session_start" && direction === "receive") {
this._sessionId = json.sessionId;
console.log("π± Session started:", this._sessionId);
}
// Add message number
if (direction === "send") {
json.messageNumber = this._messageCount;
json.sessionId = this._sessionId;
return JSON.stringify(json);
}
} catch (e) {
// Not JSON
}
}
return data;
}
};const conditionalMiddleware = {
id: "com.example.conditional",
modifyWebSocketMessage: async (data, direction) => {
// Only process for specific domains
const currentDomain = window.location.hostname;
if (!currentDomain.includes("example.com")) {
return data; // Pass through
}
// Only process certain message types
if (typeof data === "string") {
try {
const json = JSON.parse(data);
if (json.type === "chat_message") {
// Process chat messages
json.enhanced = true;
return JSON.stringify(json);
}
} catch (e) {
// Not JSON
}
}
return data;
}
};const performanceMonitor = {
id: "com.example.perf-monitor",
_metrics: {
messagesSent: 0,
messagesReceived: 0,
bytesSent: 0,
bytesReceived: 0,
startTime: Date.now()
},
modifyWebSocketMessage: async function(data, direction) {
const size = data.byteLength || data.size || data.length || 0;
if (direction === "send") {
this._metrics.messagesSent++;
this._metrics.bytesSent += size;
} else {
this._metrics.messagesReceived++;
this._metrics.bytesReceived += size;
}
// Log stats every 100 messages
const totalMessages = this._metrics.messagesSent + this._metrics.messagesReceived;
if (totalMessages % 100 === 0) {
const elapsed = (Date.now() - this._metrics.startTime) / 1000;
console.log("π WebSocket Stats:", {
uptime: `${elapsed.toFixed(1)}s`,
sent: this._metrics.messagesSent,
received: this._metrics.messagesReceived,
totalBytes: this._metrics.bytesSent + this._metrics.bytesReceived
});
}
return data;
}
};// β Bad: Forgetting to return
modifyWebSocketMessage: async (data, direction) => {
console.log(data);
// No return - data will be lost!
}
// β
Good: Always return
modifyWebSocketMessage: async (data, direction) => {
console.log(data);
return data;
}// β
Good: Handle all possible types
modifyWebSocketMessage: async (data, direction) => {
if (typeof data === "string") {
// Handle string
return data;
} else if (data instanceof Blob) {
// Handle Blob
return data;
} else if (data instanceof ArrayBuffer) {
// Handle ArrayBuffer
return data;
}
return data; // Fallback
}modifyWebSocketMessage: async (data, direction) => {
try {
// Your processing logic
if (typeof data === "string") {
const json = JSON.parse(data);
json.modified = true;
return JSON.stringify(json);
}
} catch (error) {
console.error("WebSocket middleware error:", error);
return data; // Return original on error
}
return data;
}// β Bad: Heavy processing on every message
modifyWebSocketMessage: async (data, direction) => {
// Expensive operation
await heavyComputation(data);
return data;
}
// β
Good: Only process when necessary
modifyWebSocketMessage: async (data, direction) => {
// Quick check first
if (shouldProcess(data)) {
await lightweightProcessing(data);
}
return data;
}const inspector = {
id: "com.example.inspector",
modifyWebSocketMessage: async (data, direction) => {
const details = {
direction,
timestamp: new Date().toISOString(),
type: typeof data,
size: data.byteLength || data.size || data.length || 0
};
if (typeof data === "string") {
details.preview = data.substring(0, 100);
try {
details.json = JSON.parse(data);
} catch (e) {
details.json = null;
}
}
console.table(details);
return data;
}
};const chatEnhancer = {
id: "com.example.chat-enhancer",
_userCache: new Map(),
modifyWebSocketMessage: async function(data, direction) {
if (typeof data !== "string") return data;
try {
const msg = JSON.parse(data);
if (direction === "receive" && msg.type === "chat_message") {
// Cache user info
if (msg.user) {
this._userCache.set(msg.user.id, msg.user);
}
// Add timestamp if missing
if (!msg.timestamp) {
msg.timestamp = Date.now();
}
// Format mentions
if (msg.text && msg.text.includes("@")) {
msg.text = msg.text.replace(
/@(\w+)/g,
'<span class="mention">@$1</span>'
);
}
return JSON.stringify(msg);
}
if (direction === "send" && msg.type === "chat_message") {
// Add client metadata
msg.client = {
version: "1.0",
platform: navigator.platform,
enhanced: true
};
return JSON.stringify(msg);
}
} catch (e) {
console.error("Chat enhancer error:", e);
}
return data;
}
};- Request/Response Middleware - Intercept HTTP traffic
- API Reference - Complete API documentation
- Examples - More complete examples