Skip to content

Commit 060b1b7

Browse files
authored
[Dataflow Streaming] Remove one wait for GetData (#36417)
Harness threads will no longer wait for sending threads directly. They'll wait on the responseStream and will observe failures when the responseStream is cancelled. Reduces context switches and cpu usage under GetData
1 parent 6d5b984 commit 060b1b7

File tree

2 files changed

+6
-9
lines changed

2 files changed

+6
-9
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -502,11 +502,9 @@ private void queueRequestAndWait(QueuedRequest request)
502502
prevBatch.waitForSendOrFailNotification();
503503
}
504504
trySendBatch(batch);
505-
// Since the above send may not succeed, we fall through to block on sending or failure.
505+
// If the send fails, request.responseStream will be cancelled and
506+
// reading responseStream will throw.
506507
}
507-
508-
// Wait for this batch to be sent before parsing the response.
509-
batch.waitForSendOrFailNotification();
510508
}
511509

512510
private synchronized void trySendBatch(QueuedBatch batch) throws WindmillStreamShutdownException {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,12 @@ void notifySent() {
190190
sent.countDown();
191191
}
192192

193-
/**
194-
* Let waiting for threads know that a failure occurred.
195-
*
196-
* @implNote Thread safe.
197-
*/
193+
/** Let waiting for threads know that a failure occurred. */
198194
void notifyFailed() {
199195
failed = true;
196+
for (QueuedRequest request : requests) {
197+
request.getResponseStream().cancel();
198+
}
200199
sent.countDown();
201200
}
202201

0 commit comments

Comments
 (0)