Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51667][SS][PYTHON] Disable Nagle's algorithm (via TCP_NODELAY = true) in TWS + PySpark for python <-> state server #50460

Closed
wants to merge 1 commit into from

Conversation

HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Mar 31, 2025

What changes were proposed in this pull request?

This PR proposes to disable Nagle's algorithm (TCP_NODELAY = true) for the connection between Python worker and state server, in TWS + PySpark.

Why are the changes needed?

We have observed the consistent latency increment, which is almost slightly more than 40ms, from specific state interactions. e.g. ListState.put() / ListState.get() / ListState.appendList().

The root cause is figured out as the bad combination of Nagle's algorithm and delayed ACK. The sequence is following:

  1. Python worker sends the proto message to JVM, and flushes the socket.
  2. Additionally, Python worker sends the follow-up data to JVM, and flushes the socket.
  3. JVM reads the proto message, and realizes there is follow-up data.
  4. JVM reads the follow-up data.
  5. JVM processes the request, and sends the response back to Python worker.

Due to delayed ACK, even after 3, ACK is not sent back from JVM to Python worker. It is waiting for some data or multiple ACKs to be sent, but JVM is not going to send the data during that phase.

Due to Nagle's algorithm, the message from 2 is not sent to JVM since there is no ACK for the message from 1. (There is in-flight unacknowledged message.)

This deadlock situation is resolved after the timeout of delayed ACK, which is 40ms (minimum duration) in Linux. After the timeout, ACK is sent back from JVM to Python worker, hence Nagle's algorithm allows the message from 2 to be finally sent to JVM.

The direction can be flipped depending on the command - the same thing can happen on the opposite direction of communication, JVM to Python worker.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manually tested (via adding debug log to measure the time spent from the state interaction).

Beyond that, this should pass the existing tests, which will be verified by CI.

Was this patch authored or co-authored using generative AI tooling?

No.

@HeartSaVioR HeartSaVioR changed the title [SPARK-51667][SS] Disable Nagle's algorithm (TCP_NODELAY = true) in TWS + PySpark for python <-> state server [SPARK-51667][SS] Disable Nagle's algorithm (via TCP_NODELAY = true) in TWS + PySpark for python <-> state server Mar 31, 2025
@neilramaswamy
Copy link
Contributor

Is this not a problem for two back-to-back calls to ValueState.get because the second call to valueState.get() on the Python side will only proceed after the response (not ack) from the first one is received?

@HeartSaVioR
Copy link
Contributor Author

HeartSaVioR commented Mar 31, 2025

I think we do not design any "asynchronous" or "concurrent" execution of state interaction. Every request with state server is "blocking". As an implementation detail, we limit the amount of data in a call for iterator on state interaction, and the other state interaction can be made while the consumption of iterator has not finished, but this doesn't mean we support "concurrent" state interations.

@HeartSaVioR HeartSaVioR changed the title [SPARK-51667][SS] Disable Nagle's algorithm (via TCP_NODELAY = true) in TWS + PySpark for python <-> state server [SPARK-51667][SS][PYTHON] Disable Nagle's algorithm (via TCP_NODELAY = true) in TWS + PySpark for python <-> state server Mar 31, 2025
@HeartSaVioR
Copy link
Contributor Author

cc. @HyukjinKwon @cloud-fan PTAL, thanks!

@HeartSaVioR
Copy link
Contributor Author

cc. @bogao007 since he authored the code.

@HeartSaVioR
Copy link
Contributor Author

Thanks! Merging to master/4.0 (since the delay is not a trivial).

HeartSaVioR added a commit that referenced this pull request Mar 31, 2025
…= true) in TWS + PySpark for python <-> state server

### What changes were proposed in this pull request?

This PR proposes to disable Nagle's algorithm (TCP_NODELAY = true) for the connection between Python worker and state server, in TWS + PySpark.

### Why are the changes needed?

We have observed the consistent latency increment, which is almost slightly more than 40ms, from specific state interactions. e.g. ListState.put() / ListState.get() / ListState.appendList().

The root cause is figured out as the bad combination of Nagle's algorithm and delayed ACK. The sequence is following:

1. Python worker sends the proto message to JVM, and flushes the socket.
2. Additionally, Python worker sends the follow-up data to JVM, and flushes the socket.
3. JVM reads the proto message, and realizes there is follow-up data.
4. JVM reads the follow-up data.
5. JVM processes the request, and sends the response back to Python worker.

Due to delayed ACK, even after 3, ACK is not sent back from JVM to Python worker. It is waiting for some data or multiple ACKs to be sent, but JVM is not going to send the data during that phase.

Due to Nagle's algorithm, the message from 2 is not sent to JVM since there is no ACK for the message from 1. (There is in-flight unacknowledged message.)

This deadlock situation is resolved after the timeout of delayed ACK, which is 40ms (minimum duration) in Linux. After the timeout, ACK is sent back from JVM to Python worker, hence Nagle's algorithm allows the message from 2 to be finally sent to JVM.

The direction can be flipped depending on the command - the same thing can happen on the opposite direction of communication, JVM to Python worker.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested (via adding debug log to measure the time spent from the state interaction).

Beyond that, this should pass the existing tests, which will be verified by CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50460 from HeartSaVioR/SPARK-51667.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit a760df7)
Signed-off-by: Jungtaek Lim <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants