diff --git a/src/Namotion.Interceptor.Mqtt.SampleClient/Namotion.Interceptor.Mqtt.SampleClient.csproj b/src/Namotion.Interceptor.Mqtt.SampleClient/Namotion.Interceptor.Mqtt.SampleClient.csproj
new file mode 100644
index 00000000..fd7c5108
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt.SampleClient/Namotion.Interceptor.Mqtt.SampleClient.csproj
@@ -0,0 +1,22 @@
+
+
+
+ Exe
+ net9.0
+ enable
+ enable
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Namotion.Interceptor.Mqtt.SampleClient/Program.cs b/src/Namotion.Interceptor.Mqtt.SampleClient/Program.cs
new file mode 100644
index 00000000..7c52fa43
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt.SampleClient/Program.cs
@@ -0,0 +1,41 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Namotion.Interceptor;
+using Namotion.Interceptor.Hosting;
+using Namotion.Interceptor.Mqtt;
+using Namotion.Interceptor.Mqtt.Client;
+using Namotion.Interceptor.Registry;
+using Namotion.Interceptor.SamplesModel;
+using Namotion.Interceptor.SamplesModel.Workers;
+using Namotion.Interceptor.Sources.Paths;
+using Namotion.Interceptor.Tracking;
+using Namotion.Interceptor.Validation;
+
+var builder = Host.CreateApplicationBuilder(args);
+
+var context = InterceptorSubjectContext
+ .Create()
+ .WithFullPropertyTracking()
+ .WithRegistry()
+ .WithParents()
+ .WithLifecycle()
+ .WithDataAnnotationValidation()
+ .WithHostedServices(builder.Services);
+
+var root = Root.CreateWithPersons(context);
+context.AddService(root);
+
+builder.Services.AddSingleton(root);
+builder.Services.AddHostedService();
+builder.Services.AddMqttSubjectClient(
+ _ => root,
+ _ => new MqttClientConfiguration
+ {
+ BrokerHost = "localhost",
+ BrokerPort = 1883,
+ PathProvider = new AttributeBasedSourcePathProvider("mqtt", "/", null)
+ });
+
+using var performanceProfiler = new PerformanceProfiler(context, "Client");
+var host = builder.Build();
+host.Run();
diff --git a/src/Namotion.Interceptor.Mqtt.SampleServer/Namotion.Interceptor.Mqtt.SampleServer.csproj b/src/Namotion.Interceptor.Mqtt.SampleServer/Namotion.Interceptor.Mqtt.SampleServer.csproj
new file mode 100644
index 00000000..fd7c5108
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt.SampleServer/Namotion.Interceptor.Mqtt.SampleServer.csproj
@@ -0,0 +1,22 @@
+
+
+
+ Exe
+ net9.0
+ enable
+ enable
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Namotion.Interceptor.Mqtt.SampleServer/Program.cs b/src/Namotion.Interceptor.Mqtt.SampleServer/Program.cs
new file mode 100644
index 00000000..c291f8c0
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt.SampleServer/Program.cs
@@ -0,0 +1,40 @@
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Namotion.Interceptor;
+using Namotion.Interceptor.Hosting;
+using Namotion.Interceptor.Mqtt.Server;
+using Namotion.Interceptor.Registry;
+using Namotion.Interceptor.SamplesModel;
+using Namotion.Interceptor.SamplesModel.Workers;
+using Namotion.Interceptor.Sources.Paths;
+using Namotion.Interceptor.Tracking;
+using Namotion.Interceptor.Validation;
+
+var builder = Host.CreateApplicationBuilder(args);
+
+var context = InterceptorSubjectContext
+ .Create()
+ .WithFullPropertyTracking()
+ .WithRegistry()
+ .WithParents()
+ .WithLifecycle()
+ .WithDataAnnotationValidation()
+ .WithHostedServices(builder.Services);
+
+var root = Root.CreateWithPersons(context);
+context.AddService(root);
+
+builder.Services.AddSingleton(root);
+builder.Services.AddHostedService();
+builder.Services.AddMqttSubjectServer(
+ _ => root,
+ _ => new MqttServerConfiguration
+ {
+ BrokerHost = "localhost",
+ BrokerPort = 1883,
+ PathProvider = new AttributeBasedSourcePathProvider("mqtt", "/")
+ });
+
+using var performanceProfiler = new PerformanceProfiler(context, "Server");
+var host = builder.Build();
+host.Run();
diff --git a/src/Namotion.Interceptor.Mqtt.Tests/JsonMqttValueConverterTests.cs b/src/Namotion.Interceptor.Mqtt.Tests/JsonMqttValueConverterTests.cs
new file mode 100644
index 00000000..ce0e8540
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt.Tests/JsonMqttValueConverterTests.cs
@@ -0,0 +1,144 @@
+using System.Buffers;
+using System.Text;
+using System.Text.Json;
+using Xunit;
+
+namespace Namotion.Interceptor.Mqtt.Tests;
+
+public class JsonMqttValueConverterTests
+{
+ private readonly JsonMqttValueConverter _converter = new();
+
+ [Fact]
+ public void Serialize_NullValue_ReturnsJsonNull()
+ {
+ // Act
+ var result = _converter.Serialize(null, typeof(object));
+
+ // Assert
+ Assert.Equal("null", Encoding.UTF8.GetString(result));
+ }
+
+ [Theory]
+ [InlineData(42, typeof(int), "42")]
+ [InlineData(3.14, typeof(double), "3.14")]
+ [InlineData(true, typeof(bool), "true")]
+ [InlineData(false, typeof(bool), "false")]
+ [InlineData("test", typeof(string), "\"test\"")]
+ public void Serialize_PrimitiveTypes_ReturnsCorrectJson(object value, Type type, string expected)
+ {
+ // Act
+ var result = _converter.Serialize(value, type);
+
+ // Assert
+ Assert.Equal(expected, Encoding.UTF8.GetString(result));
+ }
+
+ [Fact]
+ public void Serialize_ComplexObject_ReturnsCorrectJson()
+ {
+ // Arrange
+ var obj = new TestObject { Name = "Test", Value = 123 };
+
+ // Act
+ var result = _converter.Serialize(obj, typeof(TestObject));
+
+ // Assert
+ var json = Encoding.UTF8.GetString(result);
+ Assert.Contains("\"name\":", json); // camelCase
+ Assert.Contains("\"Test\"", json);
+ Assert.Contains("\"value\":", json);
+ Assert.Contains("123", json);
+ }
+
+ [Fact]
+ public void Deserialize_EmptyPayload_ReturnsNull()
+ {
+ // Arrange
+ var payload = new ReadOnlySequence(Array.Empty());
+
+ // Act
+ var result = _converter.Deserialize(payload, typeof(string));
+
+ // Assert
+ Assert.Null(result);
+ }
+
+ [Fact]
+ public void Deserialize_JsonNull_ReturnsNull()
+ {
+ // Arrange
+ var payload = new ReadOnlySequence(Encoding.UTF8.GetBytes("null"));
+
+ // Act
+ var result = _converter.Deserialize(payload, typeof(string));
+
+ // Assert
+ Assert.Null(result);
+ }
+
+ [Theory]
+ [InlineData("42", typeof(int), 42)]
+ [InlineData("3.14", typeof(double), 3.14)]
+ [InlineData("true", typeof(bool), true)]
+ [InlineData("\"test\"", typeof(string), "test")]
+ public void Deserialize_PrimitiveTypes_ReturnsCorrectValue(string json, Type type, object expected)
+ {
+ // Arrange
+ var payload = new ReadOnlySequence(Encoding.UTF8.GetBytes(json));
+
+ // Act
+ var result = _converter.Deserialize(payload, type);
+
+ // Assert
+ Assert.Equal(expected, result);
+ }
+
+ [Fact]
+ public void Deserialize_ComplexObject_ReturnsCorrectObject()
+ {
+ // Arrange
+ var json = "{\"name\":\"Test\",\"value\":123}";
+ var payload = new ReadOnlySequence(Encoding.UTF8.GetBytes(json));
+
+ // Act
+ var result = _converter.Deserialize(payload, typeof(TestObject)) as TestObject;
+
+ // Assert
+ Assert.NotNull(result);
+ Assert.Equal("Test", result.Name);
+ Assert.Equal(123, result.Value);
+ }
+
+ [Fact]
+ public void Deserialize_InvalidJson_ThrowsJsonException()
+ {
+ // Arrange
+ var payload = new ReadOnlySequence(Encoding.UTF8.GetBytes("invalid json"));
+
+ // Act & Assert
+ Assert.Throws(() => _converter.Deserialize(payload, typeof(int)));
+ }
+
+ [Fact]
+ public void RoundTrip_PreservesValue()
+ {
+ // Arrange
+ var original = new TestObject { Name = "RoundTrip", Value = 456 };
+
+ // Act
+ var serialized = _converter.Serialize(original, typeof(TestObject));
+ var deserialized = _converter.Deserialize(new ReadOnlySequence(serialized), typeof(TestObject)) as TestObject;
+
+ // Assert
+ Assert.NotNull(deserialized);
+ Assert.Equal(original.Name, deserialized.Name);
+ Assert.Equal(original.Value, deserialized.Value);
+ }
+
+ private class TestObject
+ {
+ public string Name { get; set; } = string.Empty;
+ public int Value { get; set; }
+ }
+}
diff --git a/src/Namotion.Interceptor.Mqtt.Tests/MqttClientConfigurationTests.cs b/src/Namotion.Interceptor.Mqtt.Tests/MqttClientConfigurationTests.cs
new file mode 100644
index 00000000..fee7f3ef
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt.Tests/MqttClientConfigurationTests.cs
@@ -0,0 +1,140 @@
+using Namotion.Interceptor.Mqtt.Client;
+using Namotion.Interceptor.Sources.Paths;
+using Xunit;
+
+namespace Namotion.Interceptor.Mqtt.Tests;
+
+public class MqttClientConfigurationTests
+{
+ [Fact]
+ public void Validate_ValidConfiguration_DoesNotThrow()
+ {
+ // Arrange
+ var config = new MqttClientConfiguration
+ {
+ BrokerHost = "localhost",
+ BrokerPort = 1883,
+ PathProvider = new AttributeBasedSourcePathProvider("test", "/", null)
+ };
+
+ // Act & Assert
+ config.Validate(); // Should not throw
+ }
+
+ [Fact]
+ public void Validate_NullBrokerHost_ThrowsArgumentException()
+ {
+ // Arrange
+ var config = new MqttClientConfiguration
+ {
+ BrokerHost = null!,
+ BrokerPort = 1883,
+ PathProvider = new AttributeBasedSourcePathProvider("test", "/", null)
+ };
+
+ // Act & Assert
+ Assert.Throws(() => config.Validate());
+ }
+
+ [Fact]
+ public void Validate_EmptyBrokerHost_ThrowsArgumentException()
+ {
+ // Arrange
+ var config = new MqttClientConfiguration
+ {
+ BrokerHost = "",
+ BrokerPort = 1883,
+ PathProvider = new AttributeBasedSourcePathProvider("test", "/", null)
+ };
+
+ // Act & Assert
+ Assert.Throws(() => config.Validate());
+ }
+
+ [Theory]
+ [InlineData(0)]
+ [InlineData(-1)]
+ [InlineData(65536)]
+ public void Validate_InvalidBrokerPort_ThrowsArgumentException(int port)
+ {
+ // Arrange
+ var config = new MqttClientConfiguration
+ {
+ BrokerHost = "localhost",
+ BrokerPort = port,
+ PathProvider = new AttributeBasedSourcePathProvider("test", "/", null)
+ };
+
+ // Act & Assert
+ Assert.Throws(() => config.Validate());
+ }
+
+ [Fact]
+ public void Validate_NullPathProvider_ThrowsArgumentException()
+ {
+ // Arrange
+ var config = new MqttClientConfiguration
+ {
+ BrokerHost = "localhost",
+ BrokerPort = 1883,
+ PathProvider = null!
+ };
+
+ // Act & Assert
+ Assert.Throws(() => config.Validate());
+ }
+
+ [Fact]
+ public void Validate_NegativeWriteRetryQueueSize_ThrowsArgumentException()
+ {
+ // Arrange
+ var config = new MqttClientConfiguration
+ {
+ BrokerHost = "localhost",
+ BrokerPort = 1883,
+ PathProvider = new AttributeBasedSourcePathProvider("test", "/", null),
+ WriteRetryQueueSize = -1
+ };
+
+ // Act & Assert
+ Assert.Throws(() => config.Validate());
+ }
+
+ [Fact]
+ public void Validate_MaxReconnectDelayLessThanReconnectDelay_ThrowsArgumentException()
+ {
+ // Arrange
+ var config = new MqttClientConfiguration
+ {
+ BrokerHost = "localhost",
+ BrokerPort = 1883,
+ PathProvider = new AttributeBasedSourcePathProvider("test", "/", null),
+ ReconnectDelay = TimeSpan.FromSeconds(10),
+ MaximumReconnectDelay = TimeSpan.FromSeconds(5)
+ };
+
+ // Act & Assert
+ Assert.Throws(() => config.Validate());
+ }
+
+ [Fact]
+ public void DefaultValues_AreCorrect()
+ {
+ // Arrange & Act
+ var config = new MqttClientConfiguration
+ {
+ BrokerHost = "localhost",
+ PathProvider = new AttributeBasedSourcePathProvider("test", "/", null)
+ };
+
+ // Assert
+ Assert.Equal(1883, config.BrokerPort);
+ Assert.False(config.UseTls);
+ Assert.True(config.CleanSession);
+ Assert.Equal(TimeSpan.FromSeconds(15), config.KeepAliveInterval);
+ Assert.Equal(TimeSpan.FromSeconds(10), config.ConnectTimeout);
+ Assert.Equal(TimeSpan.FromSeconds(2), config.ReconnectDelay);
+ Assert.Equal(TimeSpan.FromMinutes(1), config.MaximumReconnectDelay);
+ Assert.NotNull(config.ValueConverter);
+ }
+}
diff --git a/src/Namotion.Interceptor.Mqtt.Tests/MqttServerConfigurationTests.cs b/src/Namotion.Interceptor.Mqtt.Tests/MqttServerConfigurationTests.cs
new file mode 100644
index 00000000..70b33be5
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt.Tests/MqttServerConfigurationTests.cs
@@ -0,0 +1,69 @@
+using Namotion.Interceptor.Mqtt.Server;
+using Namotion.Interceptor.Sources.Paths;
+using Xunit;
+
+namespace Namotion.Interceptor.Mqtt.Tests;
+
+public class MqttServerConfigurationTests
+{
+ [Fact]
+ public void Validate_ValidConfiguration_DoesNotThrow()
+ {
+ // Arrange
+ var config = new MqttServerConfiguration
+ {
+ BrokerHost = "localhost",
+ BrokerPort = 1883,
+ PathProvider = new AttributeBasedSourcePathProvider("test", "/", null)
+ };
+
+ // Act & Assert
+ config.Validate(); // Should not throw
+ }
+
+ [Fact]
+ public void Validate_NullBrokerHost_ThrowsArgumentException()
+ {
+ // Arrange
+ var config = new MqttServerConfiguration
+ {
+ BrokerHost = null!,
+ BrokerPort = 1883,
+ PathProvider = new AttributeBasedSourcePathProvider("test", "/", null)
+ };
+
+ // Act & Assert
+ Assert.Throws(() => config.Validate());
+ }
+
+ [Fact]
+ public void Validate_NullPathProvider_ThrowsArgumentException()
+ {
+ // Arrange
+ var config = new MqttServerConfiguration
+ {
+ BrokerHost = "localhost",
+ BrokerPort = 1883,
+ PathProvider = null!
+ };
+
+ // Act & Assert
+ Assert.Throws(() => config.Validate());
+ }
+
+ [Fact]
+ public void DefaultValues_AreCorrect()
+ {
+ // Arrange & Act
+ var config = new MqttServerConfiguration
+ {
+ BrokerHost = "localhost",
+ PathProvider = new AttributeBasedSourcePathProvider("test", "/", null)
+ };
+
+ // Assert
+ Assert.Equal(1883, config.BrokerPort);
+ Assert.Equal(10000, config.MaxPendingMessagesPerClient);
+ Assert.NotNull(config.ValueConverter);
+ }
+}
diff --git a/src/Namotion.Interceptor.Mqtt.Tests/Namotion.Interceptor.Mqtt.Tests.csproj b/src/Namotion.Interceptor.Mqtt.Tests/Namotion.Interceptor.Mqtt.Tests.csproj
new file mode 100644
index 00000000..fab37b05
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt.Tests/Namotion.Interceptor.Mqtt.Tests.csproj
@@ -0,0 +1,31 @@
+
+
+
+ net9.0
+ enable
+ enable
+ false
+ true
+
+
+
+
+
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Namotion.Interceptor.Mqtt/Client/MqttClientConfiguration.cs b/src/Namotion.Interceptor.Mqtt/Client/MqttClientConfiguration.cs
new file mode 100644
index 00000000..1bf572ca
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt/Client/MqttClientConfiguration.cs
@@ -0,0 +1,213 @@
+using System;
+using MQTTnet.Protocol;
+using Namotion.Interceptor.Sources.Paths;
+
+namespace Namotion.Interceptor.Mqtt.Client;
+
+///
+/// Configuration for MQTT client source.
+///
+public class MqttClientConfiguration
+{
+ ///
+ /// Gets or sets the MQTT broker hostname or IP address.
+ ///
+ public required string BrokerHost { get; init; }
+
+ ///
+ /// Gets or sets the MQTT broker port. Default is 1883.
+ ///
+ public int BrokerPort { get; init; } = 1883;
+
+ ///
+ /// Gets or sets the username for broker authentication.
+ ///
+ public string? Username { get; init; }
+
+ ///
+ /// Gets or sets the password for broker authentication.
+ ///
+ public string? Password { get; init; }
+
+ ///
+ /// Gets or sets whether to use TLS/SSL for the connection. Default is false.
+ ///
+ public bool UseTls { get; init; }
+
+ ///
+ /// Gets or sets the client identifier. Default is a unique GUID-based identifier.
+ ///
+ public string ClientId { get; init; } = $"Namotion_{Guid.NewGuid():N}";
+
+ ///
+ /// Gets or sets whether to use a clean session. Default is true.
+ /// When false, the broker preserves subscriptions and queued messages across reconnects.
+ ///
+ public bool CleanSession { get; init; } = true;
+
+ ///
+ /// Gets or sets the optional topic prefix. When set, all topics are prefixed with this value.
+ ///
+ public string? TopicPrefix { get; init; }
+
+ ///
+ /// Gets or sets the source path provider for property-to-topic mapping.
+ ///
+ public required ISourcePathProvider PathProvider { get; init; }
+
+ // QoS settings
+
+ ///
+ /// Gets or sets the default QoS level for publish/subscribe operations. Default is AtMostOnce (0) for high throughput.
+ ///
+ public MqttQualityOfServiceLevel DefaultQualityOfService { get; init; } = MqttQualityOfServiceLevel.AtMostOnce;
+
+ ///
+ /// Gets or sets whether to use retained messages. Default is true.
+ /// Retained messages enable initial state loading.
+ ///
+ public bool UseRetainedMessages { get; init; } = true;
+
+ ///
+ /// Gets or sets the connection timeout. Default is 10 seconds.
+ ///
+ public TimeSpan ConnectTimeout { get; init; } = TimeSpan.FromSeconds(10);
+
+ ///
+ /// Gets or sets the initial delay before attempting to reconnect. Default is 2 seconds.
+ ///
+ public TimeSpan ReconnectDelay { get; init; } = TimeSpan.FromSeconds(2);
+
+ ///
+ /// Gets or sets the maximum delay between reconnection attempts (for exponential backoff). Default is 60 seconds.
+ ///
+ public TimeSpan MaximumReconnectDelay { get; init; } = TimeSpan.FromSeconds(60);
+
+ ///
+ /// Gets or sets the keep-alive interval. Default is 15 seconds.
+ ///
+ public TimeSpan KeepAliveInterval { get; init; } = TimeSpan.FromSeconds(15);
+
+ ///
+ /// Gets or sets the interval for connection health checks. Default is 30 seconds.
+ /// Health checks use TryPingAsync to verify the connection is still alive.
+ ///
+ public TimeSpan HealthCheckInterval { get; init; } = TimeSpan.FromSeconds(30);
+
+ ///
+ /// Gets or sets the maximum number of health check iterations while reconnecting before forcing a reset.
+ /// Default is 10 iterations. Combined with HealthCheckInterval, this provides a stall timeout.
+ /// Example: 10 iterations × 30s = 5 minutes timeout for hung reconnection attempts.
+ /// Set to 0 to disable stall detection.
+ ///
+ public int ReconnectStallThreshold { get; init; } = 10;
+
+ ///
+ /// Gets or sets the number of consecutive connection failures before the circuit breaker opens.
+ /// Default is 5 failures. When open, reconnection attempts are paused for the cooldown period.
+ /// Set to 0 to disable circuit breaker.
+ ///
+ public int CircuitBreakerFailureThreshold { get; init; } = 5;
+
+ ///
+ /// Gets or sets the cooldown period after the circuit breaker opens. Default is 60 seconds.
+ /// During cooldown, no reconnection attempts are made. After cooldown, one retry is allowed.
+ ///
+ public TimeSpan CircuitBreakerCooldown { get; init; } = TimeSpan.FromSeconds(60);
+
+ ///
+ /// Gets or sets the time to buffer property changes before sending. Default is 8ms.
+ ///
+ public TimeSpan BufferTime { get; init; } = TimeSpan.FromMilliseconds(8);
+
+ ///
+ /// Gets or sets the time between retry attempts for failed writes. Default is 10 seconds.
+ ///
+ public TimeSpan RetryTime { get; init; } = TimeSpan.FromSeconds(10);
+
+ ///
+ /// Gets or sets the maximum number of writes to queue during disconnection. Default is 1000.
+ /// Set to 0 to disable write buffering.
+ ///
+ public int WriteRetryQueueSize { get; init; } = 1000;
+
+ ///
+ /// Gets or sets the value converter for serialization/deserialization. Default is JSON.
+ ///
+ public IMqttValueConverter ValueConverter { get; init; } = new JsonMqttValueConverter();
+
+ ///
+ /// Gets or sets the MQTT user property name for the source timestamp. Default is "ts".
+ /// Set to null to disable timestamp extraction.
+ ///
+ public string? SourceTimestampPropertyName { get; init; } = "ts";
+
+ ///
+ /// Gets or sets the converter function for serializing timestamps to strings.
+ /// Default converts to Unix milliseconds.
+ ///
+ public Func SourceTimestampConverter { get; init; } =
+ static timestamp => timestamp.ToUnixTimeMilliseconds().ToString();
+
+ ///
+ /// Validates the configuration and throws if invalid.
+ ///
+ /// Thrown when configuration is invalid.
+ public void Validate()
+ {
+ if (string.IsNullOrWhiteSpace(BrokerHost))
+ {
+ throw new ArgumentException("BrokerHost must be specified.", nameof(BrokerHost));
+ }
+
+ if (BrokerPort is < 1 or > 65535)
+ {
+ throw new ArgumentException($"BrokerPort must be between 1 and 65535, got: {BrokerPort}", nameof(BrokerPort));
+ }
+
+ if (PathProvider is null)
+ {
+ throw new ArgumentException("PathProvider must be specified.", nameof(PathProvider));
+ }
+
+ if (ConnectTimeout <= TimeSpan.Zero)
+ {
+ throw new ArgumentException($"ConnectTimeout must be positive, got: {ConnectTimeout}", nameof(ConnectTimeout));
+ }
+
+ if (ReconnectDelay <= TimeSpan.Zero)
+ {
+ throw new ArgumentException($"ReconnectDelay must be positive, got: {ReconnectDelay}", nameof(ReconnectDelay));
+ }
+
+ if (MaximumReconnectDelay < ReconnectDelay)
+ {
+ throw new ArgumentException($"MaxReconnectDelay must be >= ReconnectDelay, got: {MaximumReconnectDelay}", nameof(MaximumReconnectDelay));
+ }
+
+ if (WriteRetryQueueSize < 0)
+ {
+ throw new ArgumentException($"WriteRetryQueueSize must be non-negative, got: {WriteRetryQueueSize}", nameof(WriteRetryQueueSize));
+ }
+
+ if (ValueConverter is null)
+ {
+ throw new ArgumentException("ValueConverter must be specified.", nameof(ValueConverter));
+ }
+
+ if (ReconnectStallThreshold < 0)
+ {
+ throw new ArgumentException($"ReconnectStallThreshold must be non-negative, got: {ReconnectStallThreshold}", nameof(ReconnectStallThreshold));
+ }
+
+ if (CircuitBreakerFailureThreshold < 0)
+ {
+ throw new ArgumentException($"CircuitBreakerFailureThreshold must be non-negative, got: {CircuitBreakerFailureThreshold}", nameof(CircuitBreakerFailureThreshold));
+ }
+
+ if (CircuitBreakerCooldown <= TimeSpan.Zero && CircuitBreakerFailureThreshold > 0)
+ {
+ throw new ArgumentException($"CircuitBreakerCooldown must be positive when circuit breaker is enabled, got: {CircuitBreakerCooldown}", nameof(CircuitBreakerCooldown));
+ }
+ }
+}
diff --git a/src/Namotion.Interceptor.Mqtt/Client/MqttConnectionLifetime.cs b/src/Namotion.Interceptor.Mqtt/Client/MqttConnectionLifetime.cs
new file mode 100644
index 00000000..96a10997
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt/Client/MqttConnectionLifetime.cs
@@ -0,0 +1,32 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Namotion.Interceptor.Mqtt.Client;
+
+internal sealed class MqttConnectionLifetime : IDisposable, IAsyncDisposable
+{
+ private readonly Func _disposeAsync;
+ private int _disposed;
+
+ public MqttConnectionLifetime(Func disposeAsync)
+ {
+ _disposeAsync = disposeAsync;
+ }
+
+ public void Dispose()
+ {
+ if (Interlocked.Exchange(ref _disposed, 1) == 0)
+ {
+ _disposeAsync().GetAwaiter().GetResult();
+ }
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (Interlocked.Exchange(ref _disposed, 1) == 0)
+ {
+ await _disposeAsync().ConfigureAwait(false);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Namotion.Interceptor.Mqtt/Client/MqttConnectionMonitor.cs b/src/Namotion.Interceptor.Mqtt/Client/MqttConnectionMonitor.cs
new file mode 100644
index 00000000..9845e229
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt/Client/MqttConnectionMonitor.cs
@@ -0,0 +1,244 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using MQTTnet;
+using Namotion.Interceptor.Sources.Resilience;
+
+namespace Namotion.Interceptor.Mqtt.Client;
+
+///
+/// Monitors MQTT client connection health and handles reconnection with exponential backoff.
+/// Uses a hybrid approach: events trigger immediate action, but actual reconnection happens in monitoring task.
+///
+internal sealed class MqttConnectionMonitor : IAsyncDisposable
+{
+ private readonly IMqttClient _client;
+ private readonly MqttClientConfiguration _configuration;
+ private readonly ILogger _logger;
+
+ private readonly Func _optionsBuilder;
+ private readonly Func _onReconnected;
+ private readonly Func _onDisconnected;
+
+ private readonly SemaphoreSlim _reconnectSignal = new(0, 1);
+ private readonly CircuitBreaker? _circuitBreaker;
+
+ private int _isReconnecting;
+ private int _reconnectingIterations; // Tracks health check iterations while reconnecting (for stall detection)
+ private int _disposed;
+
+ public MqttConnectionMonitor(
+ IMqttClient client,
+ MqttClientConfiguration configuration,
+ Func optionsBuilder,
+ Func onReconnected,
+ Func onDisconnected,
+ ILogger logger)
+ {
+ _client = client ?? throw new ArgumentNullException(nameof(client));
+ _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ _optionsBuilder = optionsBuilder ?? throw new ArgumentNullException(nameof(optionsBuilder));
+ _onReconnected = onReconnected ?? throw new ArgumentNullException(nameof(onReconnected));
+ _onDisconnected = onDisconnected ?? throw new ArgumentNullException(nameof(onDisconnected));
+
+ // Initialize circuit breaker if enabled
+ if (configuration.CircuitBreakerFailureThreshold > 0)
+ {
+ _circuitBreaker = new CircuitBreaker(
+ configuration.CircuitBreakerFailureThreshold,
+ configuration.CircuitBreakerCooldown);
+ }
+ }
+
+ ///
+ /// Signals that a reconnection is needed (called by DisconnectedAsync event handler in MqttSubjectClientSource).
+ ///
+ public void SignalReconnectNeeded()
+ {
+ try
+ {
+ // Only signal if not already signaled (semaphore maxCount is 1)
+ _reconnectSignal.Release();
+ }
+ catch (SemaphoreFullException)
+ {
+ // Already signaled, ignore
+ }
+ catch (ObjectDisposedException)
+ {
+ // Monitor is being disposed, ignore (race between disconnect event and disposal)
+ }
+ }
+
+ ///
+ /// Monitors connection health and performs reconnection with exponential backoff, circuit breaker, and stall detection.
+ /// This is a blocking method that runs until a cancellation is requested.
+ /// Uses hybrid approach: Waits for disconnect event OR periodic health check.
+ ///
+ /// Cancellation token to stop monitoring.
+ public async Task MonitorConnectionAsync(CancellationToken cancellationToken)
+ {
+ var healthCheckInterval = _configuration.HealthCheckInterval;
+ var maxDelay = _configuration.MaximumReconnectDelay;
+ var stallThreshold = _configuration.ReconnectStallThreshold;
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ // Wait for either: Disconnect event signal OR periodic health check timeout
+ var signaled = await _reconnectSignal.WaitAsync(healthCheckInterval, cancellationToken).ConfigureAwait(false);
+ if (signaled)
+ {
+ _logger.LogWarning("MQTT disconnect event received.");
+ await _onDisconnected().ConfigureAwait(false);
+ }
+ else
+ {
+ // Periodic health check: Use TryPingAsync (recommended by MQTTnet)
+ var isHealthy = _client.IsConnected && await _client.TryPingAsync(cancellationToken).ConfigureAwait(false);
+ if (isHealthy)
+ {
+ // Connection healthy - reset stall detection counter
+ Interlocked.Exchange(ref _reconnectingIterations, 0);
+ continue;
+ }
+
+ _logger.LogWarning("MQTT health check failed.");
+ }
+
+ // Stall detection: Check if reconnection is hung
+ if (Volatile.Read(ref _isReconnecting) == 1 && stallThreshold > 0)
+ {
+ var iterations = Interlocked.Increment(ref _reconnectingIterations);
+ if (iterations > stallThreshold)
+ {
+ // Timeout: iterations × health check interval (e.g., 10 × 30s = 5 minutes)
+ _logger.LogError(
+ "Reconnection stalled after {Iterations} iterations (~{Timeout}s). Forcing reset.",
+ iterations,
+ (int)(iterations * healthCheckInterval.TotalSeconds));
+
+ // Force reset reconnection flag to allow recovery
+ Interlocked.Exchange(ref _isReconnecting, 0);
+ Interlocked.Exchange(ref _reconnectingIterations, 0);
+
+ // Reset circuit breaker to allow immediate retry
+ _circuitBreaker?.Reset();
+ }
+ }
+
+ if (!_client.IsConnected)
+ {
+ if (Interlocked.Exchange(ref _isReconnecting, 1) == 1)
+ {
+ continue; // Already reconnecting
+ }
+
+ try
+ {
+ var reconnectDelay = _configuration.ReconnectDelay;
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ // Check circuit breaker
+ if (_circuitBreaker is not null && !_circuitBreaker.ShouldAttempt())
+ {
+ var cooldownRemaining = _circuitBreaker.GetCooldownRemaining();
+ _logger.LogWarning(
+ "Circuit breaker open after {TripCount} trips. Pausing reconnection attempts for {Cooldown}s.",
+ _circuitBreaker.TripCount,
+ (int)cooldownRemaining.TotalSeconds);
+
+ // Wait for cooldown period (or until cancellation)
+ await Task.Delay(cooldownRemaining, cancellationToken).ConfigureAwait(false);
+ continue;
+ }
+
+ try
+ {
+ _logger.LogInformation("Attempting to reconnect to MQTT broker in {Delay}...", reconnectDelay);
+ await Task.Delay(reconnectDelay, cancellationToken).ConfigureAwait(false);
+
+ var options = _optionsBuilder();
+ await _client.ConnectAsync(options, cancellationToken).ConfigureAwait(false);
+
+ _logger.LogInformation("Reconnected to MQTT broker successfully.");
+ await _onReconnected(cancellationToken).ConfigureAwait(false);
+
+ // Success - close circuit breaker and reset counters
+ _circuitBreaker?.RecordSuccess();
+ Interlocked.Exchange(ref _reconnectingIterations, 0);
+ break;
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ _logger.LogInformation("Reconnection cancelled due to shutdown.");
+ break;
+ }
+ catch (Exception ex)
+ {
+ var isTransient = MqttExceptionClassifier.IsTransient(ex);
+ var description = MqttExceptionClassifier.GetFailureDescription(ex);
+
+ if (isTransient)
+ {
+ _logger.LogError(ex,
+ "Failed to reconnect to MQTT broker: {Description}.",
+ description);
+ }
+ else
+ {
+ _logger.LogError(ex,
+ "Permanent connection failure detected: {Description}. " +
+ "Reconnection will be retried, but this likely requires configuration changes.",
+ description);
+ }
+
+ // Record failure in circuit breaker
+ if (_circuitBreaker is not null && _circuitBreaker.RecordFailure())
+ {
+ _logger.LogWarning(
+ "Circuit breaker tripped after {Threshold} consecutive failures. " +
+ "Pausing reconnection attempts for {Cooldown}s.",
+ _configuration.CircuitBreakerFailureThreshold,
+ (int)_configuration.CircuitBreakerCooldown.TotalSeconds);
+ }
+
+ // Exponential backoff with jitter
+ var jitter = Random.Shared.NextDouble() * 0.1 + 0.95; // 0.95 to 1.05
+ reconnectDelay = TimeSpan.FromMilliseconds(
+ Math.Min(reconnectDelay.TotalMilliseconds * 2 * jitter, maxDelay.TotalMilliseconds));
+ }
+ }
+ }
+ finally
+ {
+ Interlocked.Exchange(ref _isReconnecting, 0);
+ }
+ }
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ _logger.LogInformation("Connection monitoring cancelled due to shutdown.");
+ break;
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error in connection monitoring.");
+ }
+ }
+ }
+
+ public ValueTask DisposeAsync()
+ {
+ if (Interlocked.Exchange(ref _disposed, 1) == 1)
+ {
+ return ValueTask.CompletedTask;
+ }
+
+ _reconnectSignal.Dispose();
+ return ValueTask.CompletedTask;
+ }
+}
diff --git a/src/Namotion.Interceptor.Mqtt/Client/MqttExceptionClassifier.cs b/src/Namotion.Interceptor.Mqtt/Client/MqttExceptionClassifier.cs
new file mode 100644
index 00000000..29f65450
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt/Client/MqttExceptionClassifier.cs
@@ -0,0 +1,80 @@
+using System;
+using System.Net.Sockets;
+using MQTTnet.Exceptions;
+
+namespace Namotion.Interceptor.Mqtt.Client;
+
+///
+/// Classifies MQTT exceptions as transient (retryable) or permanent (configuration/design-time errors).
+///
+internal static class MqttExceptionClassifier
+{
+ ///
+ /// Determines if an exception represents a transient failure that can be retried.
+ /// Returns true for transient errors (network issues, broker temporarily unavailable),
+ /// false for permanent errors (bad credentials, invalid configuration).
+ ///
+ public static bool IsTransient(Exception exception)
+ {
+ return exception switch
+ {
+ // Authentication failures - permanent
+ MqttCommunicationException { InnerException: SocketException { SocketErrorCode: SocketError.ConnectionRefused } } => false,
+
+ // Invalid configuration - permanent
+ ArgumentNullException => false,
+ ArgumentException => false,
+ InvalidOperationException ex when ex.Message.Contains("not allowed to connect", StringComparison.OrdinalIgnoreCase) => false,
+
+ // DNS resolution failures - permanent (invalid hostname)
+ SocketException { SocketErrorCode: SocketError.HostNotFound } => false,
+ SocketException { SocketErrorCode: SocketError.NoData } => false,
+
+ // TLS/Certificate failures - potentially permanent (depends on configuration)
+ MqttCommunicationException ex when ex.Message.Contains("TLS", StringComparison.OrdinalIgnoreCase) => false,
+ MqttCommunicationException ex when ex.Message.Contains("certificate", StringComparison.OrdinalIgnoreCase) => false,
+ MqttCommunicationException ex when ex.Message.Contains("authentication", StringComparison.OrdinalIgnoreCase) => false,
+
+ // All other exceptions are considered transient (network issues, timeout, etc.)
+ _ => true
+ };
+ }
+
+ ///
+ /// Gets a user-friendly description of the failure type for logging.
+ ///
+ public static string GetFailureDescription(Exception exception)
+ {
+ return exception switch
+ {
+ MqttCommunicationException { InnerException: SocketException { SocketErrorCode: SocketError.ConnectionRefused } }
+ => "Connection refused - broker may be down or authentication failed",
+
+ SocketException { SocketErrorCode: SocketError.HostNotFound }
+ => "Host not found - invalid broker hostname",
+
+ SocketException { SocketErrorCode: SocketError.NoData }
+ => "No DNS data - invalid broker hostname",
+
+ MqttCommunicationException ex when ex.Message.Contains("TLS", StringComparison.OrdinalIgnoreCase)
+ => "TLS connection failed - check certificate configuration",
+
+ MqttCommunicationException ex when ex.Message.Contains("certificate", StringComparison.OrdinalIgnoreCase)
+ => "Certificate validation failed - check TLS configuration",
+
+ MqttCommunicationException ex when ex.Message.Contains("authentication", StringComparison.OrdinalIgnoreCase)
+ => "Authentication failed - check username/password",
+
+ InvalidOperationException ex when ex.Message.Contains("not allowed to connect", StringComparison.OrdinalIgnoreCase)
+ => "Connection state error - client already connected or connecting",
+
+ SocketException socketEx
+ => $"Network error: {socketEx.SocketErrorCode}",
+
+ OperationCanceledException
+ => "Operation cancelled",
+
+ _ => exception.GetType().Name
+ };
+ }
+}
diff --git a/src/Namotion.Interceptor.Mqtt/Client/MqttSubjectClientSource.cs b/src/Namotion.Interceptor.Mqtt/Client/MqttSubjectClientSource.cs
new file mode 100644
index 00000000..6c6b4e9d
--- /dev/null
+++ b/src/Namotion.Interceptor.Mqtt/Client/MqttSubjectClientSource.cs
@@ -0,0 +1,390 @@
+using System;
+using System.Buffers;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using MQTTnet;
+using MQTTnet.Packets;
+using Namotion.Interceptor.Registry;
+using Namotion.Interceptor.Registry.Abstractions;
+using Namotion.Interceptor.Sources;
+using Namotion.Interceptor.Sources.Paths;
+using Namotion.Interceptor.Tracking.Change;
+
+namespace Namotion.Interceptor.Mqtt.Client;
+
+///
+/// MQTT client source that subscribes to an MQTT broker and synchronizes properties.
+///
+internal sealed class MqttSubjectClientSource : BackgroundService, ISubjectSource, IAsyncDisposable
+{
+ private readonly IInterceptorSubject _subject;
+ private readonly MqttClientConfiguration _configuration;
+ private readonly ILogger _logger;
+
+ private readonly MqttClientFactory _factory;
+
+ // TODO(memory): Might lead to memory leaks
+ private readonly ConcurrentDictionary _topicToProperty = new();
+ private readonly ConcurrentDictionary _propertyToTopic = new();
+
+ private IMqttClient? _client;
+ private SubjectPropertyWriter? _propertyWriter;
+ private MqttConnectionMonitor? _connectionMonitor;
+
+ private int _disposed;
+ private volatile bool _isStarted;
+
+ public MqttSubjectClientSource(
+ IInterceptorSubject subject,
+ MqttClientConfiguration configuration,
+ ILogger logger)
+ {
+ _subject = subject ?? throw new ArgumentNullException(nameof(subject));
+ _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+
+ _factory = new MqttClientFactory();
+
+ configuration.Validate();
+ }
+
+ ///
+ public bool IsPropertyIncluded(RegisteredSubjectProperty property) =>
+ _configuration.PathProvider.IsPropertyIncluded(property);
+
+ ///
+ public int WriteBatchSize => 0; // No server-imposed limit for MQTT
+
+ ///
+ public async Task StartListeningAsync(SubjectPropertyWriter propertyWriter, CancellationToken cancellationToken)
+ {
+ _propertyWriter = propertyWriter;
+ _logger.LogInformation("Connecting to MQTT broker at {Host}:{Port}.", _configuration.BrokerHost, _configuration.BrokerPort);
+
+ _client = _factory.CreateMqttClient();
+ _client.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;
+ _client.DisconnectedAsync += OnDisconnectedAsync;
+
+ await _client.ConnectAsync(GetClientOptions(), cancellationToken).ConfigureAwait(false);
+ _logger.LogInformation("Connected to MQTT broker successfully.");
+
+ await SubscribeToPropertiesAsync(cancellationToken).ConfigureAwait(false);
+
+ _connectionMonitor = new MqttConnectionMonitor(
+ _client,
+ _configuration,
+ GetClientOptions,
+ async ct => await OnReconnectedAsync(ct).ConfigureAwait(false),
+ async () =>
+ {
+ _propertyWriter?.StartBuffering();
+ await Task.CompletedTask;
+ }, _logger);
+
+ _isStarted = true;
+
+ return new MqttConnectionLifetime(async () =>
+ {
+ if (_client?.IsConnected == true)
+ {
+ await _client.DisconnectAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
+ }
+ });
+ }
+
+ ///
+ public Task LoadInitialStateAsync(CancellationToken cancellationToken)
+ {
+ // Retained messages are received through the normal message handler: No separate initial load needed/possible
+ return Task.FromResult(null);
+ }
+
+ ///
+ public async ValueTask WriteChangesAsync(ReadOnlyMemory changes, CancellationToken cancellationToken)
+ {
+ var client = _client;
+ if (client is null || !client.IsConnected)
+ {
+ throw new InvalidOperationException("MQTT client is not connected.");
+ }
+
+ var length = changes.Length;
+ if (length == 0) return;
+
+ // Rent array from pool for messages
+ var messagesPool = ArrayPool.Shared;
+ var messages = messagesPool.Rent(length);
+ var messageCount = 0;
+ try
+ {
+ var changesSpan = changes.Span;
+
+ // Build all messages first
+ for (var i = 0; i < length; i++)
+ {
+ var change = changesSpan[i];
+ var property = change.Property.TryGetRegisteredProperty();
+ if (property is null || property.HasChildSubjects)
+ {
+ continue;
+ }
+
+ var topic = TryGetTopicForProperty(change.Property, property);
+ if (topic is null) continue;
+
+ byte[] payload;
+ try
+ {
+ payload = _configuration.ValueConverter.Serialize(
+ change.GetNewValue