From 9a66ceb772689c5701386ea218c377985fa4b02b Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Thu, 15 Feb 2024 12:18:13 +0100 Subject: [PATCH 1/8] Support cancellation during client connection establishment With #714 we added support for cancelling in-flight HTTP requests by putting the response deferred into an error state. However, this only worked once the underlying TCP connection was established. With this patch, it is now possible to cancel requests even while the connection is still being established (possible since Netty 4.1.108.Final via https://github.com/netty/netty/pull/13849). This also works for `aleph.tcp/client`. --- src/aleph/http.clj | 34 +++--- src/aleph/http/client.clj | 226 ++++++++++++++++++++------------------ src/aleph/netty.clj | 73 +++++++----- src/aleph/tcp.clj | 26 +++-- test/aleph/http_test.clj | 30 ++++- test/aleph/tcp_test.clj | 26 ++++- test/aleph/testutils.clj | 30 ++++- 7 files changed, 283 insertions(+), 162 deletions(-) diff --git a/src/aleph/http.clj b/src/aleph/http.clj index 06b70587..7a471844 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -100,20 +100,23 @@ will be errors, and a new connection must be created." [^URI uri options middleware on-closed] (let [scheme (.getScheme uri) - ssl? (= "https" scheme)] - (-> (client/http-connection - (InetSocketAddress/createUnresolved - (.getHost uri) - (int - (or - (when (pos? (.getPort uri)) (.getPort uri)) - (if ssl? 443 80)))) - ssl? - (if on-closed - (assoc options :on-closed on-closed) - options)) - - (d/chain' middleware)))) + ssl? (= "https" scheme) + conn (client/http-connection + (InetSocketAddress/createUnresolved + (.getHost uri) + (int + (or + (when (pos? (.getPort uri)) (.getPort uri)) + (if ssl? 443 80)))) + ssl? + (if on-closed + (assoc options :on-closed on-closed) + options))] + (doto (d/chain' conn middleware) + (d/catch' (fn [e] + (log/trace e "Terminating creation of HTTP connection") + (d/error! 
conn e) + (d/error-deferred e)))))) (def ^:private connection-stats-callbacks (atom #{})) @@ -389,6 +392,9 @@ ;; function. (reset! dispose-conn! (fn [] (flow/dispose pool k conn))) + ;; allow cancellation during connection establishment + (d/connect result (first conn)) + (if (realized? result) ;; to account for race condition between setting `dispose-conn!` ;; and putting `result` into error state for cancellation diff --git a/src/aleph/http/client.clj b/src/aleph/http/client.clj index 9b4e252a..2e08f641 100644 --- a/src/aleph/http/client.clj +++ b/src/aleph/http/client.clj @@ -821,112 +821,126 @@ :local-address local-address :transport (netty/determine-transport transport epoll?) :name-resolver name-resolver - :connect-timeout connect-timeout})] - - (attach-on-close-handler ch-d on-closed) - - (d/chain' ch-d - (fn setup-client - [^Channel ch] - (log/debug "Channel:" ch) - - ;; We know the SSL handshake must be complete because create-client wraps the - ;; future with maybe-ssl-handshake-future, so we can get the negotiated - ;; protocol, falling back to HTTP/1.1 by default. - (let [pipeline (.pipeline ch) - protocol (cond - ssl? - (or (-> pipeline - ^SslHandler (.get ^Class SslHandler) - (.applicationProtocol)) - ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed - - force-h2c? - (do - (log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.") - ApplicationProtocolNames/HTTP_2) - - :else - ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested - setup-opts (assoc opts - :authority authority - :ch ch - :server? false - :keep-alive? keep-alive? - :keep-alive?' keep-alive?' - :logger logger - :non-tun-proxy? non-tun-proxy? - :pipeline pipeline - :pipeline-transform pipeline-transform - :raw-stream? raw-stream? - :remote-address remote-address - :response-buffer-size response-buffer-size - :ssl-context ssl-context - :ssl? 
ssl?)] - - (log/debug (str "Using HTTP protocol: " protocol) - {:authority authority - :ssl? ssl? - :force-h2c? force-h2c?}) - - ;; can't use ApnHandler, because we need to coordinate with Manifold code - (let [http-req-handler - (cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol) - (setup-http1-client setup-opts) - - (.equals ApplicationProtocolNames/HTTP_2 protocol) - (do - (http2/setup-conn-pipeline setup-opts) - (http2-req-handler setup-opts)) - - :else - (do - (let [msg (str "Unknown protocol: " protocol) - e (IllegalStateException. msg)] - (log/error e msg) - (netty/close ch) - (throw e))))] - - ;; Both Netty and Aleph are set up, unpause the pipeline - (when (.get pipeline "pause-handler") - (log/debug "Unpausing pipeline") - (.remove pipeline "pause-handler")) - - (fn http-req-fn - [req] - (log/trace "http-req-fn fired") - (log/debug "client request:" (pr-str req)) - - ;; If :aleph/close is set in the req, closes the channel and - ;; returns a deferred containing the result. - (if (or (contains? req :aleph/close) - (contains? req ::close)) - (-> ch (netty/close) (netty/wrap-future)) - - (let [t0 (System/nanoTime) - ;; I suspect the below is an error for http1 - ;; since the shared handler might not match. - ;; Should work for HTTP2, though - raw-stream? (get req :raw-stream? raw-stream?)] - - (if (or (not (.isActive ch)) - (not (.isOpen ch))) - - (d/error-deferred - (ex-info "Channel is inactive/closed." - {:req req - :ch ch - :open? (.isOpen ch) - :active? (.isActive ch)})) - - (-> (http-req-handler req) - (d/chain' (rsp-handler - {:ch ch - :keep-alive? keep-alive? ; why not keep-alive?' - :raw-stream? raw-stream? - :req req - :response-buffer-size response-buffer-size - :t0 t0}))))))))))))) + :connect-timeout connect-timeout}) + + _ (attach-on-close-handler ch-d on-closed) + + close-ch! (atom (fn [])) + result (d/deferred) + + conn (d/chain' ch-d + (fn setup-client + [^Channel ch] + (log/debug "Channel:" ch) + (reset! close-ch! 
(fn [] @(-> (netty/close ch) (netty/wrap-future)))) + (if (realized? result) + ;; Account for race condition between setting `close-ch!` and putting + ;; `result` into error state for cancellation + (@close-ch!) + ;; We know the SSL handshake must be complete because create-client wraps the + ;; future with maybe-ssl-handshake-future, so we can get the negotiated + ;; protocol, falling back to HTTP/1.1 by default. + (let [pipeline (.pipeline ch) + protocol (cond + ssl? + (or (-> pipeline + ^SslHandler (.get ^Class SslHandler) + (.applicationProtocol)) + ApplicationProtocolNames/HTTP_1_1) ; Not using ALPN, HTTP/2 isn't allowed + + force-h2c? + (do + (log/info "Forcing HTTP/2 over cleartext. Be sure to do this only with servers you control.") + ApplicationProtocolNames/HTTP_2) + + :else + ApplicationProtocolNames/HTTP_1_1) ; Not using SSL, HTTP/2 isn't allowed unless h2c requested + setup-opts (assoc opts + :authority authority + :ch ch + :server? false + :keep-alive? keep-alive? + :keep-alive?' keep-alive?' + :logger logger + :non-tun-proxy? non-tun-proxy? + :pipeline pipeline + :pipeline-transform pipeline-transform + :raw-stream? raw-stream? + :remote-address remote-address + :response-buffer-size response-buffer-size + :ssl-context ssl-context + :ssl? ssl?)] + + (log/debug (str "Using HTTP protocol: " protocol) + {:authority authority + :ssl? ssl? + :force-h2c? force-h2c?}) + + ;; can't use ApnHandler, because we need to coordinate with Manifold code + (let [http-req-handler + (cond (.equals ApplicationProtocolNames/HTTP_1_1 protocol) + (setup-http1-client setup-opts) + + (.equals ApplicationProtocolNames/HTTP_2 protocol) + (do + (http2/setup-conn-pipeline setup-opts) + (http2-req-handler setup-opts)) + + :else + (do + (let [msg (str "Unknown protocol: " protocol) + e (IllegalStateException. 
msg)] + (log/error e msg) + (netty/close ch) + (throw e))))] + + ;; Both Netty and Aleph are set up, unpause the pipeline + (when (.get pipeline "pause-handler") + (log/debug "Unpausing pipeline") + (.remove pipeline "pause-handler")) + + (fn http-req-fn + [req] + (log/trace "http-req-fn fired") + (log/debug "client request:" (pr-str req)) + + ;; If :aleph/close is set in the req, closes the channel and + ;; returns a deferred containing the result. + (if (or (contains? req :aleph/close) + (contains? req ::close)) + (-> ch (netty/close) (netty/wrap-future)) + + (let [t0 (System/nanoTime) + ;; I suspect the below is an error for http1 + ;; since the shared handler might not match. + ;; Should work for HTTP2, though + raw-stream? (get req :raw-stream? raw-stream?)] + + (if (or (not (.isActive ch)) + (not (.isOpen ch))) + + (d/error-deferred + (ex-info "Channel is inactive/closed." + {:req req + :ch ch + :open? (.isOpen ch) + :active? (.isActive ch)})) + + (-> (http-req-handler req) + (d/chain' (rsp-handler + {:ch ch + :keep-alive? keep-alive? ; why not keep-alive?' + :raw-stream? raw-stream? + :req req + :response-buffer-size response-buffer-size + :t0 t0}))))))))))))] + (d/connect conn result) + (d/catch' result (fn [e] + (log/trace e "Closing HTTP connection channel") + (d/error! ch-d e) + (@close-ch!) + (d/error-deferred e))) + result)) diff --git a/src/aleph/netty.clj b/src/aleph/netty.clj index 73fc1b6e..cdf203f7 100644 --- a/src/aleph/netty.clj +++ b/src/aleph/netty.clj @@ -1521,6 +1521,14 @@ (ssl-handler ch ssl-ctx)))) (pipeline-builder p)))) +(defn- connect-client + ^ChannelFuture [^Bootstrap bootstrap + ^SocketAddress remote-address + ^SocketAddress local-address] + (if local-address + (.connect bootstrap remote-address local-address) + (.connect bootstrap remote-address))) + (defn ^:no-doc create-client-chan "Returns a deferred containing a new Channel. @@ -1529,8 +1537,8 @@ complete." 
[{:keys [pipeline-builder bootstrap-transform - ^SocketAddress remote-address - ^SocketAddress local-address + remote-address + local-address transport name-resolver connect-timeout] @@ -1543,32 +1551,39 @@ (throw (IllegalArgumentException. "Can't use :ssl-context anymore."))) (let [^Class chan-class (transport-channel-class transport) - initializer (pipeline-initializer pipeline-builder)] - (try - (let [client-event-loop-group @(transport-client-group transport) - resolver' (when (some? name-resolver) - (cond - (= :default name-resolver) nil - (= :noop name-resolver) NoopAddressResolverGroup/INSTANCE - (instance? AddressResolverGroup name-resolver) name-resolver)) - bootstrap (doto (Bootstrap.) - (.option ChannelOption/SO_REUSEADDR true) - (.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout)) - #_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5 - (.group client-event-loop-group) - (.channel chan-class) - (.handler initializer) - (.resolver resolver') - bootstrap-transform) - - fut (if local-address - (.connect bootstrap remote-address local-address) - (.connect bootstrap remote-address))] - - (d/chain' (wrap-future fut) - (fn [_] - (let [ch (.channel ^ChannelFuture fut)] - (maybe-ssl-handshake-future ch)))))))) + initializer (pipeline-initializer pipeline-builder) + client-event-loop-group @(transport-client-group transport) + resolver' (when (some? name-resolver) + (cond + (= :default name-resolver) nil + (= :noop name-resolver) NoopAddressResolverGroup/INSTANCE + (instance? AddressResolverGroup name-resolver) name-resolver)) + bootstrap (doto (Bootstrap.) 
+ (.option ChannelOption/SO_REUSEADDR true) + (.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout)) + #_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5 + (.group client-event-loop-group) + (.channel chan-class) + (.handler initializer) + (.resolver resolver') + bootstrap-transform) + + fut (connect-client bootstrap remote-address local-address)] + (doto (-> (wrap-future fut) + (d/chain' + (fn [_] + (let [ch (.channel ^ChannelFuture fut)] + (maybe-ssl-handshake-future ch))))) + (d/catch' (fn [e] + (when-not (.isDone fut) + (log/trace e "Cancelling Bootstrap#connect future") + (when-not (.cancel fut true) + (when-not (.isDone fut) + (log/warn "Transport" transport "does not support cancellation of connection attempts." + "Instead, you have to wait for the connect timeout to expire for it to be terminated." + "Its current value is" connect-timeout "ms." + "It can be set via the `connect-timeout` option.")))) + (d/error-deferred e)))))) (defn ^:no-doc ^:deprecated create-client @@ -1732,7 +1747,7 @@ (fn [shutdown-output] (when (= shutdown-output ::timeout) (log/error - (format "Timeout while waiting for requests to close (exceeded: %ss)" + (format "Timeout while waiting for connections to close (exceeded: %ss)" shutdown-timeout))))) (d/finally' ;; 3. At this stage, stop the EventLoopGroup, this will cancel any diff --git a/src/aleph/tcp.clj b/src/aleph/tcp.clj index 1d66cb4a..8192a901 100644 --- a/src/aleph/tcp.clj +++ b/src/aleph/tcp.clj @@ -155,6 +155,11 @@ "Given a host and port, returns a deferred which yields a duplex stream that can be used to communicate with the server. + Closing the stream will also close the underlying connection. + + Putting the returned deferred into an error state before it yielded the stream will cancel an + in-flight connection attempt. + Param key | Description | --- | --- | `host` | the hostname of the server. 
@@ -204,13 +209,16 @@ (netty/ssl-handler (.channel pipeline) ssl-context remote-address ssl-endpoint-id-alg))) (.addLast pipeline "handler" handler) (when pipeline-transform - (pipeline-transform pipeline)))] - (-> (netty/create-client-chan - {:pipeline-builder pipeline-builder - :bootstrap-transform bootstrap-transform - :remote-address remote-address - :local-address local-address - :transport (netty/determine-transport transport epoll?) - :connect-timeout connect-timeout}) - (d/catch' #(d/error! s %))) + (pipeline-transform pipeline))) + ch-d (netty/create-client-chan + {:pipeline-builder pipeline-builder + :bootstrap-transform bootstrap-transform + :remote-address remote-address + :local-address local-address + :transport (netty/determine-transport transport epoll?) + :connect-timeout connect-timeout})] + (d/catch' ch-d #(d/error! s %)) + (d/catch' s (fn [e] + (d/error! ch-d e) + (d/error-deferred e))) s)) diff --git a/test/aleph/http_test.clj b/test/aleph/http_test.clj index b6ed9f7e..8c412552 100644 --- a/test/aleph/http_test.clj +++ b/test/aleph/http_test.clj @@ -7,7 +7,7 @@ [aleph.resource-leak-detector] [aleph.ssl :as test-ssl] [aleph.tcp :as tcp] - [aleph.testutils :refer [str=]] + [aleph.testutils :refer [passive-tcp-server str=]] [clj-commons.byte-streams :as bs] [clojure.java.io :as io] [clojure.string :as str] @@ -1451,6 +1451,34 @@ (is (instance? IllegalArgumentException result)) (is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result)))))) +(deftest test-request-cancellation-during-connection-acquisition + (let [starved-pool (http/connection-pool + {:total-connections 0})] + (try + (let [rsp (http-get "/" {:pool starved-pool + :pool-timeout 500})] + (http/cancel-request! rsp) + (is (thrown? 
RequestCancellationException (deref rsp 0 :timeout)))) + (finally + (.shutdown ^Pool starved-pool))))) + +(deftest test-request-cancellation-during-connection-establishment + (let [connect-client @#'aleph.netty/connect-client + connect-future (promise)] + (with-redefs [aleph.netty/connect-client (fn [& args] + (let [fut (apply connect-client args)] + (deliver connect-future fut) + fut))] + (with-server (passive-tcp-server port) + (let [rsp (http-get "/")] + (is (some? (deref connect-future 1000 nil))) + (http/cancel-request! rsp) + (is (thrown? RequestCancellationException (deref rsp 1000 :timeout))) + (some-> @connect-future (.await 2000 TimeUnit/MILLISECONDS)) + (is (some-> @connect-future .isSuccess false?)) + (is (some-> @connect-future .isDone)) + (is (some-> @connect-future .isCancelled))))))) + (deftest test-in-flight-request-cancellation (let [conn-established (promise) conn-closed (promise)] diff --git a/test/aleph/tcp_test.clj b/test/aleph/tcp_test.clj index 11e26881..875c3470 100644 --- a/test/aleph/tcp_test.clj +++ b/test/aleph/tcp_test.clj @@ -3,9 +3,13 @@ [aleph.netty :as netty] [aleph.resource-leak-detector] [aleph.tcp :as tcp] + [aleph.testutils :refer [passive-tcp-server]] [clj-commons.byte-streams :as bs] - [clojure.test :refer [deftest testing is]] - [manifold.stream :as s])) + [clojure.test :refer [deftest is testing]] + [manifold.deferred :as d] + [manifold.stream :as s]) + (:import + (java.util.concurrent TimeUnit))) (defn echo-handler [s _] (s/connect s s)) @@ -55,4 +59,22 @@ (catch Exception _ (is (not (netty/io-uring-available?))))))) +(deftest test-cancellation-during-connection-establishment + (let [connect-client @#'aleph.netty/connect-client + connect-future (promise) + server (passive-tcp-server 0)] + (with-redefs [aleph.netty/connect-client (fn [& args] + (let [fut (apply connect-client args)] + (deliver connect-future fut) + fut))] + (with-server server + (let [c (tcp/client {:host "localhost" + :port (netty/port server)})] + (is 
(some? (deref connect-future 1000 nil))) + (d/timeout! c 10) + (some-> @connect-future (.await 2000 TimeUnit/MILLISECONDS)) + (is (some-> @connect-future .isSuccess false?)) + (is (some-> @connect-future .isDone)) + (is (some-> @connect-future .isCancelled))))))) + (aleph.resource-leak-detector/instrument-tests!) diff --git a/test/aleph/testutils.clj b/test/aleph/testutils.clj index f225c2c0..c0059c8c 100644 --- a/test/aleph/testutils.clj +++ b/test/aleph/testutils.clj @@ -1,8 +1,36 @@ (ns aleph.testutils - (:import (io.netty.util AsciiString))) + (:require + [aleph.netty :as netty]) + (:import + (io.netty.util AsciiString) + (java.io Closeable) + (java.net ServerSocket Socket))) (defn str= "AsciiString-aware equals" [^CharSequence x ^CharSequence y] (AsciiString/contentEquals x y)) +(defn passive-tcp-server + "Starts a TCP server which never accepts a connection." + [port] + (let [;; A backlog of 0 would be ideal for this purpose but: "The value provided should be greater + ;; than 0. If it is less than or equal to 0, then an implementation specific default will be + ;; used." Source: + ;; https://docs.oracle.com/en%2Fjava%2Fjavase%2F21%2Fdocs%2Fapi%2F%2F/java.base/java/net/ServerSocket.html#%3Cinit%3E(int,int) + backlog 1 + server (ServerSocket. port backlog) + port (.getLocalPort server) + ;; Fill up the backlog with pending connection attempts. For some reason, the backlog length + ;; is off by one, thus the `inc`. + pending-connects (doall (repeatedly (inc backlog) #(Socket. "localhost" (int port))))] + (reify + netty/AlephServer + (port [_] + port) + (wait-for-close [_] + true) + Closeable + (close [_] + (run! #(.close %) pending-connects) + (.close server))))) From 8e1edd93e2ca0dffdbc2ace7a65744b8945f4c58 Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Thu, 11 Apr 2024 16:29:24 +0200 Subject: [PATCH 2/8] fixup! 
Support cancellation during client connection establishment --- src/aleph/http.clj | 16 ++++++++++------ src/aleph/http/client.clj | 12 ++++++------ src/aleph/netty.clj | 30 +++++++++++++++--------------- src/aleph/tcp.clj | 7 ++----- 4 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/aleph/http.clj b/src/aleph/http.clj index 7a471844..47bfd671 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -112,11 +112,11 @@ (if on-closed (assoc options :on-closed on-closed) options))] - (doto (d/chain' conn middleware) - (d/catch' (fn [e] - (log/trace e "Terminating creation of HTTP connection") - (d/error! conn e) - (d/error-deferred e)))))) + (-> (d/chain' conn middleware) + (d/on-realized identity + (fn [e] + (log/trace e "Terminating creation of HTTP connection") + (d/error! conn e)))))) (def ^:private connection-stats-callbacks (atom #{})) @@ -393,7 +393,11 @@ (reset! dispose-conn! (fn [] (flow/dispose pool k conn))) ;; allow cancellation during connection establishment - (d/connect result (first conn)) + (d/on-realized result + identity + (fn [e] + (log/trace e "Aborting connection acquisition") + (d/error! (first conn) e))) (if (realized? result) ;; to account for race condition between setting `dispose-conn!` diff --git a/src/aleph/http/client.clj b/src/aleph/http/client.clj index 2e08f641..af9dc674 100644 --- a/src/aleph/http/client.clj +++ b/src/aleph/http/client.clj @@ -935,12 +935,12 @@ :response-buffer-size response-buffer-size :t0 t0}))))))))))))] (d/connect conn result) - (d/catch' result (fn [e] - (log/trace e "Closing HTTP connection channel") - (d/error! ch-d e) - (@close-ch!) - (d/error-deferred e))) - result)) + (d/on-realized result + identity + (fn [e] + (log/trace e "Closing HTTP connection channel") + (d/error! 
ch-d e) + (@close-ch!))))) diff --git a/src/aleph/netty.clj b/src/aleph/netty.clj index cdf203f7..54a0dc7c 100644 --- a/src/aleph/netty.clj +++ b/src/aleph/netty.clj @@ -1569,21 +1569,21 @@ bootstrap-transform) fut (connect-client bootstrap remote-address local-address)] - (doto (-> (wrap-future fut) - (d/chain' - (fn [_] - (let [ch (.channel ^ChannelFuture fut)] - (maybe-ssl-handshake-future ch))))) - (d/catch' (fn [e] - (when-not (.isDone fut) - (log/trace e "Cancelling Bootstrap#connect future") - (when-not (.cancel fut true) - (when-not (.isDone fut) - (log/warn "Transport" transport "does not support cancellation of connection attempts." - "Instead, you have to wait for the connect timeout to expire for it to be terminated." - "Its current value is" connect-timeout "ms." - "It can be set via the `connect-timeout` option.")))) - (d/error-deferred e)))))) + (-> (wrap-future fut) + (d/chain' + (fn [_] + (let [ch (.channel ^ChannelFuture fut)] + (maybe-ssl-handshake-future ch)))) + (d/on-realized identity + (fn [e] + (when-not (.isDone fut) + (log/trace e "Cancelling Bootstrap#connect future") + (when-not (.cancel fut true) + (when-not (.isDone fut) + (log/warn "Transport" transport "does not support cancellation of connection attempts." + "Instead, you have to wait for the connect timeout to expire for it to be terminated." + "Its current value is" connect-timeout "ms." + "It can be set via the `connect-timeout` option."))))))))) (defn ^:no-doc ^:deprecated create-client diff --git a/src/aleph/tcp.clj b/src/aleph/tcp.clj index 8192a901..3d293c0e 100644 --- a/src/aleph/tcp.clj +++ b/src/aleph/tcp.clj @@ -217,8 +217,5 @@ :local-address local-address :transport (netty/determine-transport transport epoll?) :connect-timeout connect-timeout})] - (d/catch' ch-d #(d/error! s %)) - (d/catch' s (fn [e] - (d/error! ch-d e) - (d/error-deferred e))) - s)) + (d/on-realized ch-d identity #(d/error! s %)) + (d/on-realized s identity #(d/error! 
ch-d %)))) From ad5886118754e8a8305e321589358363f0fe16e7 Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Thu, 18 Apr 2024 16:58:24 +0200 Subject: [PATCH 3/8] fixup! fixup! Support cancellation during client connection establishment --- src/aleph/http/client.clj | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/aleph/http/client.clj b/src/aleph/http/client.clj index af9dc674..34bcb49d 100644 --- a/src/aleph/http/client.clj +++ b/src/aleph/http/client.clj @@ -814,16 +814,15 @@ :logger logger :pipeline-transform pipeline-transform)) - ch-d (netty/create-client-chan - {:pipeline-builder pipeline-builder - :bootstrap-transform bootstrap-transform - :remote-address remote-address - :local-address local-address - :transport (netty/determine-transport transport epoll?) - :name-resolver name-resolver - :connect-timeout connect-timeout}) - - _ (attach-on-close-handler ch-d on-closed) + ch-d (doto (netty/create-client-chan + {:pipeline-builder pipeline-builder + :bootstrap-transform bootstrap-transform + :remote-address remote-address + :local-address local-address + :transport (netty/determine-transport transport epoll?) + :name-resolver name-resolver + :connect-timeout connect-timeout}) + (attach-on-close-handler on-closed)) close-ch! (atom (fn [])) result (d/deferred) From 568847c841bff74cf1f0d1ce25a0d531bfef552f Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Thu, 18 Apr 2024 17:00:40 +0200 Subject: [PATCH 4/8] fixup! fixup! fixup! Support cancellation during client connection establishment --- src/aleph/http.clj | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/aleph/http.clj b/src/aleph/http.clj index 47bfd671..d35edd00 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -466,11 +466,11 @@ (middleware/handle-redirects request req)))))))))))) req))] (d/connect response result) - (d/catch' result - (fn [e] - (log/trace e "Request failed. 
Disposing of connection...") - (@dispose-conn!) - (d/error-deferred e))) + (d/on-realized result + identity + (fn [e] + (log/trace e "Request failed. Disposing of connection...") + (@dispose-conn!))) result))) (defn cancel-request! From d14b8469d35bb00da37c04d7a7e551016ed3cd39 Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Fri, 19 Apr 2024 15:54:56 +0200 Subject: [PATCH 5/8] Introduce utils for making error propagation more self-describing The new namespace `aleph.util` introduces two new helpers: * `on-error` which is like `d/on-realized` but only for the error case (success case uses `identity`). * `propagate-error` which propagates error states from a source deferred to a destination deferred and optionally accepts a callback which is only run when the propagation has indeed occurred. These are now used in all places where we propagate error states back to upstream deferreds for the purpose of cancellation which hopefully makes it a bit more obvious what's going on. --- src/aleph/http.clj | 20 ++++++++--------- src/aleph/http/client.clj | 12 +++++------ src/aleph/netty.clj | 4 ++-- src/aleph/tcp.clj | 7 ++++-- src/aleph/util.clj | 20 +++++++++++++++++ test/aleph/util_test.clj | 45 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 87 insertions(+), 21 deletions(-) create mode 100644 src/aleph/util.clj create mode 100644 test/aleph/util_test.clj diff --git a/src/aleph/http.clj b/src/aleph/http.clj index d35edd00..26f95acc 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -11,6 +11,7 @@ [aleph.http.websocket.common :as ws.common] [aleph.http.websocket.server :as ws.server] [aleph.netty :as netty] + [aleph.util :as util] [clojure.string :as str] [clojure.tools.logging :as log] [manifold.deferred :as d] @@ -113,10 +114,9 @@ (assoc options :on-closed on-closed) options))] (-> (d/chain' conn middleware) - (d/on-realized identity - (fn [e] - (log/trace e "Terminating creation of HTTP connection") - (d/error! 
conn e)))))) + (util/propagate-error conn + (fn [e] + (log/trace e "Terminated creation of HTTP connection")))))) (def ^:private connection-stats-callbacks (atom #{})) @@ -393,11 +393,10 @@ (reset! dispose-conn! (fn [] (flow/dispose pool k conn))) ;; allow cancellation during connection establishment - (d/on-realized result - identity - (fn [e] - (log/trace e "Aborting connection acquisition") - (d/error! (first conn) e))) + (util/propagate-error result + (first conn) + (fn [e] + (log/trace e "Aborted connection acquisition"))) (if (realized? result) ;; to account for race condition between setting `dispose-conn!` @@ -466,8 +465,7 @@ (middleware/handle-redirects request req)))))))))))) req))] (d/connect response result) - (d/on-realized result - identity + (util/on-error result (fn [e] (log/trace e "Request failed. Disposing of connection...") (@dispose-conn!))) diff --git a/src/aleph/http/client.clj b/src/aleph/http/client.clj index 34bcb49d..468f2fea 100644 --- a/src/aleph/http/client.clj +++ b/src/aleph/http/client.clj @@ -6,6 +6,7 @@ [aleph.http.multipart :as multipart] [aleph.http.websocket.client :as ws.client] [aleph.netty :as netty] + [aleph.util :as util] [clj-commons.byte-streams :as bs] [clojure.tools.logging :as log] [manifold.deferred :as d] @@ -934,12 +935,11 @@ :response-buffer-size response-buffer-size :t0 t0}))))))))))))] (d/connect conn result) - (d/on-realized result - identity - (fn [e] - (log/trace e "Closing HTTP connection channel") - (d/error! 
ch-d e) - (@close-ch!))))) + (util/propagate-error result + ch-d + (fn [e] + (log/trace e "Closing HTTP connection channel") + (@close-ch!))))) diff --git a/src/aleph/netty.clj b/src/aleph/netty.clj index 54a0dc7c..be372792 100644 --- a/src/aleph/netty.clj +++ b/src/aleph/netty.clj @@ -1,6 +1,7 @@ (ns aleph.netty (:refer-clojure :exclude [flush]) (:require + [aleph.util :as util] [clj-commons.byte-streams :as bs] [clj-commons.primitive-math :as p] [clojure.string :as str] @@ -1574,8 +1575,7 @@ (fn [_] (let [ch (.channel ^ChannelFuture fut)] (maybe-ssl-handshake-future ch)))) - (d/on-realized identity - (fn [e] + (util/on-error (fn [e] (when-not (.isDone fut) (log/trace e "Cancelling Bootstrap#connect future") (when-not (.cancel fut true) diff --git a/src/aleph/tcp.clj b/src/aleph/tcp.clj index 3d293c0e..b71a009c 100644 --- a/src/aleph/tcp.clj +++ b/src/aleph/tcp.clj @@ -1,6 +1,7 @@ (ns aleph.tcp (:require [aleph.netty :as netty] + [aleph.util :as util] [clojure.tools.logging :as log] [manifold.deferred :as d] [manifold.stream :as s] @@ -217,5 +218,7 @@ :local-address local-address :transport (netty/determine-transport transport epoll?) :connect-timeout connect-timeout})] - (d/on-realized ch-d identity #(d/error! s %)) - (d/on-realized s identity #(d/error! ch-d %)))) + (util/propagate-error s + ch-d + (fn [e] + (log/trace e "Closed TCP client channel"))))) diff --git a/src/aleph/util.clj b/src/aleph/util.clj new file mode 100644 index 00000000..f0c1db09 --- /dev/null +++ b/src/aleph/util.clj @@ -0,0 +1,20 @@ +(ns aleph.util + (:require [manifold.deferred :as d])) + +(defn on-error + [d f] + (d/on-realized d identity f)) + +(defn propagate-error + "Registers an error callback with d which will attempt to propagate the error to destination. + + If the error was propagated (i.e. destination wasn't yet realized), on-propagate is invoked with + the error value. + + Returns d." 
+ ([d destination] + (propagate-error d destination identity)) + ([d destination on-propagate] + (on-error d (fn [e] + (when (d/error! destination e) + (on-propagate e)))))) diff --git a/test/aleph/util_test.clj b/test/aleph/util_test.clj new file mode 100644 index 00000000..17f2634e --- /dev/null +++ b/test/aleph/util_test.clj @@ -0,0 +1,45 @@ +(ns aleph.util-test + (:require [aleph.util :as util] + [clojure.test :refer [deftest is testing]] + [manifold.deferred :as d])) + +(deftest test-propagate-error + (testing "Happy path" + (let [src (d/deferred) + dst (d/deferred) + prp (promise)] + (util/propagate-error src dst (fn [e] (deliver prp e))) + (d/error! src :boom) + (is (d/realized? dst)) + (is (= :boom (d/error-value dst nil))) + (is (= :boom (deref prp 0 nil))))) + + (testing "Without on-propagate" + (let [src (d/deferred) + dst (d/deferred)] + (util/propagate-error src dst) + (d/error! src :boom) + (is (d/realized? dst)))) + + (testing "Exception in on-propagate" + (let [src (d/deferred) + dst (d/deferred)] + (util/propagate-error src dst (fn [_] (throw (RuntimeException. "Oops")))) + (d/error! src :boom) + (is (d/realized? dst)) + (is (= :boom (d/error-value dst nil))))) + + (testing "Already realized destination" + (let [src (d/deferred) + dst (d/success-deferred :ok)] + (util/propagate-error src dst) + (d/error! src :boom) + (is (d/realized? dst)) + (is (= nil (d/error-value dst nil))))) + + (testing "Successfully realized source" + (let [src (d/deferred) + dst (d/deferred)] + (util/propagate-error src dst) + (d/success! src :ok) + (is (not (d/realized? 
dst)))))) From d2233c649e1e204977da133ba1c4acfd3c91a2b0 Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Fri, 19 Apr 2024 15:58:50 +0200 Subject: [PATCH 6/8] Fix test-in-flight-request-cancellation test See code comment for background --- test/aleph/http_test.clj | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/test/aleph/http_test.clj b/test/aleph/http_test.clj index 8c412552..221dd9da 100644 --- a/test/aleph/http_test.clj +++ b/test/aleph/http_test.clj @@ -1480,17 +1480,21 @@ (is (some-> @connect-future .isCancelled))))))) (deftest test-in-flight-request-cancellation - (let [conn-established (promise) - conn-closed (promise)] + (let [conn-established (atom nil) + conn-closed (atom nil)] (with-raw-handler (fn [req] - (deliver conn-established true) + (deliver @conn-established true) (s/on-closed (:body req) (fn [] - (deliver conn-closed true)))) + (deliver @conn-closed true)))) + ;; NOTE: The atom indirection here is needed because `with-raw-handler` will run the body + ;; twice (for HTTP1 and HTTP2), so we need a new promise for each run. + (reset! conn-established (promise)) + (reset! conn-closed (promise)) (let [rsp (http-get "/")] - (is (= true (deref conn-established 1000 :timeout))) + (is (= true (deref @conn-established 1000 :timeout))) (http/cancel-request! rsp) - (is (= true (deref conn-closed 1000 :timeout))) + (is (= true (deref @conn-closed 1000 :timeout))) (is (thrown? RequestCancellationException (deref rsp 1000 :timeout))))))) (deftest ^:leak test-leak-in-raw-stream-handler From ef28e8065ac27d289748eee016ed6b00cb40a42f Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Fri, 19 Apr 2024 16:04:41 +0200 Subject: [PATCH 7/8] fixup! 
Introduce utils for making error propagation more self-describing --- src/aleph/tcp.clj | 1 + 1 file changed, 1 insertion(+) diff --git a/src/aleph/tcp.clj b/src/aleph/tcp.clj index b71a009c..d1778cee 100644 --- a/src/aleph/tcp.clj +++ b/src/aleph/tcp.clj @@ -218,6 +218,7 @@ :local-address local-address :transport (netty/determine-transport transport epoll?) :connect-timeout connect-timeout})] + (util/propagate-error ch-d s) (util/propagate-error s ch-d (fn [e] From 11e9eefdf86a9a060a1e7113dd0b9157aea0a849 Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Fri, 19 Apr 2024 16:21:29 +0200 Subject: [PATCH 8/8] fixup! Introduce utils for making error propagation more self-describing --- src/aleph/util.clj | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/aleph/util.clj b/src/aleph/util.clj index f0c1db09..e8b64d27 100644 --- a/src/aleph/util.clj +++ b/src/aleph/util.clj @@ -6,15 +6,15 @@ (d/on-realized d identity f)) (defn propagate-error - "Registers an error callback with d which will attempt to propagate the error to destination. + "Registers an error callback with source which will attempt to propagate the error to destination. If the error was propagated (i.e. destination wasn't yet realized), on-propagate is invoked with the error value. - Returns d." - ([d destination] - (propagate-error d destination identity)) - ([d destination on-propagate] - (on-error d (fn [e] - (when (d/error! destination e) - (on-propagate e)))))) + Returns source." + ([source destination] + (propagate-error source destination identity)) + ([source destination on-propagate] + (on-error source (fn [e] + (when (d/error! destination e) + (on-propagate e))))))