Skip to content
168 changes: 142 additions & 26 deletions internal/runtime/executor/codex_websockets_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,40 @@ type codexWebsocketRead struct {
err error
}

// trySendCodexWebsocketRead makes a single, non-blocking attempt to deliver ev
// on ch. A nil ch is a no-op. If ch cannot accept the value right away the
// event is dropped rather than waited on. Any panic raised while sending (for
// example, a send on an already-closed channel) is recovered and logged at
// debug level so the caller never crashes.
func trySendCodexWebsocketRead(ch chan codexWebsocketRead, done <-chan struct{}, ev codexWebsocketRead) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		log.Debugf("codex websockets executor: recover trySendCodexWebsocketRead panic=%v", recovered)
	}()
	if ch != nil {
		// The default arm keeps this select from ever blocking.
		select {
		case <-done:
		case ch <- ev:
		default:
		}
	}
}

// tryCloseCodexWebsocketRead closes ch on a best-effort basis. A nil ch is a
// no-op. A panic from close (for example, closing a channel that is already
// closed) is recovered and logged at debug level instead of propagating.
func tryCloseCodexWebsocketRead(ch chan codexWebsocketRead) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		log.Debugf("codex websockets executor: recover tryCloseCodexWebsocketRead panic=%v", recovered)
	}()
	if ch != nil {
		close(ch)
	}
}

func (s *codexWebsocketSession) setActive(ch chan codexWebsocketRead) {
if s == nil {
return
}
// 该方法仅持有 activeMu 调用避免与 connMu->activeMu 锁序冲突
// 不要在持有 connMu 时调用避免未来引入反向锁序
s.activeMu.Lock()
if s.activeCancel != nil {
s.activeCancel()
Expand All @@ -105,6 +135,8 @@ func (s *codexWebsocketSession) clearActive(ch chan codexWebsocketRead) {
if s == nil {
return
}
// 该方法仅持有 activeMu 调用避免与 connMu->activeMu 锁序冲突
// 不要在持有 connMu 时调用避免未来引入反向锁序
s.activeMu.Lock()
if s.activeCh == ch {
s.activeCh = nil
Expand All @@ -117,6 +149,61 @@ func (s *codexWebsocketSession) clearActive(ch chan codexWebsocketRead) {
s.activeMu.Unlock()
}

// isCurrentConn reports whether conn is the connection currently stored on
// the session. It returns false when either the session or conn is nil.
// s.conn is read while holding connMu.
func (s *codexWebsocketSession) isCurrentConn(conn *websocket.Conn) bool {
	if s == nil || conn == nil {
		return false
	}
	s.connMu.Lock()
	defer s.connMu.Unlock()
	return s.conn == conn
}

// activeSnapshotForCurrentConn returns the session's active read channel and
// its done channel, but only if conn is still the session's current
// connection. The ownership check and the snapshot are taken while connMu is
// held, so the two cannot be separated by a concurrent connection swap that
// also takes connMu.
//
// The third result is false (with nil channels) when the session or conn is
// nil, or when conn is no longer the current connection. When it is true the
// returned channels may still be nil if nothing is active.
func (s *codexWebsocketSession) activeSnapshotForCurrentConn(conn *websocket.Conn) (chan codexWebsocketRead, <-chan struct{}, bool) {
	if s == nil || conn == nil {
		return nil, nil, false
	}
	// Lock order is always connMu -> activeMu.
	s.connMu.Lock()
	defer s.connMu.Unlock()
	if s.conn != conn {
		return nil, nil, false
	}
	s.activeMu.Lock()
	defer s.activeMu.Unlock()
	return s.activeCh, s.activeDone, true
}

// clearActiveForCurrentConn resets the session's active-read state, but only
// when conn is still the session's current connection AND ch is still the
// registered active channel. On success it clears activeCh, invokes
// activeCancel if one is set, clears activeCancel and activeDone, and returns
// true. It does not close ch.
//
// It returns false without touching any state when the session, conn, or ch
// is nil, when conn is not the current connection, or when a different
// channel is registered as active.
func (s *codexWebsocketSession) clearActiveForCurrentConn(conn *websocket.Conn, ch chan codexWebsocketRead) bool {
	if s == nil || conn == nil || ch == nil {
		return false
	}
	// Lock order is always connMu -> activeMu.
	s.connMu.Lock()
	defer s.connMu.Unlock()
	if s.conn != conn {
		return false
	}
	s.activeMu.Lock()
	defer s.activeMu.Unlock()
	if s.activeCh != ch {
		return false
	}
	s.activeCh = nil
	if cancel := s.activeCancel; cancel != nil {
		cancel()
	}
	s.activeCancel, s.activeDone = nil, nil
	return true
}

func (s *codexWebsocketSession) writeMessage(conn *websocket.Conn, msgType int, payload []byte) error {
if s == nil {
return fmt.Errorf("codex websockets executor: session is nil")
Expand Down Expand Up @@ -1064,15 +1151,24 @@ func (e *CodexWebsocketsExecutor) getOrCreateSession(sessionID string) *codexWeb
}

func (e *CodexWebsocketsExecutor) ensureUpstreamConn(ctx context.Context, auth *cliproxyauth.Auth, sess *codexWebsocketSession, authID string, wsURL string, headers http.Header) (*websocket.Conn, *http.Response, error) {
authID = strings.TrimSpace(authID)
if sess == nil {
return e.dialCodexWebsocket(ctx, auth, wsURL, headers)
}

sess.connMu.Lock()
conn := sess.conn
readerConn := sess.readerConn
currentAuthID := strings.TrimSpace(sess.authID)
sess.connMu.Unlock()
if conn != nil && currentAuthID != authID {
// 账号切换时先断开旧连接避免继续复用旧账号
e.invalidateUpstreamConn(sess, conn, "auth_switched", nil)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid tearing down active reader channel on auth switch

Calling invalidateUpstreamConn during an auth change closes the old socket immediately, but the old readUpstreamLoop can still wake later and run its error path, which unconditionally calls clearActive on, and then closes, whatever channel is currently active for the session. If a new request has already installed its own activeCh, that channel gets closed and readCodexWebsocketMessage returns a "session read channel closed" error, aborting a healthy post-switch request. This race is specific to in-session auth switching and can intermittently break the new quota-recovery flow.

Useful? React with 👍 / 👎.

conn = nil
readerConn = nil
}
if conn != nil {
// 账号未变化时复用连接减少不必要重连
if readerConn != conn {
sess.connMu.Lock()
sess.readerConn = conn
Expand Down Expand Up @@ -1114,21 +1210,24 @@ func (e *CodexWebsocketsExecutor) readUpstreamLoop(sess *codexWebsocketSession,
return
}
for {
if !sess.isCurrentConn(conn) {
// 旧连接读循环直接退出避免误伤新请求通道
return
}
_ = conn.SetReadDeadline(time.Now().Add(codexResponsesWebsocketIdleTimeout))
msgType, payload, errRead := conn.ReadMessage()
if errRead != nil {
sess.activeMu.Lock()
ch := sess.activeCh
done := sess.activeDone
sess.activeMu.Unlock()
// 在同一临界区做归属校验和通道快照避免检查后竞态
ch, done, current := sess.activeSnapshotForCurrentConn(conn)
if !current {
// 旧连接读错时不触碰当前活跃通道
return
}
Comment on lines +1222 to +1225

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Notify active reader when old conn is force-closed

Do not return immediately on !current here, because closeExecutionSession sets sess.conn = nil before closing the socket; when that close unblocks ReadMessage, this branch exits without sending an error to the active channel. In that case readCodexWebsocketMessage keeps waiting on readCh for the in-flight request until its context is canceled, so explicit session shutdown (for example CloseExecutionSession/executor replacement paths) can hang requests instead of failing fast.

Useful? React with 👍 / 👎.

if ch != nil {
select {
case ch <- codexWebsocketRead{conn: conn, err: errRead}:
case <-done:
default:
trySendCodexWebsocketRead(ch, done, codexWebsocketRead{conn: conn, err: errRead})
if sess.clearActiveForCurrentConn(conn, ch) {
tryCloseCodexWebsocketRead(ch)
}
sess.clearActive(ch)
close(ch)
}
e.invalidateUpstreamConn(sess, conn, "upstream_disconnected", errRead)
return
Expand All @@ -1137,29 +1236,29 @@ func (e *CodexWebsocketsExecutor) readUpstreamLoop(sess *codexWebsocketSession,
if msgType != websocket.TextMessage {
if msgType == websocket.BinaryMessage {
errBinary := fmt.Errorf("codex websockets executor: unexpected binary message")
sess.activeMu.Lock()
ch := sess.activeCh
done := sess.activeDone
sess.activeMu.Unlock()
// 在同一临界区做归属校验和通道快照避免检查后竞态
ch, done, current := sess.activeSnapshotForCurrentConn(conn)
if !current {
// 旧连接二进制异常时不触碰当前活跃通道
return
}
if ch != nil {
select {
case ch <- codexWebsocketRead{conn: conn, err: errBinary}:
case <-done:
default:
trySendCodexWebsocketRead(ch, done, codexWebsocketRead{conn: conn, err: errBinary})
if sess.clearActiveForCurrentConn(conn, ch) {
tryCloseCodexWebsocketRead(ch)
}
sess.clearActive(ch)
close(ch)
}
e.invalidateUpstreamConn(sess, conn, "unexpected_binary", errBinary)
return
}
continue
}

sess.activeMu.Lock()
ch := sess.activeCh
done := sess.activeDone
sess.activeMu.Unlock()
// 在同一临界区做归属校验和通道快照避免检查后竞态
ch, done, current := sess.activeSnapshotForCurrentConn(conn)
if !current {
// 旧连接消息不再分发给新连接请求
return
}
if ch == nil {
continue
}
Expand Down Expand Up @@ -1246,17 +1345,34 @@ func (e *CodexWebsocketsExecutor) closeExecutionSession(sess *codexWebsocketSess
reason = "session_closed"
}

// 锁顺序固定为 connMu -> activeMu
sess.connMu.Lock()
conn := sess.conn
authID := sess.authID
wsURL := sess.wsURL
sessionID := sess.sessionID
sess.conn = nil
if sess.readerConn == conn {
sess.readerConn = nil
}
sessionID := sess.sessionID
sess.activeMu.Lock()
ch := sess.activeCh
done := sess.activeDone
if sess.activeCancel != nil {
sess.activeCancel()
}
sess.activeCh = nil
sess.activeCancel = nil
sess.activeDone = nil
sess.activeMu.Unlock()
sess.connMu.Unlock()

if ch != nil {
// 会话关闭时允许主动 fail active 唤醒在途 readCodexWebsocketMessage
trySendCodexWebsocketRead(ch, done, codexWebsocketRead{conn: conn, err: fmt.Errorf("codex websockets executor: execution session closed")})
tryCloseCodexWebsocketRead(ch)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid closing active read channel during session teardown

closeExecutionSession now closes activeCh directly, but readUpstreamLoop still forwards text frames with a plain ch <- ... send. If CloseExecutionSession/closeAllExecutionSessions runs while a stream is in flight, that concurrent close(ch) can race with the sender and trigger a send on closed channel panic, crashing the process instead of failing the request cleanly.

Useful? React with 👍 / 👎.

}

if conn == nil {
return
}
Expand Down
Loading
Loading