Skip to content

Conversation

@RicoSuter
Copy link
Owner

@RicoSuter RicoSuter commented Nov 24, 2025

Added MQTT Client (MqttSubjectClientSource)

  • Bidirectional sync: Reads property changes from MQTT topics and publishes local changes
  • Configurable QoS: Support for QoS 0, 1, and 2 with sensible defaults (QoS 0 for throughput)
  • Retained messages: Enables initial state loading from broker
  • Custom serialization: Pluggable IMqttValueConverter (JSON default, with Span-based zero-allocation optimization)
  • Timestamp support: Extracts source timestamps from MQTT user properties
  • Topic prefix: Configurable namespace isolation

Improved MQTT Server (MqttSubjectServerBackgroundService)

  • Embedded broker: Hosts a full MQTTnet server as a .NET background service
  • Property-to-topic mapping: Automatic topic generation from ISourcePathProvider
  • Initial state publishing: New clients receive current state via retained messages
  • Connection monitoring: Tracks number of connected clients

Resilience

Circuit Breaker Pattern

  • Prevents resource exhaustion during persistent failures
  • Configurable failure threshold (default: 5 failures)
  • Cooldown period with automatic retry (default: 60 seconds)
  • Tracks trip count for monitoring

Reconnection Stall Detection

  • Detects hung reconnection attempts
  • Configurable timeout via health check iterations (default: 10 × 30s = 5 minutes)
  • Automatic recovery with flag reset

Exception Classification

  • Distinguishes permanent failures (bad credentials, DNS errors, TLS issues) from transient failures
  • Enhanced logging with user-friendly error descriptions
  • Stops aggressive retrying for configuration errors

Connection Management

  • Hybrid event + health check monitoring (TryPingAsync every 30s)
  • Exponential backoff with jitter (2s → 60s max)
  • Graceful shutdown with proper disposal
  • Thread-safe state management (Volatile/Interlocked)

Performance Optimizations

  • Zero-allocation JSON: Thread-local ArrayBufferWriter<byte> reuse
  • Span-based deserialization: Single-segment fast path, pooled buffers for multi-segment
  • Array pooling: Both client and server rent/return arrays for batching
  • Object pooling: Server uses ObjectPool<List<MqttUserProperty>>
  • Topic caching: ConcurrentDictionary for path-to-topic resolution
  • Static delegate allocation: Hot path avoids closure allocations

Files

Namotion.Interceptor.Mqtt/
├── Client/
│   ├── MqttSubjectClientSource.cs       # Main client implementation
│   ├── MqttConnectionMonitor.cs         # Resilience & reconnection logic
│   ├── MqttExceptionClassifier.cs      # Exception categorization
│   └── MqttClientConfiguration.cs       # Client configuration with validation
├── Server/
│   ├── MqttSubjectServerBackgroundService.cs  # Server implementation
│   └── MqttServerConfiguration.cs       # Server configuration
├── MqttHelper.cs                        # Shared utilities (topic stripping, etc.)
├── IMqttValueConverter.cs               # Serialization abstraction
├── JsonMqttValueConverter.cs            # JSON implementation (default)
└── MqttSubjectExtensions.cs             # DI registration extensions

Configuration Example

Client

services.AddMqttSubjectClient(clientGuid, config =>
{
    config.BrokerHost = "mqtt.example.com";
    config.BrokerPort = 1883;
    config.Username = "user";
    config.Password = "pass";
    config.UseTls = true;
    config.TopicPrefix = "devices/sensor1";
    config.PathProvider = new DotSeparatedPathProvider();

    // Resilience settings
    config.CircuitBreakerFailureThreshold = 5;      // Open after 5 failures
    config.CircuitBreakerCooldown = TimeSpan.FromSeconds(60);
    config.ReconnectStallThreshold = 10;            // 5 min timeout (10 × 30s)

    // QoS & performance
    config.DefaultQualityOfService = MqttQualityOfServiceLevel.AtLeastOnce;
    config.UseRetainedMessages = true;
    config.BufferTime = TimeSpan.FromMilliseconds(8);
    config.WriteRetryQueueSize = 1000;
});

Server

services.AddMqttSubjectServer(serverGuid, config =>
{
    config.Port = 1883;
    config.PathProvider = new DotSeparatedPathProvider();
    config.TopicPrefix = "devices";
    config.DefaultQualityOfService = MqttQualityOfServiceLevel.AtMostOnce;
    config.UseRetainedMessages = true;
    config.BufferTime = TimeSpan.FromMilliseconds(100);
});

@RicoSuter RicoSuter merged commit 8e35488 into master Nov 28, 2025
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants