diff --git a/Source/AwsCommonRuntimeKit/mqtt/Mqtt5Packets.swift b/Source/AwsCommonRuntimeKit/mqtt/Mqtt5Packets.swift index dfc02321..94dc7d91 100644 --- a/Source/AwsCommonRuntimeKit/mqtt/Mqtt5Packets.swift +++ b/Source/AwsCommonRuntimeKit/mqtt/Mqtt5Packets.swift @@ -66,6 +66,12 @@ func convertOptionalUserProperties( return userProperties } +extension UserProperty: Equatable { + static public func == (lhs: UserProperty, rhs: UserProperty) -> Bool { + return lhs.name == rhs.name && lhs.value == rhs.value + } +} + // We can't mutate this class after initialization. Swift can not verify the sendability due to the class is non-final, // so mark it unchecked Sendable /// Data model of an `MQTT5 PUBLISH `_ packet diff --git a/Source/AwsCommonRuntimeKit/mqtt/MqttRequestResponseClient.swift b/Source/AwsCommonRuntimeKit/mqtt/MqttRequestResponseClient.swift index 812cae4c..fe6d2012 100644 --- a/Source/AwsCommonRuntimeKit/mqtt/MqttRequestResponseClient.swift +++ b/Source/AwsCommonRuntimeKit/mqtt/MqttRequestResponseClient.swift @@ -4,25 +4,16 @@ import AwsCMqtt import LibNative import Foundation -/** - * The type of change to the state of a streaming operation subscription - */ +/// The type of change to the state of a streaming operation subscription public enum SubscriptionStatusEventType: Sendable { - /** - * The streaming operation is successfully subscribed to its topic (filter) - */ + /// The streaming operation is successfully subscribed to its topic (filter) case established - /** - * The streaming operation has temporarily lost its subscription to its topic (filter) - */ + /// The streaming operation has temporarily lost its subscription to its topic (filter) case lost - /** - * The streaming operation has entered a terminal state where it has given up trying to subscribe - * to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission - * policy). - */ + /// The streaming operation has entered a terminal state where it has given up trying to subscribe + /// to its topic (filter). This is always due to user error (bad topic filter or IoT Core permission policy). case halted } @@ -60,17 +51,38 @@ public struct SubscriptionStatusEvent: Sendable { public let error: CRTError? } -// TODO: Igor has updated the events for IoT Command. Need update later /// An event that describes an incoming publish message received on a streaming operation. public struct IncomingPublishEvent: Sendable { - /// The payload of the publish message in a byte buffer format - let payload: Data - /// The topic associated with this PUBLISH packet. - let topic: String + public let topic: String + + /// The raw bytes of the publish message payload. + public let payload: Data + + /// (Optional) The content type of the payload + public let contentType: String? + + /// (Optional) User Properties, if there is no user property, the array will be an empty array. + public let userProperties: [UserProperty] + + /// (Optional) Some services use this field to specify client-side timeouts. + public let messageExpiryInterval: TimeInterval? - // TODO: More options for IoT Command changes + /// internal constructor that initializes an IncomingPublishEvent from a native aws_mqtt_rr_incoming_publish_event + init(_ raw_publish_event: UnsafePointer) { + let publish_event = raw_publish_event.pointee + + self.topic = publish_event.topic.toString() + self.payload = Data(bytes: publish_event.payload.ptr, count: publish_event.payload.len) + self.messageExpiryInterval = (publish_event.message_expiry_interval_seconds?.pointee).map { + TimeInterval($0) + } + self.contentType = publish_event.content_type?.pointee.toString() + self.userProperties = convertOptionalUserProperties( + count: publish_event.user_property_count, + userPropertiesPointer: publish_event.user_properties) + } } /// Function signature of a SubscriptionStatusEvent event handler @@ -79,11 +91,14 @@ public typealias SubscriptionStatusEventHandler = @Sendable (SubscriptionStatusE /// Function signature of an IncomingPublishEvent event handler public typealias IncomingPublishEventHandler = @Sendable (IncomingPublishEvent) -> Void -/// Encapsulates a response to an AWS IoT Core MQTT-based service request +/// A response to an AWS IoT Core MQTT-based service request public struct MqttRequestResponse: Sendable { + /// The MQTT Topic that the response was received on. public let topic: String + /// Payload of the response that correlates to a submitted request. public let payload: Data + /// internal constructor that initializes an IncomingPublishEvent from a native aws_mqtt_rr_incoming_publish_event init(_ raw_publish_event: UnsafePointer) { let publish_event = raw_publish_event.pointee self.topic = publish_event.topic.toString() @@ -91,15 +106,22 @@ public struct MqttRequestResponse: Sendable { } } -// We can't mutate this class after initialization. Swift can not verify the sendability due to direct use of c pointer, +// We can't mutate this class after initialization. Swift can not verify the sendable due to direct use of c pointer, // so mark it unchecked Sendable /// A response path is a pair of values - MQTT topic and a JSON path - that describe where a response to /// an MQTT-based request may arrive. For a given request type, there may be multiple response paths and each /// one is associated with a separate JSON schema for the response body. -public class ResponsePath: CStruct, @unchecked Sendable { +final public class ResponsePath: CStruct, @unchecked Sendable { + /// MQTT topic that a response may arrive on. let topic: String + /// JSON path for finding correlation tokens within payloads that arrive on this path's topic. let correlationTokenJsonPath: String + /// Create a response path + /// + /// - Parameters: + /// - topic: MQTT topic that a response may arrive on. + /// - correlationTokenJsonPath: JSON path for finding correlation tokens within payloads that arrive on this path's topic. public init(topic: String, correlationTokenJsonPath: String) { self.topic = topic self.correlationTokenJsonPath = correlationTokenJsonPath @@ -134,12 +156,31 @@ public class ResponsePath: CStruct, @unchecked Sendable { /// Configuration options for request response operation public struct RequestResponseOperationOptions: CStructWithUserData, Sendable { + /// Topic filters that should be subscribed to in order to cover all possible response paths. Sometimes we can use wildcards to + /// cut down on the subscriptions needed; sometimes we can't. let subscriptionTopicFilters: [String] + /// All possible response paths associated with this request type let responsePaths: [ResponsePath] + /// Topic to publish the request to once response subscriptions have been established. let topic: String + /// Payload to publish in order to initiate the request let payload: Data + /// Correlation token embedded in the request that must be found in a response message. This can be the empty cursor to support certain services + /// which don't use correlation tokens. In this case, the client only allows one request at a time to use the associated subscriptions; no concurrency + /// is possible. There are some optimizations we could make here but for now, they're not worth the complexity. let correlationToken: String? + /// Create a configuration options for request response operation + /// + /// - Parameters: + /// - subscriptionTopicFilters: Topic filters that should be subscribed to in order to cover all possible response paths. Sometimes we can use wildcards to cut + /// down on the subscriptions needed; sometimes we can't. + /// - responsePaths: All possible response paths associated with this request type. + /// - topic: Topic to publish the request to once response subscriptions have been established. + /// - payload: Payload to publish in order to initiate the request + /// - correlationToken: Correlation token embedded in the request that must be found in a response message. This can be the empty cursor to support certain + /// services which don't use correlation tokens. In this case, the client only allows one request at a time to use the associated subscriptions; no concurrency is + /// possible. There are some optimizations we could make here but for now, they're not worth the complexity. public init( subscriptionTopicFilters: [String], responsePaths: [ResponsePath], @@ -190,60 +231,166 @@ public struct RequestResponseOperationOptions: CStructWithUserData, Sendable { } } +private func MqttRRStreamingOperationTerminationCallback(_ userData: UnsafeMutableRawPointer?) { + // Termination callback. This is triggered when the native object is terminated. + // `takeRetainedValue()` will release the operation reference. IT SHOULD ONLY BE CALLED AFTER YOU RELEASE THE CLIENT` + _ = Unmanaged.fromOpaque(userData!).takeRetainedValue() +} + +private func MqttRRStreamingOperationIncomingPublishCallback( + _ publishEvent: UnsafePointer?, + _ userData: UnsafeMutableRawPointer? +) { + guard let userData, let publishEvent else { + // No userData, directly return + return + } + let operationCore = Unmanaged.fromOpaque(userData).takeUnretainedValue() + operationCore.rwlock.read { + // Only invoke the callback if the streaming operation is not closed. + if operationCore.rawValue != nil, operationCore.options.incomingPublishEventHandler != nil { + let incomingPublishEvent = IncomingPublishEvent(publishEvent) + operationCore.options.incomingPublishEventHandler(incomingPublishEvent) + } + } +} + +private func MqttRRStreamingOperationSubscriptionStatusCallback( + _ eventType: aws_rr_streaming_subscription_event_type, + _ errorCode: Int32, + _ userData: UnsafeMutableRawPointer? +) { + guard let userData else { + // No userData, directly return + return + } + let operationCore = Unmanaged.fromOpaque(userData).takeUnretainedValue() + operationCore.rwlock.read { + // Only invoke the callback if the streaming operation is not closed. + if operationCore.rawValue != nil, operationCore.options.subscriptionStatusEventHandler != nil { + let subStatusEvent = SubscriptionStatusEvent( + event: SubscriptionStatusEventType(eventType), + error: errorCode == 0 ? nil : CRTError(code: Int32(errorCode))) + operationCore.options.subscriptionStatusEventHandler(subStatusEvent) + } + } +} + /// Configuration options for streaming operations -public struct StreamingOperationOptions: CStruct, Sendable { - let subscriptionStatusEventHandler: SubscriptionStatusEventHandler - let incomingPublishEventHandler: IncomingPublishEventHandler - let topicFilter: String +public struct StreamingOperationOptions: CStructWithUserData, Sendable { + /// The MQTT topic that the streaming operation is "listening" to + public let topicFilter: String + /// The handler function a streaming operation will use for subscription status events. + public let subscriptionStatusEventHandler: SubscriptionStatusEventHandler + /// The handler function a streaming operation will use for the incoming messages + public let incomingPublishEventHandler: IncomingPublishEventHandler - public init() { - // TODO: INIT THE MEMBERS - self.subscriptionStatusEventHandler = { _ in return; } - self.incomingPublishEventHandler = { _ in return; } - self.topicFilter = "" + public init( + topicFilter: String, + subscriptionStatusCallback: @escaping SubscriptionStatusEventHandler, + incomingPublishCallback: @escaping IncomingPublishEventHandler + ) { + self.subscriptionStatusEventHandler = subscriptionStatusCallback + self.incomingPublishEventHandler = incomingPublishCallback + self.topicFilter = topicFilter } typealias RawType = aws_mqtt_streaming_operation_options - func withCStruct(_ body: (RawType) -> Result) -> Result { - // TODO: convert into aws_mqtt_request_operation_options - let options = aws_mqtt_streaming_operation_options() - return body(options) + func withCStruct( + userData: UnsafeMutableRawPointer?, _ body: (aws_mqtt_streaming_operation_options) -> Result + ) -> Result { + var options = aws_mqtt_streaming_operation_options() + options.incoming_publish_callback = MqttRRStreamingOperationIncomingPublishCallback + options.subscription_status_callback = MqttRRStreamingOperationSubscriptionStatusCallback + options.terminated_callback = MqttRRStreamingOperationTerminationCallback + return withByteCursorFromStrings(self.topicFilter) { topicFilterCursor in + options.topic_filter = topicFilterCursor + options.user_data = userData + return body(options) + } + } +} + +// IMPORTANT: You are responsible for concurrency correctness of StreamingOperationCore. +// The rawValue is mutable cross threads and protected by the rwlock. +/// The internal core of the streaming operation. It helps to handle the aws_mqtt_rr_client_operation termination. +private class StreamingOperationCore: @unchecked Sendable { + fileprivate var rawValue: OpaquePointer? // ? + fileprivate let rwlock = ReadWriteLock() + fileprivate let options: StreamingOperationOptions + + fileprivate init(streamOptions: StreamingOperationOptions, client: MqttRequestResponseClientCore) + throws + { + self.options = streamOptions + let rawValue = streamOptions.withCPointer( + userData: Unmanaged.passRetained(self).toOpaque() + ) { optionsPointer in + return aws_mqtt_request_response_client_create_streaming_operation( + client.rawValue, optionsPointer) + } + guard let rawValue else { + throw CommonRunTimeError.crtError(CRTError(code: aws_last_error())) + } + self.rawValue = rawValue } + /// Opens a streaming operation by making the appropriate MQTT subscription with the broker. + fileprivate func open() throws { + try rwlock.read { + if let rawValue = self.rawValue { + if aws_mqtt_rr_client_operation_activate(rawValue) != AWS_OP_SUCCESS { + throw CommonRunTimeError.crtError(CRTError(code: aws_last_error())) + } + } + } + } + + /// Closes the operation + fileprivate func close() { + rwlock.write { + aws_mqtt_rr_client_operation_release(self.rawValue) + self.rawValue = nil + } + } } -/// A streaming operation is automatically closed (and an MQTT unsubscribe triggered) when its -/// destructor is invoked. +/// The streaming operation, it is automatically closed (and an MQTT unsubscribe triggered) when its destructor is invoked. public class StreamingOperation { - fileprivate var rawValue: OpaquePointer? // ? + fileprivate var operationCore: StreamingOperationCore - public init() { - // TODO: INIT THE MEMBERS - self.rawValue = nil + /// Constructor. The end user should only create the operation through MqttRequestResponseClient->createStream() + fileprivate init(streamOptions: StreamingOperationOptions, client: MqttRequestResponseClientCore) + throws + { + self.operationCore = try StreamingOperationCore(streamOptions: streamOptions, client: client) } /// Opens a streaming operation by making the appropriate MQTT subscription with the broker. - public func open() { - // TODO: open the stream + /// - Throws: CommonRunTimeError + public func open() throws { + try self.operationCore.open() } deinit { - // TODO: close the oepration + self.operationCore.close() } } -// We can't mutate this class after initialization. Swift can not verify the sendability due to the class is non-final, -// so mark it unchecked Sendable -/// Request-response client configuration options -public class MqttRequestResponseClientOptions: CStructWithUserData, @unchecked Sendable { +/// MQTT-based request-response client configuration options +final public class MqttRequestResponseClientOptions: CStructWithUserData, Sendable { - /// Maximum number of subscriptions that the client will concurrently use for request-response operations. + /// Maximum number of request-response subscriptions the client allows to be concurrently active at any one point in time. When the client hits this threshold, + /// requests will be delayed until earlier requests complete and release their subscriptions. Each in-progress request will use either 1 or 2 MQTT subscriptions + /// until completion. public let maxRequestResponseSubscription: Int - /// Maximum number of subscriptions that the client will concurrently use for streaming operations. + /// Maximum number of concurrent streaming operation subscriptions that the client will allow. Each "unique" (different topic filter) streaming operation will use + /// 1 MQTT subscription. When the client hits this threshold, attempts to open new streaming operations will fail. public let maxStreamingSubscription: Int - /// Duration, in seconds, that a request-response operation will wait for completion before giving up. + /// The timeout value, in seconds, for a request-response operation. If a request is not complete by this time interval, the client will complete it as failed. + /// This time interval starts the instant the request is submitted to the client. public let operationTimeout: TimeInterval public init( @@ -282,7 +429,7 @@ public class MqttRequestResponseClientOptions: CStructWithUserData, @unchecked S } } -internal func MqttRRClientTerminationCallback(_ userData: UnsafeMutableRawPointer?) { +private func MqttRRClientTerminationCallback(_ userData: UnsafeMutableRawPointer?) { // Termination callback. This is triggered when the native client is terminated. // It is safe to release the request response client at this point. // `takeRetainedValue()` will release the client reference. IT SHOULD ONLY BE CALLED AFTER YOU RELEASE THE CLIENT. @@ -297,8 +444,9 @@ private func MqttRROperationCompletionCallback( guard let userData else { return } - let continuationCore = Unmanaged>.fromOpaque(userData) - .takeRetainedValue() + let continuationCore = Unmanaged>.fromOpaque( + userData + ).takeRetainedValue() if errorCode != AWS_OP_SUCCESS { return continuationCore.continuation.resume( throwing: CommonRunTimeError.crtError(CRTError(code: errorCode))) @@ -317,10 +465,11 @@ private func MqttRROperationCompletionCallback( // rawValue is only modified by the close() function, which is called exclusively in the // MqttRequestResponseClient destructor. At that point, no operations should be in progress. // Therefore, MqttRequestResponseClientCore can be considered thread-safe. -internal class MqttRequestResponseClientCore: @unchecked Sendable { +/// The internal core of the mqtt request response client. It helps to handle the native request response termination. +private class MqttRequestResponseClientCore: @unchecked Sendable { fileprivate var rawValue: OpaquePointer? // aws_mqtt_request_response_client - internal init(mqttClient: Mqtt5Client, options: MqttRequestResponseClientOptions) throws { + fileprivate init(mqttClient: Mqtt5Client, options: MqttRequestResponseClientOptions) throws { guard let rawValue = (options.withCPointer( @@ -337,13 +486,14 @@ internal class MqttRequestResponseClientCore: @unchecked Sendable { self.rawValue = rawValue } - /// submit a request responds operation, throws CRTError if the operation failed - public func submitRequest(operationOptions: RequestResponseOperationOptions) async throws + /// Submits a request responds operation, throws CRTError if the operation failed + fileprivate func submitRequest(operationOptions: RequestResponseOperationOptions) async throws -> MqttRequestResponse { try operationOptions.validateConversionToNative() return try await withCheckedThrowingContinuation { continuation in - let continuationCore = ContinuationCore(continuation: continuation) + let continuationCore = ContinuationCore( + continuation: continuation) operationOptions.withCPointer( userData: continuationCore.passRetained(), { optionsPointer in @@ -358,20 +508,22 @@ internal class MqttRequestResponseClientCore: @unchecked Sendable { } } - /// create a stream operation, throws CRTError if the creation failed. open() must be called on the operation to start the stream. - public func createStream(streamOptions: StreamingOperationOptions) throws -> StreamingOperation { - // TODO: create streamming operation - return StreamingOperation() + /// Creates a stream operation, throws CRTError if the creation failed. open() must be called on the operation to start the stream. + fileprivate func createStream(streamOptions: StreamingOperationOptions) throws + -> StreamingOperation + { + return try StreamingOperation(streamOptions: streamOptions, client: self) } - /// release the request response client. You must not use the client after calling `close()`. - public func close() { + /// Releases the request response client. You must not use the client after calling `close()`. + fileprivate func close() { aws_mqtt_request_response_client_release(self.rawValue) self.rawValue = nil } } +/// The MqttRequestResponseClient is a client that allows you to send requests and receive responses over MQTT. public class MqttRequestResponseClient { fileprivate var clientCore: MqttRequestResponseClientCore @@ -386,24 +538,25 @@ public class MqttRequestResponseClient { clientCore = try MqttRequestResponseClientCore(mqttClient: mqtt5Client, options: options) } - /// Submit a request responds operation, throws CRTError if the operation failed + /// Submits a generic request to the request-response client, throws CRTError if the operation failed /// /// - Parameters: /// - operationOptions: configuration options for request response operation /// - Returns: MqttRequestResponse - /// - Throws:CommonRuntimeError.crtError if submit failed + /// - Throws: CommonRuntimeError.crtError if submit failed public func submitRequest(operationOptions: RequestResponseOperationOptions) async throws -> MqttRequestResponse { return try await clientCore.submitRequest(operationOptions: operationOptions) } - /// Create a stream operation, throws CRTError if the creation failed. You would need call open() on the operation to start the stream + /// Creates a stream operation, throws CRTError if the creation failed. Streaming operations "listen" to a specific kind of service event and invoke handlers every time + /// one is received. You would need call open() on the operation to start the stream. + /// /// - Parameters: /// - streamOptions: Configuration options for streaming operations - /// - Returns: - /// - StreamingOperation - /// - Throws:CommonRuntimeError.crtError if creation failed + /// - Returns: StreamingOperation, you would need call open() on the operation to start the stream + /// - Throws: CommonRuntimeError.crtError if creation failed public func createStream(streamOptions: StreamingOperationOptions) throws -> StreamingOperation { return try clientCore.createStream(streamOptions: streamOptions) } diff --git a/Test/AwsCommonRuntimeKitTests/mqtt/MqttRRClientTests.swift b/Test/AwsCommonRuntimeKitTests/mqtt/MqttRRClientTests.swift index 8b8a04ed..44ba6198 100644 --- a/Test/AwsCommonRuntimeKitTests/mqtt/MqttRRClientTests.swift +++ b/Test/AwsCommonRuntimeKitTests/mqtt/MqttRRClientTests.swift @@ -11,10 +11,23 @@ class Mqtt5RRClientTests: XCBaseTestCase { final class MqttRRTestContext: @unchecked Sendable { let contextName: String + // The test context var responsePaths: [ResponsePath]? var correlationToken: String? var payload: Data? + // the lock is used to protect the data collected from the callbacks + let callbackRWLock = ReadWriteLock() + var subscriptionStatusEvent: SubscriptionStatusEvent? + var rrPublishEvent: [IncomingPublishEvent] = [] + + // rr events expectations + var subscriptionStatusSuccessExpectation: XCTestExpectation + var subscriptionStatusErrorExpectation: XCTestExpectation + var incomingPublishExpectation: XCTestExpectation + var onSubscriptionStatusUpdate: SubscriptionStatusEventHandler? + var onRRIncomingPublish: IncomingPublishEventHandler? + // protocol client context var publishReceivedExpectation: XCTestExpectation var publishTargetReachedExpectation: XCTestExpectation @@ -33,6 +46,12 @@ class Mqtt5RRClientTests: XCBaseTestCase { init(contextName: String = "MqttClient") { self.contextName = contextName + self.subscriptionStatusSuccessExpectation = XCTestExpectation( + description: "Expect streaming operation publish status success.") + self.subscriptionStatusErrorExpectation = XCTestExpectation( + description: "Expect streaming operation publish status error.") + self.incomingPublishExpectation = XCTestExpectation( + description: "Expect incoming publish event for request response client.") self.publishReceivedExpectation = XCTestExpectation(description: "Expect publish received.") self.publishTargetReachedExpectation = XCTestExpectation( description: "Expect publish target reached") @@ -84,11 +103,40 @@ class Mqtt5RRClientTests: XCBaseTestCase { print(contextName + " MqttRRClientTests: onLifecycleEventDisconnection") disconnectionExpectation.fulfill() } + + self.onRRIncomingPublish = { publishEvent in + self.callbackRWLock.write { + self.rrPublishEvent.append(publishEvent) + self.incomingPublishExpectation.fulfill() + } + } + + self.onSubscriptionStatusUpdate = { statusEvent in + self.callbackRWLock.write { + self.subscriptionStatusEvent = statusEvent + print( + contextName + + " MqttRRClientTests: onSubscriptionStatusUpdate. EventType: \(statusEvent.event)") + if statusEvent.event == SubscriptionStatusEventType.established { + self.subscriptionStatusSuccessExpectation.fulfill() + } else { + if let error = statusEvent.error { + print( + contextName + + " MqttRRClientTests: onSubscriptionStatusUpdate failed with error : (\(error.code)) \(error.name) : \(error.message)" + ) + } + self.subscriptionStatusErrorExpectation.fulfill() + } + } + } + } - // release the context before exit the test case + // make sure to cleanup the resources before exit the test case func cleanup() { self.responsePaths = nil + self.rrPublishEvent = [] } } @@ -348,7 +396,9 @@ class Mqtt5RRClientTests: XCBaseTestCase { options: MqttRequestResponseClientOptions( maxRequestResponseSubscription: 3, maxStreamingSubscription: 2, operationTimeout: 10)) let requestOptions = createRequestResponseGetOptions( - testContext: testContext, thingName: "NoSuchThing", withCorrelationToken: false, + testContext: testContext, + thingName: "NoSuchThing", + withCorrelationToken: false, publishTopic: "wrong/publish/topic") var errorCaught = false @@ -364,15 +414,189 @@ class Mqtt5RRClientTests: XCBaseTestCase { testContext.cleanup() } - func MqttRequestResponse_ShadowUpdatedStreamOpenCloseSuccess() throws { + func testMqttRequestResponse_ShadowUpdatedStreamOpenCloseSuccess() async throws { + let testContext = MqttRRTestContext() + let rrClient = try await setupRequestResponseClient(testContext: testContext) + let streamingOperation = try rrClient.createStream( + streamOptions: StreamingOperationOptions( + topicFilter: "test/topic", + subscriptionStatusCallback: testContext.onSubscriptionStatusUpdate!, + incomingPublishCallback: { _ in })) + + try streamingOperation.open() + await awaitExpectation([testContext.subscriptionStatusSuccessExpectation], 60) + } + + func testMqttRequestResponse_ShadowUpdatedStreamCreationFailed() async throws { + let testContext = MqttRRTestContext() + let rrClient = try await setupRequestResponseClient(testContext: testContext) + + do { + _ = try rrClient.createStream( + streamOptions: StreamingOperationOptions( + topicFilter: "", + subscriptionStatusCallback: { _ in }, + incomingPublishCallback: { _ in })) + + } catch CommonRunTimeError.crtError(let crtError) { + XCTAssertTrue(Int32(AWS_ERROR_INVALID_ARGUMENT.rawValue) == crtError.code) + } + } + + // closing the request-response client should failed the streaming operation + func testMqttRequestResponse_ShadowUpdatedStreamClientClosed() async throws { + let testContext = MqttRRTestContext() + var rrClient: MqttRequestResponseClient? = try await setupRequestResponseClient( + testContext: testContext) + XCTAssertNotNil(rrClient) + let streamingOperation = try rrClient!.createStream( + streamOptions: StreamingOperationOptions( + topicFilter: "test/topic", + subscriptionStatusCallback: testContext.onSubscriptionStatusUpdate!, + incomingPublishCallback: { _ in })) + do { + // open the operation successfully + try streamingOperation.open() + await awaitExpectation([testContext.subscriptionStatusSuccessExpectation], 60) + + // destory the request response client + rrClient = nil + + await awaitExpectation([testContext.subscriptionStatusErrorExpectation], 60) + XCTAssertEqual(testContext.subscriptionStatusEvent?.event, SubscriptionStatusEventType.halted) + XCTAssertEqual( + testContext.subscriptionStatusEvent?.error?.code, + Int32(AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN.rawValue)) + } catch CommonRunTimeError.crtError(let crtError) { + XCTFail("Test failed with error \(crtError.name) (\(crtError.code)): \(crtError.message).") + } } - func MqttRequestResponse_ShadowUpdatedStreamClientClosed() throws { + func testMqttRequestResponse_ShadowUpdatedStreamIncomingPublishSuccess() async throws { + let testContext = MqttRRTestContext() + let mqtt5Client = try createMqtt5Client(testContext: testContext) + var rrClient: MqttRequestResponseClient? = try MqttRequestResponseClient( + mqtt5Client: mqtt5Client, + options: MqttRequestResponseClientOptions( + maxRequestResponseSubscription: 3, maxStreamingSubscription: 2, operationTimeout: 10)) + XCTAssertNotNil(rrClient) + // start the client + try await startClient(client: mqtt5Client, testContext: testContext) + let expectedTopic = UUID().uuidString + let expectedPayload = "incoming publish".data(using: .utf8) + let expectedContentType = "application/json" + let expectedTimeInterval = TimeInterval(8) + let expectedUserProperties = [ + UserProperty(name: "property1", value: "value1"), + UserProperty(name: "property2", value: "value2"), + ] + + var streamingOperation: StreamingOperation? = try rrClient!.createStream( + streamOptions: + StreamingOperationOptions( + topicFilter: expectedTopic, + subscriptionStatusCallback: + testContext.onSubscriptionStatusUpdate!, + incomingPublishCallback: + testContext.onRRIncomingPublish!)) + // open the streaming and wait for subscription success + try streamingOperation!.open() + await awaitExpectation([testContext.subscriptionStatusSuccessExpectation], 60) + let _ = try await mqtt5Client.publish( + publishPacket: PublishPacket( + qos: QoS.atLeastOnce, + topic: expectedTopic, + payload: expectedPayload, + messageExpiryInterval: expectedTimeInterval, + contentType: expectedContentType, + userProperties: expectedUserProperties)) + + await awaitExpectation([testContext.incomingPublishExpectation], 60) + + XCTAssertGreaterThan(testContext.rrPublishEvent.count, 0) + let publishEvent = testContext.rrPublishEvent[0] + XCTAssertTrue(publishEvent.topic == expectedTopic) + XCTAssertTrue(publishEvent.payload == expectedPayload) + XCTAssertTrue(publishEvent.contentType == expectedContentType) + XCTAssertTrue(publishEvent.userProperties.count == expectedUserProperties.count) + for (index, element) in publishEvent.userProperties.enumerated() { + XCTAssertTrue(element == expectedUserProperties[index]) + } + // We can't check for the exact value here as it'll be decremented by the server part. + XCTAssertNotNil(publishEvent.messageExpiryInterval) + + streamingOperation = nil + rrClient = nil + testContext.cleanup() } - func MqttRequestResponse_ShadowUpdatedStreamIncomingPublishSuccess() throws { + func testMqttRequestResponse_ShadowUpdatedStreamIncomingPublishNilValue() async throws { + let testContext = MqttRRTestContext() + let mqtt5Client = try createMqtt5Client(testContext: testContext) + var rrClient: MqttRequestResponseClient? = try MqttRequestResponseClient( + mqtt5Client: mqtt5Client, + options: MqttRequestResponseClientOptions( + maxRequestResponseSubscription: 3, maxStreamingSubscription: 2, operationTimeout: 10)) + XCTAssertNotNil(rrClient) + // start the client + try await startClient(client: mqtt5Client, testContext: testContext) + let expectedTopic = UUID().uuidString + let expectedPayload = "incoming publish".data(using: .utf8) + + var streamingOperation: StreamingOperation? = try rrClient!.createStream( + streamOptions: StreamingOperationOptions( + topicFilter: expectedTopic, + subscriptionStatusCallback: + testContext.onSubscriptionStatusUpdate!, + incomingPublishCallback: + testContext.onRRIncomingPublish!)) + // open the streaming and wait for subscription success + try streamingOperation!.open() + await awaitExpectation([testContext.subscriptionStatusSuccessExpectation], 60) + let _ = try await mqtt5Client.publish( + publishPacket: PublishPacket( + qos: QoS.atLeastOnce, + topic: expectedTopic, + payload: expectedPayload)) + + await awaitExpectation([testContext.incomingPublishExpectation], 60) + + XCTAssertGreaterThan(testContext.rrPublishEvent.count, 0) + let publishEvent = testContext.rrPublishEvent[0] + XCTAssertTrue(publishEvent.topic == expectedTopic) + XCTAssertTrue(publishEvent.payload == expectedPayload) + XCTAssertNil(publishEvent.contentType) + XCTAssertTrue(publishEvent.userProperties.count == 0) + XCTAssertNil(publishEvent.messageExpiryInterval) + + streamingOperation = nil + rrClient = nil + testContext.cleanup() + } + + func testMqttRequestResponse_ShadowUpdatedStreamReopenFailed() async throws { + let testContext = MqttRRTestContext() + let rrClient = try await setupRequestResponseClient(testContext: testContext) + let streamingOperation = try rrClient.createStream( + streamOptions: StreamingOperationOptions( + topicFilter: "test/topic", + subscriptionStatusCallback: testContext.onSubscriptionStatusUpdate!, + incomingPublishCallback: { _ in })) + var reopenFailed = false; + try streamingOperation.open() + await awaitExpectation([testContext.subscriptionStatusSuccessExpectation], 60) + + do { + // reopen the streaming operation + try streamingOperation.open() + } catch CommonRunTimeError.crtError(let crtError) { + reopenFailed = true + XCTAssertTrue( + crtError.code == AWS_ERROR_MQTT_REUQEST_RESPONSE_STREAM_ALREADY_ACTIVATED.rawValue) + } + XCTAssertTrue(reopenFailed) } }