Generic Data-Loader Agent for Redis-Centric Architectures
RDI Agent is a long-running service that fetches data from MCP-compatible tools, extracts specified fields, and stores the results in Redis with configurable TTL-based refresh scheduling. It acts as a centralized data loader that eliminates the need for individual microservices to implement their own data fetching logic.
- MCP Tool Integration: Supports HTTP GET/POST calls to MCP-compatible tools
- Flexible Data Extraction: JSONPath, JMESPath, XPath, and regex extraction methods
- Multiple Redis Data Types: STRING, HASH, LIST, SET, SORTED_SET, and JSON storage
- TTL-Based Refresh: Configurable refresh intervals based on data TTL
- Change Detection: ETag and timestamp-based change detection to avoid redundant writes
- Comprehensive Monitoring: Prometheus metrics and OpenTelemetry tracing
- RESTful API: Force refresh, status monitoring, health checks
- Hot Reloading: Automatic descriptor file monitoring and reloading
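
The change-detection feature listed above can be pictured as a small decision function that runs before each Redis write. This is a minimal sketch and not the agent's actual code: `should_write`, the `previous` state dictionary, and the rule "write when no evidence is available" are assumptions made for illustration. Only the ideas of an ETag header and a timestamp field come from this README.

```python
from typing import Any, Mapping, Optional


def _header(headers: Mapping[str, str], name: str) -> Optional[str]:
    """Case-insensitive header lookup (HTTP header names are case-insensitive)."""
    wanted = name.lower()
    for key, value in headers.items():
        if key.lower() == wanted:
            return value
    return None


def should_write(
    headers: Mapping[str, str],
    payload: Any,
    previous: Mapping[str, Any],
    etag_header: str = "etag",
    timestamp_field: Optional[str] = None,
) -> bool:
    """Return True when the fetched data appears to differ from the last stored copy.

    `previous` is a hypothetical record of the last successful fetch,
    for example {"etag": '"abc"', "timestamp": 1700000000}.
    """
    # 1. Prefer the ETag when both the old and the new value are known.
    etag = _header(headers, etag_header)
    if etag is not None and previous.get("etag") is not None:
        return etag != previous["etag"]

    # 2. Fall back to a timestamp carried inside the payload.
    if timestamp_field and isinstance(payload, dict):
        stamp = payload.get(timestamp_field)
        if stamp is not None and previous.get("timestamp") is not None:
            return stamp != previous["timestamp"]

    # 3. No evidence either way: assume the data changed and write it.
    return True
```

Defaulting to a write when neither signal is present keeps the cache correct at the cost of an occasional redundant write.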
- Python 3.11+
- Redis 7.2+
- OpenAI API key (if using OpenAI integrations)
- Clone the repository:

      git clone <repository-url>
      cd rdi-agent

- Install dependencies:

      pip install -r requirements.txt

- Configure environment:

      cp .env.example .env
      # Edit .env with your configuration

- Create descriptor files in the descriptors/ directory (see examples)
- Run the agent:

      python -m rdi_agent.main

| Variable | Default | Description |
|---|---|---|
| REDIS_HOST | localhost | Redis host |
| REDIS_PORT | 6379 | Redis port |
| REDIS_PASSWORD | | Redis password |
| REDIS_DB | 0 | Redis database number |
| REDIS_SSL | false | Use SSL for Redis |
| OPENAI_API_KEY | | OpenAI API key |
| DESCRIPTORS_PATH | ./descriptors | Path to descriptor files |
| REFRESH_FACTOR | 0.9 | Refresh factor (0.1-1.0) |
| SCAN_INTERVAL | 30 | Descriptor scan interval (seconds) |
| HTTP_PORT | 8000 | HTTP server port |
| LOG_LEVEL | INFO | Log level |

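
The variables above map naturally onto a small settings object. The following is a minimal sketch rather than the agent's real loader: `Settings`, `load_settings`, and `refresh_interval` are hypothetical names, and the idea that a descriptor is refreshed after `ttl * REFRESH_FACTOR` seconds is an assumption inferred from the variable's description, not something this README states.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

_TRUTHY = {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class Settings:
    """Hypothetical container for the documented environment variables."""
    redis_host: str
    redis_port: int
    redis_password: Optional[str]
    redis_db: int
    redis_ssl: bool
    openai_api_key: Optional[str]
    descriptors_path: str
    refresh_factor: float
    scan_interval: int
    http_port: int
    log_level: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read settings from `env` (defaults to os.environ) using the documented defaults."""
    env = os.environ if env is None else env
    factor = float(env.get("REFRESH_FACTOR", "0.9"))
    if not 0.1 <= factor <= 1.0:
        raise ValueError("REFRESH_FACTOR must be between 0.1 and 1.0")
    return Settings(
        redis_host=env.get("REDIS_HOST", "localhost"),
        redis_port=int(env.get("REDIS_PORT", "6379")),
        redis_password=env.get("REDIS_PASSWORD") or None,
        redis_db=int(env.get("REDIS_DB", "0")),
        redis_ssl=env.get("REDIS_SSL", "false").strip().lower() in _TRUTHY,
        openai_api_key=env.get("OPENAI_API_KEY") or None,
        descriptors_path=env.get("DESCRIPTORS_PATH", "./descriptors"),
        refresh_factor=factor,
        scan_interval=int(env.get("SCAN_INTERVAL", "30")),
        http_port=int(env.get("HTTP_PORT", "8000")),
        log_level=env.get("LOG_LEVEL", "INFO"),
    )


def refresh_interval(ttl_seconds: int, refresh_factor: float) -> float:
    """Assumed scheduling rule: refresh shortly before the key would expire."""
    return ttl_seconds * refresh_factor
```

Under that assumption, a descriptor with a 600-second TTL and the default factor would be refreshed roughly every 540 seconds, leaving a margin before the key expires.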
Descriptors are YAML files that define data sources, extraction rules, and Redis targets:

    id: weather_sf
    source:
      tool: WeatherTool
      method: GET
      endpoint: https://api.openweathermap.org/data/2.5/weather
      params:
        q: "San Francisco"
        appid: "your_api_key_here"
        units: "metric"
    extract:
      path: $.current
      type: jsonpath
      default: {}
    target:
      redis_key: weather:sf
      data_type: HASH
    ttl: 600
    enabled: true
    etag_header: etag

- id: Unique identifier
- source: Data source configuration
  - tool: MCP tool name
  - method: HTTP method (GET/POST)
  - endpoint: API endpoint URL
  - params: Query parameters or request data
  - headers: HTTP headers (optional)
  - body: Request body for POST (optional)
  - timeout: Request timeout in seconds
- extract: Data extraction configuration
  - path: Extraction path expression
  - type: Extraction type (jsonpath, jmespath, xpath, regex)
  - default: Default value if extraction fails
- target: Redis target configuration
  - redis_key: Redis key name
  - data_type: Redis data type (STRING, HASH, LIST, SET, SORTED_SET, JSON)
  - field: Hash field name (for HASH type)
- ttl: Time-to-live in seconds (60-86400)
- enabled: Whether descriptor is enabled
- etag_header: HTTP header for ETag checking
- timestamp_field: Field for timestamp checking
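
The constraints in the field list above (allowed methods, extraction types, data types, and the TTL range) can be checked before a descriptor is scheduled. This is a minimal sketch under stated assumptions: `validate_descriptor` is a hypothetical helper, the descriptor is assumed to be already parsed from YAML into a plain dictionary, and because the exact nesting of `ttl` is not certain from this README the check accepts it either at the top level or under `target`.

```python
from typing import Any, Dict, List

HTTP_METHODS = {"GET", "POST"}
EXTRACT_TYPES = {"jsonpath", "jmespath", "xpath", "regex"}
DATA_TYPES = {"STRING", "HASH", "LIST", "SET", "SORTED_SET", "JSON"}
TTL_MIN, TTL_MAX = 60, 86400


def validate_descriptor(desc: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the descriptor looks usable."""
    errors: List[str] = []
    source = desc.get("source") or {}
    extract = desc.get("extract") or {}
    target = desc.get("target") or {}

    if not desc.get("id"):
        errors.append("id is required")
    if str(source.get("method", "GET")).upper() not in HTTP_METHODS:
        errors.append("source.method must be GET or POST")
    if not source.get("endpoint"):
        errors.append("source.endpoint is required")
    if extract.get("type") not in EXTRACT_TYPES:
        errors.append("extract.type must be one of " + ", ".join(sorted(EXTRACT_TYPES)))
    if not target.get("redis_key"):
        errors.append("target.redis_key is required")
    if target.get("data_type") not in DATA_TYPES:
        errors.append("target.data_type must be one of " + ", ".join(sorted(DATA_TYPES)))

    # Assumption: ttl may live at the top level or inside target.
    ttl = desc.get("ttl", target.get("ttl"))
    if not isinstance(ttl, int) or not TTL_MIN <= ttl <= TTL_MAX:
        errors.append(f"ttl must be an integer between {TTL_MIN} and {TTL_MAX}")
    return errors


# The weather example from this README, expressed as a parsed dictionary.
example = {
    "id": "weather_sf",
    "source": {
        "tool": "WeatherTool",
        "method": "GET",
        "endpoint": "https://api.openweathermap.org/data/2.5/weather",
        "params": {"q": "San Francisco", "appid": "your_api_key_here", "units": "metric"},
    },
    "extract": {"path": "$.current", "type": "jsonpath", "default": {}},
    "target": {"redis_key": "weather:sf", "data_type": "HASH"},
    "ttl": 600,
    "enabled": True,
    "etag_header": "etag",
}

for problem in validate_descriptor(example):
    print(problem)
```

Returning every problem at once, instead of raising on the first, makes it easier to fix a descriptor file in a single pass during hot reloading.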

    POST /api/v1/force/{descriptor_id}

Triggers an immediate refresh for a specific descriptor.

    GET /api/v1/status
    GET /api/v1/status/{descriptor_id}

Returns job status and execution history.

    GET /healthz  # Liveness probe
    GET /readyz   # Readiness probe

    GET /metrics  # Prometheus metrics

Extract data from JSON using JSONPath expressions:
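
    extract:
      path: $.weather[0].description
      type: jsonpath

Query JSON data with JMESPath (the expression is quoted because it contains `: `, which YAML would otherwise read as a nested mapping):

    extract:
      path: 'weather[0].{description: description, temp: main.temp}'
      type: jmespath

Extract from XML/HTML using XPath:

    extract:
      path: //div[@class='temperature']/text()
      type: xpath

Extract using regular expressions (inside YAML single quotes a backslash is literal, so `\d` is written with one backslash):

    extract:
      path: 'Temperature: (\d+)°F'
      type: regex

Store data as a Redis string: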

    target:
      redis_key: temperature:sf
      data_type: STRING

Store data as a Redis hash:

    target:
      redis_key: weather:sf
      data_type: HASH
      field: current  # Optional: specific field

Store data as a Redis list:

    target:
      redis_key: alerts:weather
      data_type: LIST

Store data as a Redis set:

    target:
      redis_key: cities:active
      data_type: SET

Store data as a Redis sorted set:

    target:
      redis_key: temperatures:ranked
      data_type: SORTED_SET

Store data as RedisJSON:

    target:
      redis_key: weather:sf:full
      data_type: JSON

The agent exposes the following metrics:
- rdi_agent_job_success_total: Successful job executions
- rdi_agent_job_failure_total: Failed job executions
- rdi_agent_job_duration_seconds: Job execution duration
- rdi_agent_data_staleness_seconds: Age of cached data
- rdi_agent_descriptors_total: Total descriptors
- rdi_agent_descriptors_enabled: Enabled descriptors
- rdi_agent_consecutive_failures: Consecutive failures per descriptor
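
As an illustration of how the counters above might be consumed, the sketch below parses Prometheus text-format output and derives a job success ratio. It is an assumption-laden example: `parse_metrics` and `success_ratio` are hypothetical helpers, the `descriptor_id` label and the sample values are made up, and the parser ignores the optional trailing timestamp that the exposition format allows.

```python
from typing import Dict


def parse_metrics(text: str) -> Dict[str, float]:
    """Sum sample values per metric name, ignoring labels and comment lines."""
    totals: Dict[str, float] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        name_part, _, value = line.rpartition(" ")
        name = name_part.split("{", 1)[0]  # drop the {label="..."} section
        try:
            totals[name] = totals.get(name, 0.0) + float(value)
        except ValueError:
            continue  # not a numeric sample; ignore it
    return totals


def success_ratio(totals: Dict[str, float]) -> float:
    """Fraction of job executions that succeeded; 1.0 when nothing has run yet."""
    ok = totals.get("rdi_agent_job_success_total", 0.0)
    failed = totals.get("rdi_agent_job_failure_total", 0.0)
    return ok / (ok + failed) if ok + failed else 1.0


# Made-up sample in the Prometheus text exposition format.
SAMPLE = """\
# HELP rdi_agent_job_success_total Successful job executions
# TYPE rdi_agent_job_success_total counter
rdi_agent_job_success_total{descriptor_id="weather_sf"} 42
rdi_agent_job_failure_total{descriptor_id="weather_sf"} 3
"""

print(round(success_ratio(parse_metrics(SAMPLE)), 3))
```

In practice a Prometheus server would scrape `/metrics` and compute such ratios with a query; a script like this is mainly useful for quick local checks.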
OpenTelemetry tracing is enabled by default and provides distributed tracing across all operations.
The agent implements robust error handling with:
- Exponential backoff for retryable errors (5xx, network issues)
- Extended backoff for 4xx errors (up to 5x TTL)
- Retry logic with circuit breaker patterns
- Graceful degradation when Redis or data sources are unavailable
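
The retry rules above can be sketched as a delay function plus a small circuit breaker. Everything here is illustrative rather than the agent's implementation: `next_retry_delay` and `CircuitBreaker` are hypothetical names, and the base delay, the cap, the linear growth for 4xx responses, the failure threshold, and the cooldown are all assumed values. Only "exponential backoff for 5xx and network errors" and "up to 5x TTL for 4xx errors" come from the list.

```python
from typing import Optional


def next_retry_delay(
    status: Optional[int],
    consecutive_failures: int,
    ttl: int,
    base: float = 1.0,
    cap: float = 300.0,
) -> float:
    """Seconds to wait before the next attempt.

    `status` is the HTTP status code, or None for a network-level failure.
    """
    failures = max(1, consecutive_failures)
    if status is not None and 400 <= status < 500:
        # Client errors rarely fix themselves: back off in TTL-sized steps,
        # never exceeding five times the descriptor's TTL.
        return float(min(ttl * failures, 5 * ttl))
    # 5xx and network errors: classic exponential backoff with a ceiling.
    return min(base * 2 ** (failures - 1), cap)


class CircuitBreaker:
    """Stops calling a failing source until a cooldown has passed."""

    def __init__(self, threshold: int = 5, cooldown: float = 60.0) -> None:
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at: Optional[float] = None

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self, now: float) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = now  # open (or re-open) the circuit

    def allow(self, now: float) -> bool:
        """True if a request may be attempted at time `now`."""
        if self.opened_at is None:
            return True
        return now - self.opened_at >= self.cooldown
```

Passing `now` explicitly (for example from `time.monotonic()`) keeps the breaker deterministic and easy to test. A production version would usually add random jitter to the delays so that many descriptors do not retry in lockstep.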
Build and run with Docker:

    docker build -t rdi-agent .
    docker run -d --name rdi-agent \
      --env-file .env \
      -p 8000:8000 \
      -v ./descriptors:/app/descriptors \
      rdi-agent

Use the provided docker-compose.yml:

    docker-compose up -d

Deploy using Helm charts or Kubernetes manifests with:
- ConfigMaps for descriptor files
- Secrets for credentials
- ServiceMonitor for Prometheus scraping
- Horizontal Pod Autoscaling

    pytest

    black src/
    isort src/

    mypy src/

    flake8 src/

- Fork the repository
- Create a feature branch
- Make your changes
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
MIT License - see LICENSE file for details.