Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,5 @@ coordination/orchestration/*
claude-flow
# Removed Windows wrapper files per user request
hive-mind-prompt-*.txt
infrastructure/
.playwright-*
19 changes: 19 additions & 0 deletions packages/api/src/cache/cacheConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,17 @@ const cacheConfig = {
REDIS_USE_ALTERNATIVE_DNS_LOOKUP: isEnabled(process.env.REDIS_USE_ALTERNATIVE_DNS_LOOKUP),
/** Enable redis cluster without the need of multiple URIs */
USE_REDIS_CLUSTER: isEnabled(process.env.USE_REDIS_CLUSTER ?? 'false'),
/**
* Force single-key operations for Redis commands even in non-cluster mode.
* Required for AWS ElastiCache Serverless which internally shards data but
* doesn't support the Redis Cluster protocol. This avoids CROSSSLOT errors
* when using batch operations on a sharded backend.
*
* Note: When USE_REDIS_CLUSTER=true, single-key operations are always used.
* This option only affects behavior when USE_REDIS_CLUSTER=false.
* @default false
*/
REDIS_SINGLE_KEY_OPS: isEnabled(process.env.REDIS_SINGLE_KEY_OPS ?? 'false'),
CI: isEnabled(process.env.CI),
DEBUG_MEMORY_CACHE: isEnabled(process.env.DEBUG_MEMORY_CACHE),

Expand Down Expand Up @@ -128,6 +139,14 @@ const cacheConfig = {
* @default 5000 (5 seconds)
*/
MCP_REGISTRY_CACHE_TTL: math(process.env.MCP_REGISTRY_CACHE_TTL, 5000),

/**
* Key name used to store the index of MCP server names in Redis.
* Required for sharded backends (ElastiCache Serverless) where SCAN
* only hits one shard. Set to customize the index key name.
* @default '__server_index__'
*/
MCP_SERVER_INDEX_KEY: process.env.MCP_SERVER_INDEX_KEY || '__server_index__',
};

export { cacheConfig };
15 changes: 11 additions & 4 deletions packages/api/src/cache/redisUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,24 @@ export async function batchDeleteKeys(
}

const size = chunkSize ?? cacheConfig.REDIS_DELETE_CHUNK_SIZE;
const mode = cacheConfig.USE_REDIS_CLUSTER ? 'cluster' : 'single-node';
// Use single-key operations if cluster mode OR if explicitly enabled for sharded backends like ElastiCache Serverless
const useSingleKeyOps = cacheConfig.USE_REDIS_CLUSTER || cacheConfig.REDIS_SINGLE_KEY_OPS;
const mode = cacheConfig.USE_REDIS_CLUSTER
? 'cluster'
: cacheConfig.REDIS_SINGLE_KEY_OPS
? 'single-key-ops'
: 'single-node';
const deletePromises = [];

if (cacheConfig.USE_REDIS_CLUSTER) {
// Cluster mode: Delete each key individually in parallel chunks to avoid CROSSSLOT errors
if (useSingleKeyOps) {
// Single-key mode: Delete each key individually in parallel chunks to avoid CROSSSLOT errors
// Required for Redis Cluster or sharded backends like AWS ElastiCache Serverless
for (let i = 0; i < keys.length; i += size) {
const chunk = keys.slice(i, i + size);
deletePromises.push(Promise.all(chunk.map((key) => client.del(key))));
}
} else {
// Single-node mode: Batch delete chunks using DEL with array
// Batch mode: Delete chunks using DEL with array (only safe for true single-node Redis)
for (let i = 0; i < keys.length; i += size) {
const chunk = keys.slice(i, i + size);
deletePromises.push(client.del(chunk));
Expand Down
176 changes: 162 additions & 14 deletions packages/api/src/mcp/registry/cache/ServerConfigsCacheRedis.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,152 @@
import type Keyv from 'keyv';
import { fromPairs } from 'lodash';
import { standardCache, keyvRedisClient } from '~/cache';
import { standardCache, keyvRedisClient, ioredisClient, cacheConfig } from '~/cache';
import { ParsedServerConfig, AddServerResult } from '~/mcp/types';
import { BaseRegistryCache } from './BaseRegistryCache';
import { IServerConfigsRepositoryInterface } from '../ServerConfigsRepositoryInterface';

/**
* Lua script for atomic add-to-index operation.
* Preserves Keyv's JSON wrapper format: {"value": [...], "expires": null}
*
* KEYS[1] = index key
* ARGV[1] = server name to add
*
* Returns 1 if added, 0 if already exists
*/
const LUA_ADD_TO_INDEX = `
local raw = redis.call('GET', KEYS[1])
local index = {}
if raw then
local data = cjson.decode(raw)
if data and data.value then
index = data.value
end
end
for i, name in ipairs(index) do
if name == ARGV[1] then
return 0
end
end
table.insert(index, ARGV[1])
local wrapped = cjson.encode({value = index, expires = cjson.null})
redis.call('SET', KEYS[1], wrapped)
return 1
`;

/**
* Lua script for atomic remove-from-index operation.
* Preserves Keyv's JSON wrapper format: {"value": [...], "expires": null}
*
* KEYS[1] = index key
* ARGV[1] = server name to remove
*
* Returns 1 if removed, 0 if not found
*/
const LUA_REMOVE_FROM_INDEX = `
local raw = redis.call('GET', KEYS[1])
if not raw then
return 0
end
local data = cjson.decode(raw)
if not data or not data.value then
return 0
end
local index = data.value
local newIndex = {}
local found = false
for i, name in ipairs(index) do
if name == ARGV[1] then
found = true
else
table.insert(newIndex, name)
end
end
if not found then
return 0
end
local wrapped = cjson.encode({value = newIndex, expires = cjson.null})
redis.call('SET', KEYS[1], wrapped)
return 1
`;

/**
* Redis-backed implementation of MCP server configurations cache for distributed deployments.
* Stores server configs in Redis with namespace isolation.
* Enables data sharing across multiple server instances in a cluster environment.
* Supports optional leader-only write operations to prevent race conditions during initialization.
* Data persists across server restarts and is accessible from any instance in the cluster.
*
* For sharded backends (Redis Cluster or ElastiCache Serverless with REDIS_SINGLE_KEY_OPS),
* an index key is maintained to track all server names, avoiding SCAN operations that may
* miss keys distributed across different shards.
*/
export class ServerConfigsCacheRedis
extends BaseRegistryCache
implements IServerConfigsRepositoryInterface
{
protected readonly cache: Keyv;
private readonly namespace: string;
/**
* Whether to use index-based operations instead of SCAN.
* Required for Redis Cluster or sharded backends like ElastiCache Serverless.
*/
private readonly useIndex: boolean;

constructor(namespace: string, leaderOnly: boolean) {
super(leaderOnly);
this.namespace = namespace;
this.cache = standardCache(`${this.PREFIX}::Servers::${namespace}`);
this.useIndex = cacheConfig.USE_REDIS_CLUSTER || cacheConfig.REDIS_SINGLE_KEY_OPS;
}

/**
* Retrieves the current server name index from cache.
* Returns empty array if index doesn't exist.
*/
private async getIndex(): Promise<string[]> {
const index = await this.cache.get(cacheConfig.MCP_SERVER_INDEX_KEY);
return Array.isArray(index) ? index : [];
}

/**
* Adds a server name to the index if not already present.
* Uses atomic Lua script to prevent race conditions during parallel initialization.
*/
private async addToIndex(serverName: string): Promise<void> {
// Use atomic Lua script via ioredis to prevent race conditions
// ioredis auto-prepends keyPrefix, so we only need namespace:key
if (ioredisClient) {
const fullKey = `${this.cache.namespace}:${cacheConfig.MCP_SERVER_INDEX_KEY}`;
await ioredisClient.call('EVAL', LUA_ADD_TO_INDEX, 1, fullKey, serverName);
return;
}
// Fallback: non-atomic operation (only used if ioredis unavailable)
const index = await this.getIndex();
if (!index.includes(serverName)) {
index.push(serverName);
await this.cache.set(cacheConfig.MCP_SERVER_INDEX_KEY, index);
}
}

/**
* Removes a server name from the index.
* Uses atomic Lua script to prevent race conditions.
*/
private async removeFromIndex(serverName: string): Promise<void> {
// Use atomic Lua script via ioredis to prevent race conditions
// ioredis auto-prepends keyPrefix, so we only need namespace:key
if (ioredisClient) {
const fullKey = `${this.cache.namespace}:${cacheConfig.MCP_SERVER_INDEX_KEY}`;
await ioredisClient.call('EVAL', LUA_REMOVE_FROM_INDEX, 1, fullKey, serverName);
return;
}
// Fallback: non-atomic operation (only used if ioredis unavailable)
const index = await this.getIndex();
const newIndex = index.filter((name) => name !== serverName);
if (newIndex.length !== index.length) {
await this.cache.set(cacheConfig.MCP_SERVER_INDEX_KEY, newIndex);
}
}

public async add(serverName: string, config: ParsedServerConfig): Promise<AddServerResult> {
Expand All @@ -35,6 +159,12 @@ export class ServerConfigsCacheRedis
const storedConfig = { ...config, updatedAt: Date.now() };
const success = await this.cache.set(serverName, storedConfig);
this.successCheck(`add ${this.namespace} server "${serverName}"`, success);

// Maintain index for sharded backends (cluster mode or single-key ops)
if (this.useIndex) {
await this.addToIndex(serverName);
}

return { serverName, config: storedConfig };
}

Expand All @@ -53,32 +183,50 @@ export class ServerConfigsCacheRedis
if (this.leaderOnly) await this.leaderCheck(`remove ${this.namespace} MCP servers`);
const success = await this.cache.delete(serverName);
this.successCheck(`remove ${this.namespace} server "${serverName}"`, success);

// Keep index in sync for sharded backends
if (this.useIndex) {
await this.removeFromIndex(serverName);
}
}

public async get(serverName: string): Promise<ParsedServerConfig | undefined> {
return this.cache.get(serverName);
}

public async getAll(): Promise<Record<string, ParsedServerConfig>> {
// Use Redis SCAN iterator directly (non-blocking, production-ready)
// Note: Keyv uses a single colon ':' between namespace and key, even if GLOBAL_PREFIX_SEPARATOR is '::'
const pattern = `*${this.cache.namespace}:*`;
const entries: Array<[string, ParsedServerConfig]> = [];

// Use scanIterator from Redis client
if (keyvRedisClient && 'scanIterator' in keyvRedisClient) {
for await (const key of keyvRedisClient.scanIterator({ MATCH: pattern })) {
// Extract the actual key name (last part after final colon)
// Full key format: "prefix::namespace:keyName"
const lastColonIndex = key.lastIndexOf(':');
const keyName = key.substring(lastColonIndex + 1);
const config = (await this.cache.get(keyName)) as ParsedServerConfig | undefined;
if (this.useIndex) {
// Index-based retrieval for sharded backends (cluster mode or ElastiCache Serverless)
// SCAN only hits one shard on ElastiCache Serverless, so we use an index key instead
const serverNames = await this.getIndex();
for (const serverName of serverNames) {
const config = (await this.cache.get(serverName)) as ParsedServerConfig | undefined;
if (config) {
entries.push([keyName, config]);
entries.push([serverName, config]);
}
}
} else {
throw new Error('Redis client with scanIterator not available.');
// Use Redis SCAN iterator directly (non-blocking, production-ready)
// Note: Keyv uses a single colon ':' between namespace and key, even if GLOBAL_PREFIX_SEPARATOR is '::'
const pattern = `*${this.cache.namespace}:*`;

// Use scanIterator from Redis client
if (keyvRedisClient && 'scanIterator' in keyvRedisClient) {
for await (const key of keyvRedisClient.scanIterator({ MATCH: pattern })) {
// Extract the actual key name (last part after final colon)
// Full key format: "prefix::namespace:keyName"
const lastColonIndex = key.lastIndexOf(':');
const keyName = key.substring(lastColonIndex + 1);
const config = (await this.cache.get(keyName)) as ParsedServerConfig | undefined;
if (config) {
entries.push([keyName, config]);
}
}
} else {
throw new Error('Redis client with scanIterator not available.');
}
}

return fromPairs(entries);
Expand Down
Loading