From f131fae164a44140e0fa2823f36ce5b771c3f94a Mon Sep 17 00:00:00 2001 From: Jesse Wilson Date: Sun, 17 Dec 2023 09:39:59 -0500 Subject: [PATCH] Change SSE to use the OkHttp public API only (#8141) * Change SSE to use the OkHttp public API only Previously we prevented end-users from using their own implementations of Call.Factory because we casted down to RealCall in RealEventSource. With this change we're implementing SSE without depending on any OkHttp implementation details. This also introduces a new function in EventSources to create an EventSource.Factory from a Call.Factory, and hides the previous implementation that required a concrete OkHttpClient. Finally this fixes SSE to publish the same EventListener events as regular HTTP calls. * apiDump --- okhttp-sse/api/okhttp-sse.api | 3 +- .../main/kotlin/okhttp3/sse/EventSources.kt | 12 +++++- .../okhttp3/sse/internal/RealEventSource.kt | 18 +++----- .../sse/internal/EventSourceHttpTest.java | 43 ++++++++++++++++++- 4 files changed, 59 insertions(+), 17 deletions(-) diff --git a/okhttp-sse/api/okhttp-sse.api b/okhttp-sse/api/okhttp-sse.api index e9265a686154..d1baa0735188 100644 --- a/okhttp-sse/api/okhttp-sse.api +++ b/okhttp-sse/api/okhttp-sse.api @@ -17,7 +17,8 @@ public abstract class okhttp3/sse/EventSourceListener { public final class okhttp3/sse/EventSources { public static final field INSTANCE Lokhttp3/sse/EventSources; - public static final fun createFactory (Lokhttp3/OkHttpClient;)Lokhttp3/sse/EventSource$Factory; + public static final fun createFactory (Lokhttp3/Call$Factory;)Lokhttp3/sse/EventSource$Factory; + public static final synthetic fun createFactory (Lokhttp3/OkHttpClient;)Lokhttp3/sse/EventSource$Factory; public static final fun processResponse (Lokhttp3/Response;Lokhttp3/sse/EventSourceListener;)V } diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSources.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSources.kt index cb6a3c1e749d..f3dc002f39d7 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSources.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSources.kt @@ -15,13 +15,21 @@ */ package okhttp3.sse +import okhttp3.Call import okhttp3.OkHttpClient import okhttp3.Response import okhttp3.sse.internal.RealEventSource object EventSources { + @Deprecated( + message = "required for binary-compatibility!", + level = DeprecationLevel.HIDDEN, + ) @JvmStatic - fun createFactory(client: OkHttpClient): EventSource.Factory { + fun createFactory(client: OkHttpClient) = createFactory(client as Call.Factory) + + @JvmStatic + fun createFactory(callFactory: Call.Factory): EventSource.Factory { return EventSource.Factory { request, listener -> val actualRequest = if (request.header("Accept") == null) { @@ -31,7 +39,7 @@ object EventSources { } RealEventSource(actualRequest, listener).apply { - connect(client) + connect(callFactory) } } } diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt index 1346b28097b2..1cd3d135d479 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt @@ -18,12 +18,9 @@ package okhttp3.sse.internal import java.io.IOException import okhttp3.Call import okhttp3.Callback -import okhttp3.EventListener -import okhttp3.OkHttpClient import okhttp3.Request import okhttp3.Response import okhttp3.ResponseBody -import okhttp3.internal.connection.RealCall import okhttp3.internal.stripBody import okhttp3.sse.EventSource import okhttp3.sse.EventSourceListener @@ -32,16 +29,13 @@ internal class RealEventSource( private val request: Request, private val listener: EventSourceListener ) : EventSource, ServerSentEventReader.Callback, Callback { - private var call: RealCall? = null + private var call: Call? = null @Volatile private var canceled = false - fun connect(client: OkHttpClient) { - val client = client.newBuilder() - .eventListener(EventListener.NONE) - .build() - val realCall = client.newCall(request) as RealCall - call = realCall - realCall.enqueue(this) + fun connect(callFactory: Call.Factory) { + call = callFactory.newCall(request).apply { + enqueue(this@RealEventSource) + } } override fun onResponse(call: Call, response: Response) { @@ -64,7 +58,7 @@ internal class RealEventSource( } // This is a long-lived response. Cancel full-call timeouts. - call?.timeoutEarlyExit() + call?.timeout()?.cancel() // Replace the body with a stripped one so the callbacks can't see real data. val response = response.stripBody() diff --git a/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceHttpTest.java b/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceHttpTest.java index 19229feadbb9..418a595dfeae 100644 --- a/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceHttpTest.java +++ b/okhttp-sse/src/test/java/okhttp3/sse/internal/EventSourceHttpTest.java @@ -23,6 +23,7 @@ import mockwebserver3.junit5.internal.MockWebServerExtension; import okhttp3.OkHttpClient; import okhttp3.OkHttpClientTestRule; +import okhttp3.RecordingEventListener; import okhttp3.Request; import okhttp3.sse.EventSource; import okhttp3.sse.EventSources; @@ -34,7 +35,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import org.junitpioneer.jupiter.RetryingTest; - import static org.assertj.core.api.Assertions.assertThat; @Tag("Slowish") @@ -45,8 +45,12 @@ public final class EventSourceHttpTest { private MockWebServer server; @RegisterExtension public final OkHttpClientTestRule clientTestRule = new OkHttpClientTestRule(); + private final RecordingEventListener eventListener = new RecordingEventListener(); + private final EventSourceRecorder listener = new EventSourceRecorder(); - private OkHttpClient client = clientTestRule.newClient(); + private OkHttpClient client = clientTestRule.newClientBuilder() + .eventListenerFactory(clientTestRule.wrap(eventListener)) + .build(); @BeforeEach public void before(MockWebServer server) { this.server = server; @@ -177,6 +181,41 @@ public void cancelInEventShortCircuits() throws IOException { assertThat(server.takeRequest().getHeaders().get("Accept")).isEqualTo("text/event-stream"); } + @Test public void eventListenerEvents() { + server.enqueue(new MockResponse.Builder() + .body("" + + "data: hey\n" + + "\n").setHeader("content-type", "text/event-stream") + .build()); + + EventSource source = newEventSource(); + + assertThat(source.request().url().encodedPath()).isEqualTo("/"); + + listener.assertOpen(); + listener.assertEvent(null, null, "hey"); + listener.assertClose(); + + assertThat(eventListener.recordedEventTypes()).containsExactly( + "CallStart", + "ProxySelectStart", + "ProxySelectEnd", + "DnsStart", + "DnsEnd", + "ConnectStart", + "ConnectEnd", + "ConnectionAcquired", + "RequestHeadersStart", + "RequestHeadersEnd", + "ResponseHeadersStart", + "ResponseHeadersEnd", + "ResponseBodyStart", + "ResponseBodyEnd", + "ConnectionReleased", + "CallEnd" + ); + } + private EventSource newEventSource() { return newEventSource(null); }