From f7508349cea04cc514580af15d562de16f6fe895 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 14 Sep 2024 12:12:23 +1200 Subject: [PATCH 1/5] Initial streaming tests. --- test/async/http/protocol/closable.rb | 124 +++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 test/async/http/protocol/closable.rb diff --git a/test/async/http/protocol/closable.rb b/test/async/http/protocol/closable.rb new file mode 100644 index 0000000..85c928f --- /dev/null +++ b/test/async/http/protocol/closable.rb @@ -0,0 +1,124 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require "async/http/protocol/http" +require "protocol/http/body/streamable" +require "sus/fixtures/async/http" + +AnEchoServer = Sus::Shared("an echo server") do + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + body = ::Protocol::HTTP::Body::Streamable.response(request) do |stream| + # $stderr.puts "Server stream: #{stream.inspect}" + + while chunk = stream.readpartial(1024) + # $stderr.puts "Server reading chunk: #{chunk.inspect}" + stream.write(chunk) + end + rescue EOFError + # Ignore. + ensure + # $stderr.puts "Server closing stream." + stream.close + end + + ::Protocol::HTTP::Response[200, {}, body] + end + end + + it "should echo the request body" do + chunks = ["Hello,", "World!"] + response_chunks = Queue.new + + body = ::Protocol::HTTP::Body::Streamable.request do |stream| + # $stderr.puts "Client stream: #{stream.inspect}" + + chunks.each do |chunk| + # $stderr.puts "Client writing chunk: #{chunk.inspect}" + stream.write(chunk) + end + + # $stderr.puts "Client closing write." + stream.close_write + + # $stderr.puts "Client reading chunks..." + while chunk = stream.readpartial(1024) + # $stderr.puts "Client reading chunk: #{chunk.inspect}" + response_chunks << chunk + end + rescue EOFError + # Ignore. + ensure + # $stderr.puts "Client closing stream." + stream.close + response_chunks.close + end + + response = client.post("/", body: body) + body.stream(response.body) + + chunks.each do |chunk| + expect(response_chunks.pop).to be == chunk + end + end +end + +AnEchoClient = Sus::Shared("an echo client") do + let(:chunks) {["Hello,", "World!"]} + let(:response_chunks) {Queue.new} + + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + body = ::Protocol::HTTP::Body::Streamable.response(request) do |stream| + chunks.each do |chunk| + stream.write(chunk) + end + + stream.close_write + + while chunk = stream.readpartial(1024) + response_chunks << chunk + end + rescue EOFError + # Ignore. + ensure + # $stderr.puts "Server closing stream." + stream.close + end + + ::Protocol::HTTP::Response[200, {}, body] + end + end + + it "should echo the response body" do + body = ::Protocol::HTTP::Body::Streamable.request do |stream| + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + # Ignore. + ensure + stream.close + end + + response = client.post("/", body: body) + body.stream(response.body) + + chunks.each do |chunk| + expect(response_chunks.pop).to be == chunk + end + end +end + +[Async::HTTP::Protocol::HTTP1].each do |protocol| + describe protocol do + include Sus::Fixtures::Async::HTTP::ServerContext + + let(:protocol) {subject} + + it_behaves_like AnEchoServer + # it_behaves_like AnEchoClient + end +end From 97382e18f8ac6b101c8f69a37e249895665220ec Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 16 Sep 2024 11:27:27 +1200 Subject: [PATCH 2/5] Wait for input to be consumed before continuing. --- lib/async/http/body/finishable.rb | 39 +++++++ lib/async/http/client.rb | 3 + lib/async/http/protocol/http1/server.rb | 14 ++- test/async/http/protocol/streaming.rb | 142 ++++++++++++++++++++++++ 4 files changed, 194 insertions(+), 4 deletions(-) create mode 100644 lib/async/http/body/finishable.rb create mode 100644 test/async/http/protocol/streaming.rb diff --git a/lib/async/http/body/finishable.rb b/lib/async/http/body/finishable.rb new file mode 100644 index 0000000..cfdf996 --- /dev/null +++ b/lib/async/http/body/finishable.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2019-2023, by Samuel Williams. + +require 'protocol/http/body/wrapper' +require 'async/variable' + +module Async + module HTTP + module Body + class Finishable < ::Protocol::HTTP::Body::Wrapper + def initialize(body) + super(body) + + @closed = Async::Variable.new + @error = nil + end + + def close(error = nil) + unless @closed.resolved? + @error = error + @closed.value = true + end + + super + end + + def wait + @closed.wait + end + + def inspect + "#<#{self.class} closed=#{@closed} error=#{@error}> | #{super}" + end + end + end + end +end diff --git a/lib/async/http/client.rb b/lib/async/http/client.rb index bbaeb18..53468a7 100755 --- a/lib/async/http/client.rb +++ b/lib/async/http/client.rb @@ -188,6 +188,9 @@ def make_response(request, connection) # The connection won't be released until the body is completely read/released. ::Protocol::HTTP::Body::Completable.wrap(response) do + # TODO: We should probably wait until the request is fully consumed and/or the connection is ready before releasing it back into the pool. + + # Release the connection back into the pool: @pool.release(connection) end diff --git a/lib/async/http/protocol/http1/server.rb b/lib/async/http/protocol/http1/server.rb index 51d32ca..71c1c51 100644 --- a/lib/async/http/protocol/http1/server.rb +++ b/lib/async/http/protocol/http1/server.rb @@ -46,6 +46,11 @@ def each(task: Task.current) task.annotate("Reading #{self.version} requests for #{self.class}.") while request = next_request + if body = request.body + finishable = Body::Finishable.new(body) + request.body = finishable + end + response = yield(request, self) version = request.version body = response&.body @@ -102,23 +107,24 @@ def each(task: Task.current) head = request.head? # Same as above: - request = nil unless request.body + request = nil response = nil write_body(version, body, head, trailer) end end - # We are done with the body, you shouldn't need to call close on it: + # We are done with the body: body = nil else # If the request failed to generate a response, it was an internal server error: write_response(@version, 500, {}) write_body(version, nil) + + request&.finish end - # Gracefully finish reading the request body if it was not already done so. - request&.each{} + finishable&.wait # This ensures we yield at least once every iteration of the loop and allow other fibers to execute. task.yield diff --git a/test/async/http/protocol/streaming.rb b/test/async/http/protocol/streaming.rb new file mode 100644 index 0000000..1b85715 --- /dev/null +++ b/test/async/http/protocol/streaming.rb @@ -0,0 +1,142 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require "async/http/protocol/http" +require "protocol/http/body/streamable" +require "sus/fixtures/async/http" + +AnEchoServer = Sus::Shared("an echo server") do + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + output = ::Protocol::HTTP::Body::Writable.new + + Async do + stream = ::Protocol::HTTP::Body::Stream.new(request.body, output) + + Console.debug(self, "Echoing chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + stream.write(chunk) + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + ::Protocol::HTTP::Response[200, {}, output] + end + end + + it "should echo the request body" do + chunks = ["Hello,", "World!"] + response_chunks = Queue.new + + output = ::Protocol::HTTP::Body::Writable.new + response = client.post("/", body: output) + stream = ::Protocol::HTTP::Body::Stream.new(response.body, output) + + begin + Console.debug(self, "Echoing chunks...") + chunks.each do |chunk| + Console.debug(self, "Writing chunk:", chunk: chunk) + stream.write(chunk) + end + + Console.debug(self, "Closing write.") + stream.close_write + + Console.debug(self, "Reading chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + response_chunks << chunk + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + response_chunks.close + end + + chunks.each do |chunk| + expect(response_chunks.pop).to be == chunk + end + end +end + +AnEchoClient = Sus::Shared("an echo client") do + let(:chunks) {["Hello,", "World!"]} + let(:response_chunks) {Queue.new} + + let(:app) do + ::Protocol::HTTP::Middleware.for do |request| + output = ::Protocol::HTTP::Body::Writable.new + + Async do + stream = ::Protocol::HTTP::Body::Stream.new(request.body, output) + + Console.debug(self, "Echoing chunks...") + chunks.each do |chunk| + stream.write(chunk) + end + + Console.debug(self, "Closing write.") + stream.close_write + + Console.debug(self, "Reading chunks...") + while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) + response_chunks << chunk + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + ::Protocol::HTTP::Response[200, {}, output] + end + end + + it "should echo the response body" do + output = ::Protocol::HTTP::Body::Writable.new + response = client.post("/", body: output) + stream = ::Protocol::HTTP::Body::Stream.new(response.body, output) + + begin + Console.debug(self, "Echoing chunks...") + while chunk = stream.readpartial(1024) + stream.write(chunk) + end + rescue EOFError + Console.debug(self, "EOF.") + # Ignore. + ensure + Console.debug(self, "Closing stream.") + stream.close + end + + chunks.each do |chunk| + expect(response_chunks.pop).to be == chunk + end + end +end + +[Async::HTTP::Protocol::HTTP1].each do |protocol| + describe protocol do + include Sus::Fixtures::Async::HTTP::ServerContext + + let(:protocol) {subject} + + it_behaves_like AnEchoServer + it_behaves_like AnEchoClient + end +end From d04e3dc334a279456407572b39e60a72cbdb11f1 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 16 Sep 2024 11:55:05 +1200 Subject: [PATCH 3/5] Minor fixes. --- lib/async/http/protocol/http1/server.rb | 2 ++ test/async/http/middleware/location_redirector.rb | 2 ++ 2 files changed, 4 insertions(+) diff --git a/lib/async/http/protocol/http1/server.rb b/lib/async/http/protocol/http1/server.rb index 71c1c51..2451363 100644 --- a/lib/async/http/protocol/http1/server.rb +++ b/lib/async/http/protocol/http1/server.rb @@ -7,6 +7,8 @@ # Copyright, 2024, by Anton Zhuravsky. require_relative 'connection' +require_relative '../../body/finishable' + require 'console/event/failure' module Async diff --git a/test/async/http/middleware/location_redirector.rb b/test/async/http/middleware/location_redirector.rb index 1efb8ee..9a950b9 100644 --- a/test/async/http/middleware/location_redirector.rb +++ b/test/async/http/middleware/location_redirector.rb @@ -18,6 +18,8 @@ with '301' do let(:app) do Protocol::HTTP::Middleware.for do |request| + request.finish # TODO: request.discard - or some default handling? + case request.path when '/home' Protocol::HTTP::Response[301, {'location' => '/'}, []] From 9b1c6b870f0dee1092c5a24361cb35330f74b0e3 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 16 Sep 2024 22:44:29 +1200 Subject: [PATCH 4/5] Update tests. --- .../http/body/stream.rb} | 4 +- .../http/body/streamable.rb} | 64 +++++++++++-------- 2 files changed, 39 insertions(+), 29 deletions(-) rename test/{async/http/protocol/streaming.rb => protocol/http/body/stream.rb} (96%) rename test/{async/http/protocol/closable.rb => protocol/http/body/streamable.rb} (52%) diff --git a/test/async/http/protocol/streaming.rb b/test/protocol/http/body/stream.rb similarity index 96% rename from test/async/http/protocol/streaming.rb rename to test/protocol/http/body/stream.rb index 1b85715..430a1ee 100644 --- a/test/async/http/protocol/streaming.rb +++ b/test/protocol/http/body/stream.rb @@ -130,8 +130,8 @@ end end -[Async::HTTP::Protocol::HTTP1].each do |protocol| - describe protocol do +[Async::HTTP::Protocol::HTTP1, Async::HTTP::Protocol::HTTP2].each do |protocol| + describe protocol, unique: protocol.name do include Sus::Fixtures::Async::HTTP::ServerContext let(:protocol) {subject} diff --git a/test/async/http/protocol/closable.rb b/test/protocol/http/body/streamable.rb similarity index 52% rename from test/async/http/protocol/closable.rb rename to test/protocol/http/body/streamable.rb index 85c928f..17f2cef 100644 --- a/test/async/http/protocol/closable.rb +++ b/test/protocol/http/body/streamable.rb @@ -10,21 +10,21 @@ AnEchoServer = Sus::Shared("an echo server") do let(:app) do ::Protocol::HTTP::Middleware.for do |request| - body = ::Protocol::HTTP::Body::Streamable.response(request) do |stream| - # $stderr.puts "Server stream: #{stream.inspect}" - + streamable = ::Protocol::HTTP::Body::Streamable.response(request) do |stream| + Console.debug(self, "Echoing chunks...") while chunk = stream.readpartial(1024) - # $stderr.puts "Server reading chunk: #{chunk.inspect}" + Console.debug(self, "Reading chunk:", chunk: chunk) stream.write(chunk) end rescue EOFError + Console.debug(self, "EOF.") # Ignore. ensure - # $stderr.puts "Server closing stream." + Console.debug(self, "Closing stream.") stream.close end - ::Protocol::HTTP::Response[200, {}, body] + ::Protocol::HTTP::Response[200, {}, streamable] end end @@ -32,33 +32,34 @@ chunks = ["Hello,", "World!"] response_chunks = Queue.new - body = ::Protocol::HTTP::Body::Streamable.request do |stream| - # $stderr.puts "Client stream: #{stream.inspect}" - + output = ::Protocol::HTTP::Body::Writable.new + response = client.post("/", body: output) + stream = ::Protocol::HTTP::Body::Stream.new(response.body, output) + + begin + Console.debug(self, "Echoing chunks...") chunks.each do |chunk| - # $stderr.puts "Client writing chunk: #{chunk.inspect}" + Console.debug(self, "Writing chunk:", chunk: chunk) stream.write(chunk) end - # $stderr.puts "Client closing write." + Console.debug(self, "Closing write.") stream.close_write - # $stderr.puts "Client reading chunks..." + Console.debug(self, "Reading chunks...") while chunk = stream.readpartial(1024) - # $stderr.puts "Client reading chunk: #{chunk.inspect}" + Console.debug(self, "Reading chunk:", chunk: chunk) response_chunks << chunk end rescue EOFError + Console.debug(self, "EOF.") # Ignore. ensure - # $stderr.puts "Client closing stream." + Console.debug(self, "Closing stream.") stream.close response_chunks.close end - response = client.post("/", body: body) - body.stream(response.body) - chunks.each do |chunk| expect(response_chunks.pop).to be == chunk end @@ -71,54 +72,63 @@ let(:app) do ::Protocol::HTTP::Middleware.for do |request| - body = ::Protocol::HTTP::Body::Streamable.response(request) do |stream| + streamable = ::Protocol::HTTP::Body::Streamable.response(request) do |stream| + Console.debug(self, "Echoing chunks...") chunks.each do |chunk| stream.write(chunk) end + Console.debug(self, "Closing write.") stream.close_write + Console.debug(self, "Reading chunks...") while chunk = stream.readpartial(1024) + Console.debug(self, "Reading chunk:", chunk: chunk) response_chunks << chunk end rescue EOFError + Console.debug(self, "EOF.") # Ignore. ensure - # $stderr.puts "Server closing stream." + Console.debug(self, "Closing stream.") stream.close end - ::Protocol::HTTP::Response[200, {}, body] + ::Protocol::HTTP::Response[200, {}, streamable] end end it "should echo the response body" do - body = ::Protocol::HTTP::Body::Streamable.request do |stream| + output = ::Protocol::HTTP::Body::Writable.new + response = client.post("/", body: output) + stream = ::Protocol::HTTP::Body::Stream.new(response.body, output) + + begin + Console.debug(self, "Echoing chunks...") while chunk = stream.readpartial(1024) stream.write(chunk) end rescue EOFError + Console.debug(self, "EOF.") # Ignore. ensure + Console.debug(self, "Closing stream.") stream.close end - response = client.post("/", body: body) - body.stream(response.body) - chunks.each do |chunk| expect(response_chunks.pop).to be == chunk end end end -[Async::HTTP::Protocol::HTTP1].each do |protocol| - describe protocol do +[Async::HTTP::Protocol::HTTP1, Async::HTTP::Protocol::HTTP2].each do |protocol| + describe protocol, unique: protocol.name do include Sus::Fixtures::Async::HTTP::ServerContext let(:protocol) {subject} it_behaves_like AnEchoServer - # it_behaves_like AnEchoClient + it_behaves_like AnEchoClient end end From e8167490fd11951879253aed3212f06b2d37d71c Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 16 Sep 2024 22:45:21 +1200 Subject: [PATCH 5/5] Update dependencies. --- async-http.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-http.gemspec b/async-http.gemspec index e39e376..c316537 100644 --- a/async-http.gemspec +++ b/async-http.gemspec @@ -28,7 +28,7 @@ Gem::Specification.new do |spec| spec.add_dependency "async-pool", "~> 0.7" spec.add_dependency "io-endpoint", "~> 0.11" spec.add_dependency "io-stream", "~> 0.4" - spec.add_dependency "protocol-http", "~> 0.34" + spec.add_dependency "protocol-http", "~> 0.35" spec.add_dependency "protocol-http1", "~> 0.20" spec.add_dependency "protocol-http2", "~> 0.18" spec.add_dependency "traces", ">= 0.10"