Skip to content
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,10 @@ WEBHOOK_URL=the webhook url, this config attribute put phone number on the end,
WEBHOOK_TOKEN=the webhook header token
WEBHOOK_HEADER=the webhook header name
WEBHOOK_TIMEOUT_MS=webhook request timeout, default 5000 ms
WEBHOOK_CB_ENABLED=true enable webhook circuit breaker to avoid backlog when endpoint is offline, default true
WEBHOOK_CB_FAILURE_THRESHOLD=number of failures within window to open circuit, default 1
WEBHOOK_CB_OPEN_MS=how long to keep the circuit open (skip sends), default 120000
WEBHOOK_CB_FAILURE_TTL_MS=failure counter window in ms, default 300000
WEBHOOK_SEND_NEW_MESSAGES=true, send new messages to webhook, caution with this, messages will be duplicated, default is false
WEBHOOK_SEND_GROUP_MESSAGES=true, send group messages to webhook, default is true
WEBHOOK_SEND_OUTGOING_MESSAGES=true, send outgoing messages to webhook, default is true
Expand Down Expand Up @@ -523,6 +527,13 @@ WEBHOOK_FORWARD_VERSION=the version of whatsapp cloud api, default is v17.0
WEBHOOK_FORWARD_URL=the url of whatsapp cloud api, default is https://graph.facebook.com
WEBHOOK_FORWARD_TIMEOUT_MS=the timeout for request to whatsapp cloud api, default is 360000
```
Example (circuit breaker):
```env
WEBHOOK_CB_ENABLED=true
WEBHOOK_CB_FAILURE_THRESHOLD=1
WEBHOOK_CB_FAILURE_TTL_MS=300000
WEBHOOK_CB_OPEN_MS=120000
```

### Config session with redis

Expand Down Expand Up @@ -808,4 +819,5 @@ Mail to [email protected]
- Connect with pairing code: https://github.com/WhiskeySockets/Baileys#starting-socket-with-pairing-code
- Counting connection retry attempts even when restarting to prevent looping messages
- Message delete endpoint
- Send reply message with please to send again, when any error and message enqueue in .dead
- Send reply message with please to send again, when any error and message enqueue in .dead

6 changes: 6 additions & 0 deletions src/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ export const WEBHOOK_ADD_TO_BLACKLIST_ON_OUTGOING_MESSAGE_WITH_TTL =
? undefined
: parseInt(process.env.WEBHOOK_ADD_TO_BLACKLIST_ON_OUTGOING_MESSAGE_WITH_TTL!)
export const WEBHOOK_SESSION = process.env.WEBHOOK_SESSION || ''
// Webhook circuit breaker (fail fast when endpoints are offline)
export const WEBHOOK_CB_ENABLED =
process.env.WEBHOOK_CB_ENABLED == _undefined ? true : process.env.WEBHOOK_CB_ENABLED == 'true'
export const WEBHOOK_CB_FAILURE_THRESHOLD = parseInt(process.env.WEBHOOK_CB_FAILURE_THRESHOLD || '1')
export const WEBHOOK_CB_OPEN_MS = parseInt(process.env.WEBHOOK_CB_OPEN_MS || '120000')
export const WEBHOOK_CB_FAILURE_TTL_MS = parseInt(process.env.WEBHOOK_CB_FAILURE_TTL_MS || '300000')
export const AMQP_URL = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672'
export const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379'
export const PROXY_URL = process.env.PROXY_URL
Expand Down
91 changes: 90 additions & 1 deletion src/services/outgoing_cloud_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import logger from './logger'
import { completeCloudApiWebHook, isGroupMessage, isOutgoingMessage, isNewsletterMessage, isUpdateMessage, extractDestinyPhone, extractFromPhone } from './transformer'
import { addToBlacklist, isInBlacklist } from './blacklist'
import { PublishOption } from '../amqp'
import { WEBHOOK_CB_ENABLED, WEBHOOK_CB_FAILURE_THRESHOLD, WEBHOOK_CB_OPEN_MS, WEBHOOK_CB_FAILURE_TTL_MS } from '../defaults'
import { isWebhookCircuitOpen, openWebhookCircuit, closeWebhookCircuit, bumpWebhookCircuitFailure } from './redis'

export class OutgoingCloudApi implements Outgoing {
private getConfig: getConfig
Expand All @@ -29,6 +31,23 @@ export class OutgoingCloudApi implements Outgoing {
}

public async sendHttp(phone: string, webhook: Webhook, message: object, _options: Partial<PublishOption> = {}) {
const cbEnabled = !!WEBHOOK_CB_ENABLED && WEBHOOK_CB_FAILURE_THRESHOLD > 0 && WEBHOOK_CB_OPEN_MS > 0
const cbId = (webhook && (webhook.id || webhook.url || webhook.urlAbsolute)) ? `${webhook.id || webhook.url || webhook.urlAbsolute}` : 'default'
const cbKey = `${phone}:${cbId}`
const now = Date.now()
if (cbEnabled) {
try {
const open = await isWebhookCircuitOpen(phone, cbId)
if (open) {
logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId)
return
}
} catch {}
if (isCircuitOpenLocal(cbKey, now)) {
logger.warn('WEBHOOK_CB open (local): skipping send (phone=%s webhook=%s)', phone, cbId)
return
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
const destinyPhone = await this.isInBlacklist(phone, webhook.id, message)
if (destinyPhone) {
logger.info(`Session phone %s webhook %s and destiny phone %s are in blacklist`, phone, webhook.id, destinyPhone)
Expand Down Expand Up @@ -89,11 +108,81 @@ export class OutgoingCloudApi implements Outgoing {
} catch (error) {
logger.error('Error on send to url %s with headers %s and body %s', url, JSON.stringify(headers), body)
logger.error(error)
if (cbEnabled) {
await this.handleCircuitFailure(phone, cbId, cbKey, error as any)
return
}
throw error
}
logger.debug('Response: %s', response?.status)
if (!response?.ok) {
throw await response?.text()
const errText = await response?.text()
if (cbEnabled) {
await this.handleCircuitFailure(phone, cbId, cbKey, errText)
return
}
throw errText
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if (cbEnabled) {
try {
await closeWebhookCircuit(phone, cbId)
} catch {}
resetCircuitLocal(cbKey)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

private async handleCircuitFailure(phone: string, cbId: string, cbKey: string, error: any) {
try {
const threshold = WEBHOOK_CB_FAILURE_THRESHOLD || 1
const openMs = WEBHOOK_CB_OPEN_MS || 120000
const ttlMs = WEBHOOK_CB_FAILURE_TTL_MS || openMs
const count = await bumpWebhookCircuitFailure(phone, cbId, ttlMs)
const localCount = bumpCircuitFailureLocal(cbKey, ttlMs)
const finalCount = Math.max(count || 0, localCount || 0)
if (finalCount >= threshold) {
await openWebhookCircuit(phone, cbId, openMs)
openCircuitLocal(cbKey, openMs)
logger.warn('WEBHOOK_CB opened (phone=%s webhook=%s count=%s openMs=%s)', phone, cbId, finalCount, openMs)
} else {
logger.warn('WEBHOOK_CB failure (phone=%s webhook=%s count=%s/%s)', phone, cbId, finalCount, threshold)
}
} catch (e) {
logger.warn(e as any, 'WEBHOOK_CB failure handler error')
}
try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {}
}
}

const cbOpenUntil: Map<string, number> = new Map()
const cbFailState: Map<string, { count: number; exp: number }> = new Map()

const isCircuitOpenLocal = (key: string, now: number) => {
const until = cbOpenUntil.get(key)
if (!until) return false
if (now >= until) {
cbOpenUntil.delete(key)
return false
}
return true
}

const openCircuitLocal = (key: string, openMs: number) => {
cbOpenUntil.set(key, Date.now() + Math.max(1, openMs || 0))
}

const resetCircuitLocal = (key: string) => {
cbOpenUntil.delete(key)
cbFailState.delete(key)
}

const bumpCircuitFailureLocal = (key: string, ttlMs: number): number => {
const now = Date.now()
const ttl = Math.max(1, ttlMs || 0)
const current = cbFailState.get(key)
if (!current || now >= current.exp) {
cbFailState.set(key, { count: 1, exp: now + ttl })
return 1
}
current.count += 1
return current.count
}
61 changes: 61 additions & 0 deletions src/services/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,67 @@ const redisSetAndExpire = async function (key: string, value: any, ttl: number)
}
}

export const redisDelKey = async (key: string) => redisDel(key)

// Atomic increment with TTL (seconds). Sets TTL on first increment.
export const redisIncrWithTtl = async (key: string, ttlSec: number): Promise<number> => {
logger.trace(`INCR ${key} with ttl ${ttlSec}s`)
try {
const v = await client.incr(key)
if (v === 1 && ttlSec > 0) {
try { await client.expire(key, ttlSec) } catch {}
}
return v
} catch (error) {
if (!client) {
await getRedis()
const v = await client.incr(key)
if (v === 1 && ttlSec > 0) {
try { await client.expire(key, ttlSec) } catch {}
}
return v
}
throw error
}
}

// Webhook circuit breaker keys
export const webhookCircuitOpenKey = (session: string, webhookId: string) =>
`${BASE_KEY}webhook-cb:${session}:${webhookId}:open`
export const webhookCircuitFailKey = (session: string, webhookId: string) =>
`${BASE_KEY}webhook-cb:${session}:${webhookId}:fail`

export const isWebhookCircuitOpen = async (session: string, webhookId: string): Promise<boolean> => {
const key = webhookCircuitOpenKey(session, webhookId)
try {
const v = await redisGet(key)
return !!v
} catch {
return false
}
}

export const openWebhookCircuit = async (session: string, webhookId: string, openMs: number): Promise<void> => {
const ttlSec = Math.max(1, Math.ceil((openMs || 0) / 1000))
try {
await redisSetAndExpire(webhookCircuitOpenKey(session, webhookId), '1', ttlSec)
} catch {}
}

export const closeWebhookCircuit = async (session: string, webhookId: string): Promise<void> => {
try { await redisDel(webhookCircuitOpenKey(session, webhookId)) } catch {}
try { await redisDel(webhookCircuitFailKey(session, webhookId)) } catch {}
}

export const bumpWebhookCircuitFailure = async (session: string, webhookId: string, ttlMs: number): Promise<number> => {
const ttlSec = Math.max(1, Math.ceil((ttlMs || 0) / 1000))
try {
return await redisIncrWithTtl(webhookCircuitFailKey(session, webhookId), ttlSec)
} catch {
return 0
}
}

export const authKey = (phone: string) => {
return `${BASE_KEY}auth:${phone}`
}
Expand Down