Skip to content

Commit a760df7

Browse files
committed
[SPARK-51667][SS][PYTHON] Disable Nagle's algorithm (via TCP_NODELAY = true) in TWS + PySpark for python <-> state server
### What changes were proposed in this pull request? This PR proposes to disable Nagle's algorithm (TCP_NODELAY = true) for the connection between Python worker and state server, in TWS + PySpark. ### Why are the changes needed? We have observed the consistent latency increment, which is almost slightly more than 40ms, from specific state interactions. e.g. ListState.put() / ListState.get() / ListState.appendList(). The root cause is figured out as the bad combination of Nagle's algorithm and delayed ACK. The sequence is following: 1. Python worker sends the proto message to JVM, and flushes the socket. 2. Additionally, Python worker sends the follow-up data to JVM, and flushes the socket. 3. JVM reads the proto message, and realizes there is follow-up data. 4. JVM reads the follow-up data. 5. JVM processes the request, and sends the response back to Python worker. Due to delayed ACK, even after 3, ACK is not sent back from JVM to Python worker. It is waiting for some data or multiple ACKs to be sent, but JVM is not going to send the data during that phase. Due to Nagle's algorithm, the message from 2 is not sent to JVM since there is no ACK for the message from 1. (There is in-flight unacknowledged message.) This deadlock situation is resolved after the timeout of delayed ACK, which is 40ms (minimum duration) in Linux. After the timeout, ACK is sent back from JVM to Python worker, hence Nagle's algorithm allows the message from 2 to be finally sent to JVM. The direction can be flipped depending on the command - the same thing can happen on the opposite direction of communication, JVM to Python worker. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually tested (via adding debug log to measure the time spent from the state interaction). Beyond that, this should pass the existing tests, which will be verified by CI. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50460 from HeartSaVioR/SPARK-51667. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 02db872 commit a760df7

2 files changed

Lines changed: 24 additions & 0 deletions

File tree

python/pyspark/sql/streaming/stateful_processor_api_client.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ def __init__(
5454
self.key_schema = key_schema
5555
self._client_socket = socket.socket()
5656
self._client_socket.connect(("localhost", state_server_port))
57+
58+
# SPARK-51667: We have a pattern of sending messages continuously from one side
59+
# (Python -> JVM, and vice versa) before getting response from other side. Since most
60+
# messages we are sending are small, this triggers the bad combination of Nagle's algorithm
61+
# and delayed ACKs, which can cause a significant delay on the latency.
62+
# See SPARK-51667 for more details on how this can be a problem.
63+
#
64+
# Disabling either would work, but it's more common to disable Nagle's algorithm; there is
65+
# lot less reference to disabling delayed ACKs, while there are lots of resources to
66+
# disable Nagle's algorithm.
67+
self._client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
68+
5769
self.sockfile = self._client_socket.makefile(
5870
"rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
5971
)

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPandasStateServer.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,18 @@ class TransformWithStateInPandasStateServer(
138138

139139
def run(): Unit = {
140140
val listeningSocket = stateServerSocket.accept()
141+
142+
// SPARK-51667: We have a pattern of sending messages continuously from one side
143+
// (Python -> JVM, and vice versa) before getting response from other side. Since most
144+
// messages we are sending are small, this triggers the bad combination of Nagle's algorithm
145+
// and delayed ACKs, which can cause a significant delay on the latency.
146+
// See SPARK-51667 for more details on how this can be a problem.
147+
//
148+
// Disabling either would work, but it's more common to disable Nagle's algorithm; there is
149+
// lot less reference to disabling delayed ACKs, while there are lots of resources to
150+
// disable Nagle's algorithm.
151+
listeningSocket.setTcpNoDelay(true)
152+
141153
inputStream = new DataInputStream(
142154
new BufferedInputStream(listeningSocket.getInputStream))
143155
outputStream = new DataOutputStream(

0 commit comments

Comments
 (0)