Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import Darwin
import Dispatch
import Foundation
import SupacodeSettingsShared

private let watcherLogger = SupaLogger("WorktreeInfoWatcher")

@MainActor
final class WorktreeInfoWatcherManager {
/// Hard cap on the live event buffer. These events are refresh signals (not
/// coalescable state), so the stream is capped rather than deduped: a wedged
/// consumer drops the oldest signals instead of letting the buffer grow
/// without bound.
static let eventBufferCap = 2048

private struct HeadWatcher {
let headURL: URL
let source: DispatchSourceFileSystemObject
Expand Down Expand Up @@ -80,7 +89,10 @@ final class WorktreeInfoWatcherManager {

func eventStream() -> AsyncStream<WorktreeInfoWatcherClient.Event> {
eventContinuation?.finish()
let (stream, continuation) = AsyncStream.makeStream(of: WorktreeInfoWatcherClient.Event.self)
let (stream, continuation) = AsyncStream.makeStream(
of: WorktreeInfoWatcherClient.Event.self,
bufferingPolicy: .bufferingNewest(Self.eventBufferCap)
)
eventContinuation = continuation
return stream
}
Expand Down Expand Up @@ -441,6 +453,9 @@ final class WorktreeInfoWatcherManager {
do {
try await sleep(request.interval)
} catch {
if !(error is CancellationError) {
watcherLogger.error("Worktree refresh loop for \(worktreeID) ended: \(error).")
}
break
}
guard !Task.isCancelled else {
Expand All @@ -460,7 +475,23 @@ final class WorktreeInfoWatcherManager {
{
return
}
eventContinuation?.yield(event)
let result = eventContinuation?.yield(event)
if case .dropped(let shed)? = result {
let cap = Self.eventBufferCap
watcherLogger.error(
"Worktree info event buffer full (cap \(cap)); shed oldest refresh signal: \(Self.label(for: shed)).")
}
}

/// Compact identity for a backpressure-drop log. Strips the pull-request
/// refresh's worktree-id list to a count so a drop storm can't flood the log;
/// the single-id signals carry small payloads and describe themselves.
private static func label(for event: WorktreeInfoWatcherClient.Event) -> String {
switch event {
case .repositoryPullRequestRefresh(let rootURL, let worktreeIDs):
"repositoryPullRequestRefresh(\(rootURL.lastPathComponent), \(worktreeIDs.count) worktrees)"
default: String(describing: event)
}
}

private func cancelPullRequestSelectionCooldown(for repositoryRootURL: URL) {
Expand Down
148 changes: 139 additions & 9 deletions supacode/Features/Terminal/BusinessLogic/WorktreeTerminalManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@ final class WorktreeTerminalManager {
private var lastEmittedProjections: [Worktree.ID: WorktreeRowProjection] = [:]
private var eventContinuation: AsyncStream<TerminalClient.Event>.Continuation?
private var pendingEvents: [TerminalClient.Event] = []
/// Latest-wins events deduped by identity: drops a value equal to the
/// immediately-previous one per key (a burst of distinct values still passes),
/// so per-tab projection / progress / task-status / focus repeats don't flood
/// the stream. Cleared on resubscribe and purged on tab / worktree teardown.
private var lastEmittedCoalescable: [CoalesceKey: TerminalClient.Event] = [:]
/// Hard cap on the live event buffer. Source coalescing keeps it near-empty in
/// practice; this backstops a wedged consumer so memory stays bounded instead
/// of growing without limit.
static let eventBufferCap = 2048
/// Cap for lifecycle events buffered before the first subscriber attaches.
/// Coalescable state collapses per key and doesn't count, so this only bounds
/// one-shot events; the sole consumer attaches at launch, well under the cap.
static let pendingEventCap = 1024
@ObservationIgnored
private var pendingIdleHookEvents: [IdleDebounceKey: Task<Void, Never>] = [:]
@ObservationIgnored
Expand Down Expand Up @@ -57,6 +70,50 @@ final class WorktreeTerminalManager {
let agent: SkillAgent
}

/// Identity for a latest-wins event. Two events sharing a key carry the same
/// piece of state, so an identical repeat is a no-op and is dropped.
private enum CoalesceKey: Hashable {
case worktreeProjection(Worktree.ID)
case tabProjection(TerminalTabID)
case tabProgress(TerminalTabID)
case taskStatus(Worktree.ID)
case focus(Worktree.ID)
case notificationIndicator
case hasAnySurface
}

/// Non-nil for state events that are safe to coalesce by identity. Lifecycle /
/// one-shot events (tab create / close / remove, notifications, script
/// completion, command-palette, teardown) return nil and are never dropped.
private static func coalesceKey(for event: TerminalClient.Event) -> CoalesceKey? {
switch event {
case .worktreeProjectionChanged(let worktreeID, _): .worktreeProjection(worktreeID)
case .tabProjectionChanged(_, let projection): .tabProjection(projection.tabID)
case .tabProgressDisplayChanged(_, let tabID, _): .tabProgress(tabID)
case .taskStatusChanged(let worktreeID, _): .taskStatus(worktreeID)
case .focusChanged(let worktreeID, _): .focus(worktreeID)
case .notificationIndicatorChanged: .notificationIndicator
case .terminalHasAnySurfaceChanged: .hasAnySurface
default: nil
}
}

/// Compact identity for a backpressure-drop log. Strips the payload-heavy
/// cases (projections / notification bodies) to their key ids so a drop storm
/// can't flood the log; the rest carry small payloads and describe themselves.
private static func label(for event: TerminalClient.Event) -> String {
switch event {
case .worktreeProjectionChanged(let worktreeID, _): "worktreeProjectionChanged(\(worktreeID))"
case .tabProjectionChanged(let worktreeID, let projection):
"tabProjectionChanged(\(worktreeID), tab: \(projection.tabID))"
case .tabProgressDisplayChanged(let worktreeID, let tabID, _):
"tabProgressDisplayChanged(\(worktreeID), tab: \(tabID))"
case .notificationReceived(let worktreeID, let surfaceID, _, _):
"notificationReceived(\(worktreeID), surface: \(surfaceID))"
default: String(describing: event)
}
}

var selectedWorktreeID: Worktree.ID?
var saveLayoutSnapshot: ((Worktree.ID, TerminalLayoutSnapshot?) -> Void)?
var loadLayoutSnapshot: ((Worktree.ID) -> TerminalLayoutSnapshot?)?
Expand Down Expand Up @@ -164,6 +221,13 @@ final class WorktreeTerminalManager {
emit(.agentHookEventReceived(event))
}

#if DEBUG
/// Count of idle-hook debounce tasks still scheduled (test-only). A clock-awoken
/// resume removes its key only after it emits, so a non-zero count means a
/// pending idle event has not yet landed in the stream.
var pendingIdleHookCountForTesting: Int { pendingIdleHookEvents.count }
#endif

// MARK: - CLI queries.

func listTabs(worktreeID: String) -> [[String: String]]? {
Expand Down Expand Up @@ -338,17 +402,27 @@ final class WorktreeTerminalManager {

func eventStream() -> AsyncStream<TerminalClient.Event> {
eventContinuation?.finish()
let (stream, continuation) = AsyncStream.makeStream(of: TerminalClient.Event.self)
let (stream, continuation) = AsyncStream.makeStream(
of: TerminalClient.Event.self,
bufferingPolicy: .bufferingNewest(Self.eventBufferCap)
)
eventContinuation = continuation
lastNotificationIndicatorCount = nil
// Reset dedup state before replaying so the replay re-seeds both caches; a
// fresh subscriber then has the latest value recorded for every key.
lastEmittedProjections.removeAll()
lastEmittedCoalescable.removeAll()
if !pendingEvents.isEmpty {
let bufferedEvents = pendingEvents
pendingEvents.removeAll()
for event in bufferedEvents {
// Re-emitted fresh below, so drop the buffered copy.
if case .notificationIndicatorChanged = event {
continue
}
continuation.yield(event)
// Route through emit() (not a raw yield) so a coalescable buffered event
// seeds lastEmittedCoalescable and the first identical live event dedups.
emit(event)
}
}
emitNotificationIndicatorCountIfNeeded()
Expand All @@ -358,19 +432,16 @@ final class WorktreeTerminalManager {
// Seed each worktree's projection so rows attached after the stream start
// pick up the current snapshot (otherwise they'd stay default until the
// next mutation).
lastEmittedProjections.removeAll()
for id in states.keys { emitProjection(for: id) }
// Replay per-tab projections / stripe-progress displays for the same reason:
// a new subscriber needs the existing `terminalTabs[id:]` rows seeded so
// tab-bar leaves don't render empty until the next per-tab mutation.
for (worktreeID, state) in states {
for projection in state.currentTabProjections() {
continuation.yield(.tabProjectionChanged(worktreeID: worktreeID, projection))
emit(.tabProjectionChanged(worktreeID: worktreeID, projection))
}
for (tabID, display) in state.currentTabProgressDisplays() {
continuation.yield(
.tabProgressDisplayChanged(worktreeID: worktreeID, tabID: tabID, display: display)
)
emit(.tabProgressDisplayChanged(worktreeID: worktreeID, tabID: tabID, display: display))
}
}
return stream
Expand Down Expand Up @@ -529,7 +600,7 @@ final class WorktreeTerminalManager {
}
states = states.filter { worktreeIDs.contains($0.key) }
cancelPendingIdleHooks(forSurfaceIDs: prunedSurfaceIDs)
for (id, _) in removed { lastEmittedProjections.removeValue(forKey: id) }
for (id, _) in removed { invalidateCaches(forPrunedWorktree: id) }
emitNotificationIndicatorCountIfNeeded()
emitHasAnyTerminalSurfaceIfNeeded()
killZmxSessions(prunedSessionIDs)
Expand Down Expand Up @@ -821,10 +892,69 @@ final class WorktreeTerminalManager {

private func emit(_ event: TerminalClient.Event) {
guard let eventContinuation else {
bufferPendingEvent(event)
return
}
if let key = Self.coalesceKey(for: event) {
guard lastEmittedCoalescable[key] != event else { return }
lastEmittedCoalescable[key] = event
}
// During prune this fires first and clears the coalesce keys; invalidateCaches
// then runs second only to clear the worktree-keyed lastEmittedProjections.
for key in Self.invalidatedCoalesceKeys(by: event) {
lastEmittedCoalescable.removeValue(forKey: key)
}
let result = eventContinuation.yield(event)
if case .dropped(let shed) = result {
terminalLogger.error(
"Terminal event buffer full (cap \(Self.eventBufferCap)); shed oldest buffered event: \(Self.label(for: shed))."
)
}
}

/// Buffers an event emitted before a subscriber attaches. Coalescable state
/// keeps only its latest value per key; lifecycle events accumulate up to a
/// cap, dropping the oldest so the pre-subscription buffer stays bounded.
private func bufferPendingEvent(_ event: TerminalClient.Event) {
if let key = Self.coalesceKey(for: event) {
pendingEvents.removeAll { Self.coalesceKey(for: $0) == key }
pendingEvents.append(event)
return
}
eventContinuation.yield(event)
// Mirror the live-path teardown purge so a buffered projection for a
// torn-down id can't replay ahead of its teardown on resubscribe.
let invalidated = Set(Self.invalidatedCoalesceKeys(by: event))
if !invalidated.isEmpty {
pendingEvents.removeAll { Self.coalesceKey(for: $0).map(invalidated.contains) ?? false }
}
if pendingEvents.count >= Self.pendingEventCap {
let dropped = pendingEvents.removeFirst()
terminalLogger.error(
"Pending terminal event buffer full (cap \(Self.pendingEventCap)); dropped oldest: \(Self.label(for: dropped))."
)
}
pendingEvents.append(event)
}

/// Coalesce keys a teardown event invalidates. A coalesced value for a removed
/// tab / worktree must not linger: a same-id reuse (snapshot restore reuses
/// persisted tab UUIDs) would otherwise be wrongly deduped and dropped.
private static func invalidatedCoalesceKeys(by event: TerminalClient.Event) -> [CoalesceKey] {
switch event {
case .tabRemoved(_, let tabID): [.tabProjection(tabID), .tabProgress(tabID)]
case .worktreeStateTornDown(let worktreeID):
[.worktreeProjection(worktreeID), .taskStatus(worktreeID), .focus(worktreeID)]
default: []
}
}

/// Clears the worktree-keyed lastEmittedProjections during prune; emit's purge has
/// already cleared the coalesce keys, which this re-clears as a guard against drift.
private func invalidateCaches(forPrunedWorktree id: Worktree.ID) {
lastEmittedProjections.removeValue(forKey: id)
for key in Self.invalidatedCoalesceKeys(by: .worktreeStateTornDown(worktreeID: id)) {
lastEmittedCoalescable.removeValue(forKey: key)
}
}

private func emitNotificationIndicatorCountIfNeeded() {
Expand Down
22 changes: 18 additions & 4 deletions supacodeTests/AgentPresence+TestHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ final class PresenceTestHarness {
private let reducer = AgentPresenceFeature()
private var stream: AsyncStream<TerminalClient.Event>?
private var consumeTask: Task<Void, Never>?
private weak var manager: WorktreeTerminalManager?
/// Bumped each time the consume task reduces a stream event.
private var processedCount = 0
/// Bumped each time the consume task is about to wait for the next event, i.e.
Expand Down Expand Up @@ -49,15 +50,28 @@ final class PresenceTestHarness {
for _ in 0..<64 {
let parksBefore = parkCount
let processedBefore = processedCount
await Task.megaYield(count: 10_000)
let parkedAgain = parkCount > parksBefore
let quiet = processedCount == processedBefore
settled = parkedAgain && quiet ? settled + 1 : 0
// Each megaYield spawns `count` detached tasks. A clock-awoken producer
// (e.g. an idle debounce resuming after `clock.advance`) needs enough
// yields within a single pass to resume, emit, and let the consumer
// reduce before we sample quiescence; too few and a busy suite schedules
// the resume after the sample, so we conclude "idle" before the idle
// event lands. 1000 keeps the per-call cost two orders below the legacy
// 10_000 while staying robust under contention.
await Task.megaYield(count: 1000)
// Quiescent when the consumer is parked, nothing processed this pass, and
// no idle-hook debounce is still scheduled. The last clause closes the
// race where `clock.advance` returned but the awoken idle task hasn't yet
// emitted: its key lingers in the manager until it does, so a pending
// count keeps draining instead of concluding "idle" too early.
let consumerIdle = parkCount == parksBefore && processedCount == processedBefore
let noPendingIdle = (manager?.pendingIdleHookCountForTesting ?? 0) == 0
settled = consumerIdle && noPendingIdle ? settled + 1 : 0
if settled >= 2 { return }
}
}

func attach(to manager: WorktreeTerminalManager) {
self.manager = manager
let stream = manager.eventStream()
self.stream = stream
consumeTask?.cancel()
Expand Down
23 changes: 23 additions & 0 deletions supacodeTests/WorktreeInfoWatcherManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,29 @@ struct WorktreeInfoWatcherManagerTests {
await task.value
try FileManager.default.removeItem(at: tempRepository.tempRoot)
}

@Test func capsTheEventBufferUnderBackpressure() async throws {
let tempWorktree = try makeTempWorktree()
let manager = WorktreeInfoWatcherManager()
manager.handleCommand(.setPullRequestTrackingEnabled(false))
let stream = manager.eventStream()

// Each setWorktrees re-emits an immediate filesChanged for the worktree;
// with nothing draining, the buffer must cap rather than grow unbounded.
let overflow = WorktreeInfoWatcherManager.eventBufferCap + 50
for _ in 0..<overflow {
manager.handleCommand(.setWorktrees([tempWorktree.worktree]))
}
manager.handleCommand(.stop)

var count = 0
for await event in stream where event == .filesChanged(worktreeID: tempWorktree.worktree.id) {
count += 1
}

#expect(count == WorktreeInfoWatcherManager.eventBufferCap)
try FileManager.default.removeItem(at: tempWorktree.tempRoot)
}
}

actor EventCollector {
Expand Down
Loading
Loading