Skip to content

Conversation

@lp247
Copy link
Contributor

@lp247 lp247 commented Nov 24, 2025

This change implements request timeouts for any request sent to the Kafka cluster. The timeout's duration can be configured via option requestTimeout in every client type.

Additionally, the retrying logic in the base class is changed so that it retries only if every error in the current error chain is marked as retriable. Otherwise, timeout errors, which should be handled as non-retriable errors, would potentially lead to much longer timeouts due to multiple attempts being made.

(But also in general, I would assume that every non-retriable error encountered should block any additional attempt)

on-behalf-of: @SAP [email protected]

This change implements request timeouts for any request sent to the
Kafka cluster. The timeout's duration can be configured via option
`requestTimeout` in every client type.

Additionally, the retrying logic in the base class is changed so that
it retries only if every error in the current error chain is marked
as retriable. Otherwise, timeout errors, which should be handled as
non-retriable errors, would potentially lead to much longer timeouts
due to multiple attempts being made.

(But also in general, I would assume that every non-retriable error
encountered should block any additional attempt)

on-behalf-of: @SAP [email protected]
@lp247 lp247 force-pushed the feature/request_timeouts branch from 10601a1 to 305df00 Compare November 24, 2025 14:40
Copy link
Contributor

@ShogunPanda ShogunPanda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this, I love the idea.
Few modifications but nothing big.

}

get status (): ConnectionStatusValue {
get status (): ConnectionStatus {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change (and the types change above)?

timedOut: false
}

this.#requestsQueue.push(fastQueueCallback => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than creating two functions here, I would use .bind.

this.#requestsQueue.push(fastQueueCallback => {
      request.callback = fastQueueCallback

      if (this.#socketMustBeDrained) {
        this.#afterDrainRequests.push(request)
        return false
      }

      return this.#sendRequest(request)
    }, this.#onResponse.bind(this, request, callback))

And #onResponse (or similar) you clear the timeout and then invoke callback.

Similarly for the setTimeout below:

if (!request.noResponse) {
        request.timeoutHandle = setTimeout(this.#onRequestTimeout.bind(this, request), this.#options.requestTimeout)
}

this.#socket.removeListener('error', connectingSocketErrorHandler)

this.#socket.on('error', this.#onError.bind(this))
this.#socket.on('error', this.#connectedSocketErrorHandler.bind(this))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this.

let mockCount = 1
mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, () => mockCount-- > 0)
mockAPI(consumer[kConnections], consumerGroupHeartbeatV0.api.key, null, null, (
__originalSend,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please only use a single _ in this call.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants