Skip to content
1 change: 1 addition & 0 deletions CustomerIOMessagingInApp.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ Pod::Spec.new do |spec|
spec.module_name = "CioMessagingInApp" # the `import X` name when using SDK in Swift files

spec.dependency "CustomerIOCommon", "= #{spec.version.to_s}"
spec.dependency "LDSwiftEventSource", "~> 3.3"
end
10 changes: 8 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ let package = Package(


// Make sure the version number is same for DataPipelines cocoapods.
.package(name: "CioAnalytics", url: "https://github.com/customerio/cdp-analytics-swift.git", .exact("1.7.3+cio.1"))
.package(name: "CioAnalytics", url: "https://github.com/customerio/cdp-analytics-swift.git", .exact("1.7.3+cio.1")),

// SSE (Server-Sent Events) client for real-time in-app messaging
.package(url: "https://github.com/LaunchDarkly/swift-eventsource.git", .upToNextMajor(from: "3.3.0"))
],
targets: [
// Common - Code used by multiple modules in the SDK project.
Expand Down Expand Up @@ -113,7 +116,10 @@ let package = Package(

// Messaging in-app
.target(name: "CioMessagingInApp",
dependencies: ["CioInternalCommon"],
dependencies: [
"CioInternalCommon",
.product(name: "LDSwiftEventSource", package: "swift-eventsource")
],
path: "Sources/MessagingInApp",
resources: [
.process("Resources/PrivacyInfo.xcprivacy"),
Expand Down
131 changes: 118 additions & 13 deletions Sources/MessagingInApp/Gist/Gist.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,50 +25,142 @@ class Gist: GistProvider {
private let inAppMessageManager: InAppMessageManager
private let queueManager: QueueManager
private let threadUtil: ThreadUtil
private let sseLifecycleManager: SseLifecycleManager

private var inAppMessageStoreSubscriber: InAppMessageStoreSubscriber?
private var pollIntervalSubscriber: InAppMessageStoreSubscriber?
private var sseEnabledSubscriber: InAppMessageStoreSubscriber?
private var userIdSubscriber: InAppMessageStoreSubscriber?
private var queueTimer: Timer?

init(
logger: Logger,
gistDelegate: GistDelegate,
inAppMessageManager: InAppMessageManager,
queueManager: QueueManager,
threadUtil: ThreadUtil
threadUtil: ThreadUtil,
sseLifecycleManager: SseLifecycleManager
) {
self.logger = logger
self.gistDelegate = gistDelegate
self.inAppMessageManager = inAppMessageManager
self.queueManager = queueManager
self.threadUtil = threadUtil
self.sseLifecycleManager = sseLifecycleManager

subscribeToInAppMessageState()

// Start the SSE lifecycle manager to observe app foreground/background events
Task {
await sseLifecycleManager.start()
}
}

deinit {
// Unsubscribe from in-app message state changes and release resources to stop polling
// and prevent memory leaks.
if let subscriber = inAppMessageStoreSubscriber {
if let subscriber = pollIntervalSubscriber {
inAppMessageManager.unsubscribe(subscriber: subscriber)
}
if let subscriber = sseEnabledSubscriber {
inAppMessageManager.unsubscribe(subscriber: subscriber)
}
inAppMessageStoreSubscriber = nil
if let subscriber = userIdSubscriber {
inAppMessageManager.unsubscribe(subscriber: subscriber)
}
pollIntervalSubscriber = nil
sseEnabledSubscriber = nil
userIdSubscriber = nil

invalidateTimer()
}

private func subscribeToInAppMessageState() {
// Keep a strong reference to the subscriber to prevent deallocation and continue receiving updates
inAppMessageStoreSubscriber = {
let subscriber = InAppMessageStoreSubscriber { state in
self.setupPollingAndFetch(skipMessageFetch: true, pollingInterval: state.pollInterval)
// Subscribe to poll interval changes
pollIntervalSubscriber = {
let subscriber = InAppMessageStoreSubscriber { [weak self] state in
guard let self else { return }
// Only update polling if SSE is not active
if !state.shouldUseSse {
setupPollingAndFetch(skipMessageFetch: true, pollingInterval: state.pollInterval)
}
}
// Subscribe to changes in `pollInterval` property of `InAppMessageState`
inAppMessageManager.subscribe(keyPath: \.pollInterval, subscriber: subscriber)
return subscriber
}()

// Subscribe to SSE flag changes (matching Android's subscribeToAttribute for sseEnabled)
sseEnabledSubscriber = {
let subscriber = InAppMessageStoreSubscriber { [weak self] state in
guard let self else { return }
handleSseEnabledChange(state: state)
}
inAppMessageManager.subscribe(keyPath: \.useSse, subscriber: subscriber)
logger.logWithModuleTag("Gist: Subscribed to SSE flag changes", level: .debug)
return subscriber
}()

// Subscribe to user identification changes (matching Android's subscribeToAttribute for isUserIdentified)
userIdSubscriber = {
let subscriber = InAppMessageStoreSubscriber { [weak self] state in
guard let self else { return }
handleUserIdentificationChange(state: state)
}
inAppMessageManager.subscribe(keyPath: \.userId, subscriber: subscriber)
logger.logWithModuleTag("Gist: Subscribed to userId changes", level: .debug)
return subscriber
}()
}

/// Handles SSE flag changes for polling control.
/// When SSE becomes active (enabled + identified user), stop polling.
/// When SSE becomes disabled, start polling.
private func handleSseEnabledChange(state: InAppMessageState) {
logger.logWithModuleTag(
"Gist: SSE flag changed - sseEnabled: \(state.useSse), isUserIdentified: \(state.isUserIdentified), shouldUseSse: \(state.shouldUseSse)",
level: .info
)

if state.shouldUseSse {
// SSE is now active - stop polling
logger.logWithModuleTag("Gist: SSE enabled for identified user - stopping polling timer", level: .info)
invalidateTimer()
} else if !state.useSse {
// SSE disabled - start polling
logger.logWithModuleTag("Gist: SSE disabled - starting polling with interval: \(state.pollInterval)s", level: .info)
setupPollingAndFetch(skipMessageFetch: false, pollingInterval: state.pollInterval)
} else {
// SSE enabled but user is anonymous - polling continues
logger.logWithModuleTag("Gist: SSE enabled but user anonymous - polling continues", level: .debug)
}
}

/// Handles user identification changes for polling control.
/// When user becomes identified and SSE is enabled, stop polling (SSE will take over).
/// When user becomes anonymous but SSE flag is still enabled, start polling.
private func handleUserIdentificationChange(state: InAppMessageState) {
logger.logWithModuleTag(
"Gist: User identification changed - isUserIdentified: \(state.isUserIdentified), sseEnabled: \(state.useSse), shouldUseSse: \(state.shouldUseSse)",
level: .info
)

if state.shouldUseSse {
// User became identified and SSE is enabled - stop polling (SSE will take over)
logger.logWithModuleTag("Gist: User identified with SSE enabled - stopping polling (SSE will handle messages)", level: .info)
invalidateTimer()
} else if !state.isUserIdentified, state.useSse {
// User became anonymous but SSE flag is still enabled - start polling
// (SSE won't be used for anonymous users)
logger.logWithModuleTag("Gist: User became anonymous with SSE enabled - starting polling (SSE not used for anonymous users)", level: .info)
setupPollingAndFetch(skipMessageFetch: false, pollingInterval: state.pollInterval)
} else {
logger.logWithModuleTag("Gist: No polling action needed for user identification change", level: .debug)
}
}

private func invalidateTimer() {
// Timer must be scheduled or modified on main.
let timerWasActive = queueTimer != nil
logger.logWithModuleTag("Gist: Invalidating polling timer (wasActive: \(timerWasActive))", level: .debug)
threadUtil.runMain {
self.queueTimer?.invalidate()
self.queueTimer = nil
Expand Down Expand Up @@ -141,7 +233,7 @@ class Gist: GistProvider {
}

private func setupPollingAndFetch(skipMessageFetch: Bool, pollingInterval: Double) {
logger.logWithModuleTag("Setting up polling with interval: \(pollingInterval) seconds and skipMessageFetch: \(skipMessageFetch)", level: .info)
logger.logWithModuleTag("Gist: Setting up polling timer - interval: \(pollingInterval)s, skipInitialFetch: \(skipMessageFetch)", level: .info)
invalidateTimer()

// Timer must be scheduled on the main thread
Expand All @@ -153,6 +245,7 @@ class Gist: GistProvider {
userInfo: nil,
repeats: true
)
self.logger.logWithModuleTag("Gist: Polling timer started with interval: \(pollingInterval)s", level: .debug)
}

if !skipMessageFetch {
Expand All @@ -167,16 +260,28 @@ class Gist: GistProvider {
/// Also, the method must be called on main thread since it checks the application state.
@objc
func fetchUserMessages() {
logger.logWithModuleTag("Attempting to fetch user messages from remote service", level: .info)
guard UIApplication.shared.applicationState != .background else {
logger.logWithModuleTag("Application in background, skipping queue check.", level: .info)
logger.logWithModuleTag("Gist: Application in background, skipping queue check", level: .debug)
return
}

logger.logWithModuleTag("Checking Gist queue service", level: .info)
inAppMessageManager.fetchState { [weak self] state in
guard let self else { return }

// Skip polling only if SSE should be used (enabled + user is identified)
// Anonymous users always use polling even if SSE flag is enabled
guard !state.shouldUseSse else {
logger.logWithModuleTag(
"Gist: Skipping polling - SSE active (sseEnabled: \(state.useSse), isUserIdentified: \(state.isUserIdentified))",
level: .debug
)
return
}

logger.logWithModuleTag(
"Gist: Polling for messages (sseEnabled: \(state.useSse), isUserIdentified: \(state.isUserIdentified))",
level: .info
)
fetchUserQueue(state: state)
}
}
Expand Down
23 changes: 23 additions & 0 deletions Sources/MessagingInApp/Gist/Managers/QueueManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class QueueManager {
switch response {
case .success(let (data, response)):
self.updatePollingInterval(headers: response.allHeaderFields)
self.updateSseFlag(headers: response.allHeaderFields)
self.logger.logWithModuleTag("Gist queue fetch response: \(response.statusCode)", level: .debug)
switch response.statusCode {
case 304:
Expand Down Expand Up @@ -141,4 +142,26 @@ class QueueManager {
inAppMessageManager.dispatch(action: .setPollingInterval(interval: newPollingInterval))
}
}

private func updateSseFlag(headers: [AnyHashable: Any]) {
// Check for SSE flag in headers
if let sseHeaderValue = headers["x-cio-use-sse"] as? String {
logger.logWithModuleTag("X-CIO-Use-SSE header found with value: '\(sseHeaderValue)'", level: .info)
let useSse = sseHeaderValue.lowercased() == "true"

inAppMessageManager.fetchState { [weak self] state in
guard let self = self else { return }

// Only update if the value has changed
if state.useSse != useSse {
logger.logWithModuleTag("SSE flag changing from \(state.useSse) to \(useSse)", level: .info)
inAppMessageManager.dispatch(action: .setSseEnabled(enabled: useSse))
} else {
logger.logWithModuleTag("SSE flag unchanged, remains: \(useSse)", level: .debug)
}
}
} else {
logger.logWithModuleTag("X-CIO-Use-SSE header not present in response", level: .debug)
}
}
}
4 changes: 4 additions & 0 deletions Sources/MessagingInApp/Gist/Network/NetworkSettings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ protocol NetworkSettings {
var queueAPI: String { get }
var engineAPI: String { get }
var renderer: String { get }
var sseAPI: String { get }
}

struct NetworkSettingsProduction: NetworkSettings {
let queueAPI = "https://consumer.inapp.customer.io"
let engineAPI = "https://engine.api.gist.build"
let renderer = "https://renderer.gist.build/3.0"
let sseAPI = "https://realtime.inapp.customer.io/api/v3/sse"
}

struct NetworkSettingsDevelopment: NetworkSettings {
let queueAPI = "https://consumer.dev.inapp.customer.io"
let engineAPI = "https://engine.api.dev.gist.build"
let renderer = "https://renderer.gist.build/3.0"
let sseAPI = "https://realtime.inapp.customer.io/api/v3/sse"
}

struct NetworkSettingsLocal: NetworkSettings {
let queueAPI = "http://queue.api.local.gist.build:86"
let engineAPI = "http://engine.api.local.gist.build:82"
let renderer = "http://app.local.gist.build:8080/web"
let sseAPI = "http://realtime.api.local.gist.build:86/api/v3/sse"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import Foundation

/// Backport of `AsyncStream.makeStream()` for iOS 13+.
///
/// `AsyncStream.makeStream()` was introduced in iOS 17 and provides a way to create
/// an AsyncStream and access its continuation separately. This is useful when you need
/// to set up resources that require the continuation before returning the stream.
///
/// This backport provides the same functionality for older iOS versions.
enum AsyncStreamBackport {
/// Creates an AsyncStream and returns both the stream and its continuation separately.
///
/// This allows setup code to access the continuation before the stream is consumed,
/// enabling synchronous setup patterns that avoid race conditions.
///
/// Example:
/// ```swift
/// func connect() -> AsyncStream<Event> {
/// let (stream, continuation) = AsyncStreamBackport.makeStream(of: Event.self)
///
/// // Use continuation immediately for setup
/// let handler = EventHandler(continuation: continuation)
/// self.connection = Connection(handler: handler)
/// self.connection.start()
///
/// return stream
/// }
/// ```
///
/// - Parameters:
/// - elementType: The type of elements in the stream.
/// - bufferingPolicy: The buffering policy for the stream. Defaults to `.unbounded`.
/// - Returns: A tuple containing the stream and its continuation.
static func makeStream<Element>(
of elementType: Element.Type = Element.self,
bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
// Capture the continuation from the closure using an optional variable
var continuation: AsyncStream<Element>.Continuation?

let stream = AsyncStream<Element>(bufferingPolicy: bufferingPolicy) { cont in
// This closure is called synchronously during AsyncStream initialization
continuation = cont
}

// By the time AsyncStream's init returns, the closure has executed
// and continuation is guaranteed to be set
guard let continuation = continuation else {
// This should never happen - the closure is called synchronously
fatalError("AsyncStream continuation was not set during initialization")
}

return (stream, continuation)
}
}
Loading
Loading