Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CustomerIOMessagingInApp.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ Pod::Spec.new do |spec|
spec.module_name = "CioMessagingInApp" # the `import X` name when using SDK in Swift files

spec.dependency "CustomerIOCommon", "= #{spec.version.to_s}"
spec.dependency "LDSwiftEventSource", "~> 3.3"
end
10 changes: 8 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ let package = Package(


// Make sure the version number is same for DataPipelines cocoapods.
.package(name: "CioAnalytics", url: "https://github.com/customerio/cdp-analytics-swift.git", .exact("1.7.3+cio.1"))
.package(name: "CioAnalytics", url: "https://github.com/customerio/cdp-analytics-swift.git", .exact("1.7.3+cio.1")),

// SSE (Server-Sent Events) client for real-time in-app messaging
.package(url: "https://github.com/LaunchDarkly/swift-eventsource.git", .upToNextMajor(from: "3.3.0"))
],
targets: [
// Common - Code used by multiple modules in the SDK project.
Expand Down Expand Up @@ -113,7 +116,10 @@ let package = Package(

// Messaging in-app
.target(name: "CioMessagingInApp",
dependencies: ["CioInternalCommon"],
dependencies: [
"CioInternalCommon",
.product(name: "LDSwiftEventSource", package: "swift-eventsource")
],
path: "Sources/MessagingInApp",
resources: [
.process("Resources/PrivacyInfo.xcprivacy"),
Expand Down
131 changes: 118 additions & 13 deletions Sources/MessagingInApp/Gist/Gist.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,50 +25,142 @@ class Gist: GistProvider {
private let inAppMessageManager: InAppMessageManager
private let queueManager: QueueManager
private let threadUtil: ThreadUtil
private let sseLifecycleManager: SseLifecycleManager

private var inAppMessageStoreSubscriber: InAppMessageStoreSubscriber?
private var pollIntervalSubscriber: InAppMessageStoreSubscriber?
private var sseEnabledSubscriber: InAppMessageStoreSubscriber?
private var userIdSubscriber: InAppMessageStoreSubscriber?
private var queueTimer: Timer?

init(
logger: Logger,
gistDelegate: GistDelegate,
inAppMessageManager: InAppMessageManager,
queueManager: QueueManager,
threadUtil: ThreadUtil
threadUtil: ThreadUtil,
sseLifecycleManager: SseLifecycleManager
) {
self.logger = logger
self.gistDelegate = gistDelegate
self.inAppMessageManager = inAppMessageManager
self.queueManager = queueManager
self.threadUtil = threadUtil
self.sseLifecycleManager = sseLifecycleManager

subscribeToInAppMessageState()

// Start the SSE lifecycle manager to observe app foreground/background events
Task {
await sseLifecycleManager.start()
}
}

deinit {
// Unsubscribe from in-app message state changes and release resources to stop polling
// and prevent memory leaks.
if let subscriber = inAppMessageStoreSubscriber {
if let subscriber = pollIntervalSubscriber {
inAppMessageManager.unsubscribe(subscriber: subscriber)
}
if let subscriber = sseEnabledSubscriber {
inAppMessageManager.unsubscribe(subscriber: subscriber)
}
inAppMessageStoreSubscriber = nil
if let subscriber = userIdSubscriber {
inAppMessageManager.unsubscribe(subscriber: subscriber)
}
pollIntervalSubscriber = nil
sseEnabledSubscriber = nil
userIdSubscriber = nil

invalidateTimer()
}

private func subscribeToInAppMessageState() {
// Keep a strong reference to the subscriber to prevent deallocation and continue receiving updates
inAppMessageStoreSubscriber = {
let subscriber = InAppMessageStoreSubscriber { state in
self.setupPollingAndFetch(skipMessageFetch: true, pollingInterval: state.pollInterval)
// Subscribe to poll interval changes
pollIntervalSubscriber = {
let subscriber = InAppMessageStoreSubscriber { [weak self] state in
guard let self else { return }
// Only update polling if SSE is not active
if !state.shouldUseSse {
setupPollingAndFetch(skipMessageFetch: true, pollingInterval: state.pollInterval)
}
}
// Subscribe to changes in `pollInterval` property of `InAppMessageState`
inAppMessageManager.subscribe(keyPath: \.pollInterval, subscriber: subscriber)
return subscriber
}()

// Subscribe to SSE flag changes (matching Android's subscribeToAttribute for sseEnabled)
sseEnabledSubscriber = {
let subscriber = InAppMessageStoreSubscriber { [weak self] state in
guard let self else { return }
handleSseEnabledChange(state: state)
}
inAppMessageManager.subscribe(keyPath: \.useSse, subscriber: subscriber)
logger.logWithModuleTag("Gist: Subscribed to SSE flag changes", level: .debug)
return subscriber
}()

// Subscribe to user identification changes (matching Android's subscribeToAttribute for isUserIdentified)
userIdSubscriber = {
let subscriber = InAppMessageStoreSubscriber { [weak self] state in
guard let self else { return }
handleUserIdentificationChange(state: state)
}
inAppMessageManager.subscribe(keyPath: \.userId, subscriber: subscriber)
logger.logWithModuleTag("Gist: Subscribed to userId changes", level: .debug)
return subscriber
}()
}

/// Handles SSE flag changes for polling control.
/// When SSE becomes active (enabled + identified user), stop polling.
/// When SSE becomes disabled, start polling.
private func handleSseEnabledChange(state: InAppMessageState) {
logger.logWithModuleTag(
"Gist: SSE flag changed - sseEnabled: \(state.useSse), isUserIdentified: \(state.isUserIdentified), shouldUseSse: \(state.shouldUseSse)",
level: .info
)

if state.shouldUseSse {
// SSE is now active - stop polling
logger.logWithModuleTag("Gist: SSE enabled for identified user - stopping polling timer", level: .info)
invalidateTimer()
} else if !state.useSse {
// SSE disabled - start polling
logger.logWithModuleTag("Gist: SSE disabled - starting polling with interval: \(state.pollInterval)s", level: .info)
setupPollingAndFetch(skipMessageFetch: false, pollingInterval: state.pollInterval)
} else {
// SSE enabled but user is anonymous - polling continues
logger.logWithModuleTag("Gist: SSE enabled but user anonymous - polling continues", level: .debug)
}
}

/// Handles user identification changes for polling control.
/// When user becomes identified and SSE is enabled, stop polling (SSE will take over).
/// When user becomes anonymous but SSE flag is still enabled, start polling.
private func handleUserIdentificationChange(state: InAppMessageState) {
logger.logWithModuleTag(
"Gist: User identification changed - isUserIdentified: \(state.isUserIdentified), sseEnabled: \(state.useSse), shouldUseSse: \(state.shouldUseSse)",
level: .info
)

if state.shouldUseSse {
// User became identified and SSE is enabled - stop polling (SSE will take over)
logger.logWithModuleTag("Gist: User identified with SSE enabled - stopping polling (SSE will handle messages)", level: .info)
invalidateTimer()
} else if !state.isUserIdentified, state.useSse {
// User became anonymous but SSE flag is still enabled - start polling
// (SSE won't be used for anonymous users)
logger.logWithModuleTag("Gist: User became anonymous with SSE enabled - starting polling (SSE not used for anonymous users)", level: .info)
setupPollingAndFetch(skipMessageFetch: false, pollingInterval: state.pollInterval)
} else {
logger.logWithModuleTag("Gist: No polling action needed for user identification change", level: .debug)
}
}

private func invalidateTimer() {
// Timer must be scheduled or modified on main.
let timerWasActive = queueTimer != nil
logger.logWithModuleTag("Gist: Invalidating polling timer (wasActive: \(timerWasActive))", level: .debug)
threadUtil.runMain {
self.queueTimer?.invalidate()
self.queueTimer = nil
Expand Down Expand Up @@ -141,7 +233,7 @@ class Gist: GistProvider {
}

private func setupPollingAndFetch(skipMessageFetch: Bool, pollingInterval: Double) {
logger.logWithModuleTag("Setting up polling with interval: \(pollingInterval) seconds and skipMessageFetch: \(skipMessageFetch)", level: .info)
logger.logWithModuleTag("Gist: Setting up polling timer - interval: \(pollingInterval)s, skipInitialFetch: \(skipMessageFetch)", level: .info)
invalidateTimer()

// Timer must be scheduled on the main thread
Expand All @@ -153,6 +245,7 @@ class Gist: GistProvider {
userInfo: nil,
repeats: true
)
self.logger.logWithModuleTag("Gist: Polling timer started with interval: \(pollingInterval)s", level: .debug)
}

if !skipMessageFetch {
Expand All @@ -167,16 +260,28 @@ class Gist: GistProvider {
/// Also, the method must be called on main thread since it checks the application state.
@objc
func fetchUserMessages() {
logger.logWithModuleTag("Attempting to fetch user messages from remote service", level: .info)
guard UIApplication.shared.applicationState != .background else {
logger.logWithModuleTag("Application in background, skipping queue check.", level: .info)
logger.logWithModuleTag("Gist: Application in background, skipping queue check", level: .debug)
return
}

logger.logWithModuleTag("Checking Gist queue service", level: .info)
inAppMessageManager.fetchState { [weak self] state in
guard let self else { return }

// Skip polling only if SSE should be used (enabled + user is identified)
// Anonymous users always use polling even if SSE flag is enabled
guard !state.shouldUseSse else {
logger.logWithModuleTag(
"Gist: Skipping polling - SSE active (sseEnabled: \(state.useSse), isUserIdentified: \(state.isUserIdentified))",
level: .debug
)
return
}

logger.logWithModuleTag(
"Gist: Polling for messages (sseEnabled: \(state.useSse), isUserIdentified: \(state.isUserIdentified))",
level: .info
)
fetchUserQueue(state: state)
}
}
Expand Down
23 changes: 23 additions & 0 deletions Sources/MessagingInApp/Gist/Managers/QueueManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class QueueManager {
switch response {
case .success(let (data, response)):
self.updatePollingInterval(headers: response.allHeaderFields)
self.updateSseFlag(headers: response.allHeaderFields)
self.logger.logWithModuleTag("Gist queue fetch response: \(response.statusCode)", level: .debug)
switch response.statusCode {
case 304:
Expand Down Expand Up @@ -141,4 +142,26 @@ class QueueManager {
inAppMessageManager.dispatch(action: .setPollingInterval(interval: newPollingInterval))
}
}

private func updateSseFlag(headers: [AnyHashable: Any]) {
// Check for SSE flag in headers
if let sseHeaderValue = headers["x-cio-use-sse"] as? String {
logger.logWithModuleTag("X-CIO-Use-SSE header found with value: '\(sseHeaderValue)'", level: .info)
let useSse = sseHeaderValue.lowercased() == "true"

inAppMessageManager.fetchState { [weak self] state in
guard let self = self else { return }

// Only update if the value has changed
if state.useSse != useSse {
logger.logWithModuleTag("SSE flag changing from \(state.useSse) to \(useSse)", level: .info)
inAppMessageManager.dispatch(action: .setSseEnabled(enabled: useSse))
} else {
logger.logWithModuleTag("SSE flag unchanged, remains: \(useSse)", level: .debug)
}
}
} else {
logger.logWithModuleTag("X-CIO-Use-SSE header not present in response", level: .debug)
}
}
}
4 changes: 4 additions & 0 deletions Sources/MessagingInApp/Gist/Network/NetworkSettings.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ protocol NetworkSettings {
var queueAPI: String { get }
var engineAPI: String { get }
var renderer: String { get }
var sseAPI: String { get }
}

struct NetworkSettingsProduction: NetworkSettings {
let queueAPI = "https://consumer.inapp.customer.io"
let engineAPI = "https://engine.api.gist.build"
let renderer = "https://renderer.gist.build/3.0"
let sseAPI = "https://realtime.inapp.customer.io/api/v3/sse"
}

struct NetworkSettingsDevelopment: NetworkSettings {
let queueAPI = "https://consumer.dev.inapp.customer.io"
let engineAPI = "https://engine.api.dev.gist.build"
let renderer = "https://renderer.gist.build/3.0"
let sseAPI = "https://realtime.inapp.customer.io/api/v3/sse"
}

struct NetworkSettingsLocal: NetworkSettings {
let queueAPI = "http://queue.api.local.gist.build:86"
let engineAPI = "http://engine.api.local.gist.build:82"
let renderer = "http://app.local.gist.build:8080/web"
let sseAPI = "http://realtime.api.local.gist.build:86/api/v3/sse"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import Foundation

/// Backport of `AsyncStream.makeStream()` for iOS 13+.
///
/// `AsyncStream.makeStream()` was introduced in iOS 17 and provides a way to create
/// an AsyncStream and access its continuation separately. This is useful when you need
/// to set up resources that require the continuation before returning the stream.
///
/// This backport provides the same functionality for older iOS versions.
enum AsyncStreamBackport {
/// Creates an AsyncStream and returns both the stream and its continuation separately.
///
/// This allows setup code to access the continuation before the stream is consumed,
/// enabling synchronous setup patterns that avoid race conditions.
///
/// Example:
/// ```swift
/// func connect() -> AsyncStream<Event> {
/// let (stream, continuation) = AsyncStreamBackport.makeStream(of: Event.self)
///
/// // Use continuation immediately for setup
/// let handler = EventHandler(continuation: continuation)
/// self.connection = Connection(handler: handler)
/// self.connection.start()
///
/// return stream
/// }
/// ```
///
/// - Parameters:
/// - elementType: The type of elements in the stream.
/// - bufferingPolicy: The buffering policy for the stream. Defaults to `.unbounded`.
/// - Returns: A tuple containing the stream and its continuation.
static func makeStream<Element>(
of elementType: Element.Type = Element.self,
bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
// Capture the continuation from the closure using an optional variable
var continuation: AsyncStream<Element>.Continuation?

let stream = AsyncStream<Element>(bufferingPolicy: bufferingPolicy) { cont in
// This closure is called synchronously during AsyncStream initialization
continuation = cont
}

// By the time AsyncStream's init returns, the closure has executed
// and continuation is guaranteed to be set
guard let continuation = continuation else {
// This should never happen - the closure is called synchronously
fatalError("AsyncStream continuation was not set during initialization")
}

return (stream, continuation)
}
}
Loading
Loading