diff --git a/.travis.yml b/.travis.yml index 2f57d12..50523e8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,12 +4,11 @@ os: - linux - osx julia: - - 0.4 - - 0.5 + - 1.0 - nightly notifications: email: false # uncomment the following lines to override the default test script -#script: -# - if [[ -a .git/shallow ]]; then git fetch --unshallow; fi -# - julia -e 'Pkg.clone(pwd()); Pkg.build("PromiseExtractor"); Pkg.test("PromiseExtractor"; coverage=true)' +script: # the default script is equivalent to the following + - if [[ -a .git/shallow ]]; then git fetch --unshallow; fi + - julia -e 'using Pkg; Pkg.clone("https://github.com/tanmaykm/HPack.jl.git"); Pkg.clone(pwd()); Pkg.build("HTTP2"); Pkg.test("HTTP2"; coverage=true)'; diff --git a/README.md b/README.md index 17f6f2b..f131685 100644 --- a/README.md +++ b/README.md @@ -12,60 +12,64 @@ julia> using HTTP2 ## Simple Servers and Clients -The library can directly create simple servers and clients. For full support of -HTTP/2 Upgrade and HTTPS, use `HttpServer.jl` and `Requests.jl`. +The library can directly create simple servers and clients. You only use this library directly if you need low-level functionality. An example for the server is as follows. The code will be explained in the next section. ```julia +using HTTP2 +using Sockets +using Dates + +port = 8888 server = listen(port) -println("Server started.") -while(true) - buffer = accept(server) - println("Processing a connection ...") +println("Waiting for a connection ...") +buffer = accept(server) +println("Processing a connection ...") - connection = Session.new_connection(buffer; isclient=false) - ## Recv the client preface, and send an empty SETTING frame. +connection = HTTP2.Session.new_connection(buffer; isclient=false) - headers_evt = Session.take_evt!(connection) - stream_identifier = headers_evt.stream_identifier +## Recv the client preface, and send an empty SETTING frame. +headers_evt = HTTP2.Session.take_evt!(connection) +stream_identifier = headers_evt.stream_identifier - sending_headers = Headers(":status" => "200", - "server" => "HTTP2.jl", - "date" => Dates.format(now(Dates.UTC), Dates.RFC1123Format), - "content-type" => "text/html; charset=UTF-8") +sending_headers = HTTP2.Headers(":status" => "200", + "server" => "HTTP2.jl", + "date" => Dates.format(now(Dates.UTC), Dates.RFC1123Format), + "content-type" => "text/html; charset=UTF-8") +sending_body = convert(Vector{UInt8}, codeunits("hello")) - Session.put_act!(connection, Session.ActSendHeaders(stream_identifier, sending_headers, false)) - Session.put_act!(connection, Session.ActSendData(stream_identifier, body, true)) +@info("Resopnding", sending_headers, sending_body) - ## We are done! -end +HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(stream_identifier, sending_headers, false)) +HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendData(stream_identifier, sending_body, true)) +## We are done! ``` A client can be started in a similar way. Again the code will be explained in the next section. ```julia -buffer = connect(dest, port) - -## Create a HTTPConnection object -connection = Session.new_connection(buffer; isclient=true) - -## Create a request with headers -headers = Headers(":method" => "GET", - ":path" => url, +using HTTP2 +using Sockets + +@info("Opening connection", conn_id) +buffer = connect("127.0.0.1", 8888) +connection = HTTP2.Session.new_connection(buffer; isclient=true) +headers = HTTP2.Headers(":method" => "GET", + ":path" => "/", ":scheme" => "http", ":authority" => "127.0.0.1:9000", "accept" => "*/*", "accept-encoding" => "gzip, deflate", "user-agent" => "HTTP2.jl") -Session.put_act!(connection, Session.ActSendHeaders(Session.next_free_stream_identifier(connection), headers, true)) - -return (Session.take_evt!(connection).headers, Session.take_evt!(connection).data) +@info("Sending request", req_id) +HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(HTTP2.Session.next_free_stream_identifier(connection), headers, true)) +(rcvd_headers, rcvd_data) = (HTTP2.Session.take_evt!(connection).headers, HTTP2.Session.take_evt!(connection).data) ``` ## Connection Lifecycle diff --git a/REQUIRE b/REQUIRE index 87a8582..65cc198 100644 --- a/REQUIRE +++ b/REQUIRE @@ -1,3 +1,3 @@ -julia 0.4 -HPack 0.2.0 -HttpCommon 0.2.4 \ No newline at end of file +julia 1.0 +HPack +MbedTLS diff --git a/src/Frame.jl b/src/Frame.jl index ef40719..ed84648 100644 --- a/src/Frame.jl +++ b/src/Frame.jl @@ -1,9 +1,10 @@ module Frame -import Base: ==, AbstractIOBuffer +import Base: == +import ..HTTP2: readallbytes @enum FRAME_TYPES DATA=0x0 HEADERS=0x1 PRIORITY=0x2 RST_STREAM=0x3 SETTINGS=0x4 PUSH_PROMISE=0x5 PING=0x6 GOAWAY=0x7 WINDOW_UPDATE=0x8 CONTINUATION=0x9 -immutable FrameHeader +struct FrameHeader length::UInt32 typ::FRAME_TYPES flags::UInt8 @@ -11,12 +12,12 @@ immutable FrameHeader end function decode_header(buf) - length_arr = read(buf, 3) + length_arr = readallbytes(buf, 3) length = UInt32(length_arr[1]) << 16 + UInt32(length_arr[2]) << 8 + UInt32(length_arr[3]) - typ = FRAME_TYPES(read(buf, 1)[1]) - flags = read(buf, 1)[1] - stream_identifier_arr = read(buf, 4) + typ = FRAME_TYPES(readallbytes(buf, 1)[1]) + flags = readallbytes(buf, 1)[1] + stream_identifier_arr = readallbytes(buf, 4) stream_identifier = UInt32(stream_identifier_arr[1]) << 24 + UInt32(stream_identifier_arr[2]) << 16 + UInt32(stream_identifier_arr[3]) << 8 + UInt32(stream_identifier_arr[4]) @@ -37,11 +38,11 @@ function encode_header(header::FrameHeader) write(buf, UInt8(header.stream_identifier >> 24), UInt8((header.stream_identifier >> 16) & 0x000000ff), UInt8((header.stream_identifier >> 8) & 0x000000ff), UInt8(header.stream_identifier & 0x000000ff)) - return takebuf_array(buf) + return take!(buf) end -type UnimplementedError <: Exception end +struct UnimplementedError <: Exception end include("Frame/utils.jl") include("Frame/data.jl") @@ -56,8 +57,11 @@ include("Frame/window_update.jl") include("Frame/continuation.jl") function decode(buf) + @show "trying to read header" header = decode_header(buf) - payload = read(buf, header.length) + @show header.typ + @show header.length + payload = readallbytes(buf, header.length) @assert length(payload) == header.length if header.typ == DATA diff --git a/src/Frame/continuation.jl b/src/Frame/continuation.jl index 58089cf..fc8ac74 100644 --- a/src/Frame/continuation.jl +++ b/src/Frame/continuation.jl @@ -1,4 +1,4 @@ -immutable ContinuationFrame +struct ContinuationFrame is_end_headers::Bool stream_identifier::UInt32 fragment::Array{UInt8, 1} diff --git a/src/Frame/data.jl b/src/Frame/data.jl index 89a1121..92eb91f 100644 --- a/src/Frame/data.jl +++ b/src/Frame/data.jl @@ -1,4 +1,4 @@ -immutable DataFrame +struct DataFrame stream_identifier::UInt32 is_end_stream::Bool data::Array{UInt8, 1} diff --git a/src/Frame/goaway.jl b/src/Frame/goaway.jl index a301ffb..e881ab3 100644 --- a/src/Frame/goaway.jl +++ b/src/Frame/goaway.jl @@ -1,7 +1,7 @@ -immutable GoawayFrame +struct GoawayFrame last_stream_identifier::UInt32 error_code::UInt32 - debug_data::Array{UInt8, 1} + debug_data::Vector{UInt8} end ==(a::GoawayFrame, b::GoawayFrame) = @@ -22,14 +22,14 @@ function decode_goaway(header, payload) end function encode_goaway(frame) - payload::Array{UInt8, 1} = [ UInt8(frame.last_stream_identifier >> 24) & 0x7f; - UInt8(frame.last_stream_identifier >> 16 & 0x000000ff); - UInt8(frame.last_stream_identifier >> 8 & 0x000000ff); - UInt8(frame.last_stream_identifier & 0x000000ff); - UInt8(frame.error_code >> 24); - UInt8(frame.error_code >> 16 & 0x000000ff); - UInt8(frame.error_code >> 8 & 0x000000ff); - UInt8(frame.error_code & 0x000000ff) ] + payload::Vector{UInt8} = [ UInt8(frame.last_stream_identifier >> 24) & 0x7f; + UInt8(frame.last_stream_identifier >> 16 & 0x000000ff); + UInt8(frame.last_stream_identifier >> 8 & 0x000000ff); + UInt8(frame.last_stream_identifier & 0x000000ff); + UInt8(frame.error_code >> 24); + UInt8(frame.error_code >> 16 & 0x000000ff); + UInt8(frame.error_code >> 8 & 0x000000ff); + UInt8(frame.error_code & 0x000000ff) ] append!(payload, frame.debug_data) return wrap_payload(payload, GOAWAY, 0x0, 0x0) diff --git a/src/Frame/headers.jl b/src/Frame/headers.jl index 1d625a3..2146a49 100644 --- a/src/Frame/headers.jl +++ b/src/Frame/headers.jl @@ -1,11 +1,11 @@ -immutable HeadersFrame +struct HeadersFrame is_end_stream::Bool is_end_headers::Bool is_priority::Bool stream_identifier::UInt32 - exclusive::Nullable{Bool} - dependent_stream_identifier::Nullable{UInt32} - weight::Nullable{UInt8} + exclusive::Union{Nothing,Bool} + dependent_stream_identifier::Union{Nothing,UInt32} + weight::Union{Nothing,UInt8} fragment::Array{UInt8, 1} end @@ -14,9 +14,9 @@ end a.is_end_headers == b.is_end_headers && a.is_priority == b.is_priority && a.stream_identifier == b.stream_identifier && - (isnull(a.exclusive) || a.exclusive.value == b.exclusive.value) && - (isnull(a.exclusive) || a.dependent_stream_identifier.value == b.dependent_stream_identifier.value) && - (isnull(a.exclusive) || a.weight.value == b.weight.value) && + ((a.exclusive === nothing) || a.exclusive == b.exclusive) && + ((a.exclusive === nothing) || a.dependent_stream_identifier == b.dependent_stream_identifier) && + ((a.exclusive === nothing) || a.weight == b.weight) && a.fragment == b.fragment function decode_headers(header, payload) @@ -33,11 +33,11 @@ function decode_headers(header, payload) weight = payload[5] return HeadersFrame(is_end_stream, is_end_headers, is_priority, header.stream_identifier, - Nullable(exclusive), Nullable(dependent_stream_identifier), - Nullable(weight), getindex(payload, 6:length(payload))) + exclusive, dependent_stream_identifier, + weight, getindex(payload, 6:length(payload))) else return HeadersFrame(is_end_stream, is_end_headers, is_priority, header.stream_identifier, - Nullable{Bool}(), Nullable{UInt32}(), Nullable{UInt8}(), payload) + nothing, nothing, nothing, payload) end end @@ -48,12 +48,12 @@ function encode_headers(frame) (frame.is_priority ? 0x20 : 0x0) if frame.is_priority - payload::Array{UInt8, 1} = [ UInt8(frame.dependent_stream_identifier.value >> 24) & 0x7f; - UInt8(frame.dependent_stream_identifier.value >> 16 & 0x000000ff); - UInt8(frame.dependent_stream_identifier.value >> 8 & 0x000000ff); - UInt8(frame.dependent_stream_identifier.value & 0x000000ff) ] - payload[1] = frame.exclusive.value ? (payload[1] | 0x80 ) : payload[1] - push!(payload, frame.weight.value) + payload::Array{UInt8, 1} = [ UInt8(frame.dependent_stream_identifier >> 24) & 0x7f; + UInt8(frame.dependent_stream_identifier >> 16 & 0x000000ff); + UInt8(frame.dependent_stream_identifier >> 8 & 0x000000ff); + UInt8(frame.dependent_stream_identifier & 0x000000ff) ] + payload[1] = frame.exclusive ? (payload[1] | 0x80 ) : payload[1] + push!(payload, frame.weight) append!(payload, frame.fragment) else payload = frame.fragment diff --git a/src/Frame/ping.jl b/src/Frame/ping.jl index 7992bff..e8548f6 100644 --- a/src/Frame/ping.jl +++ b/src/Frame/ping.jl @@ -1,4 +1,4 @@ -immutable PingFrame +struct PingFrame is_ack::Bool data::Array{UInt8, 1} end diff --git a/src/Frame/priority.jl b/src/Frame/priority.jl index fb68a41..0072aa0 100644 --- a/src/Frame/priority.jl +++ b/src/Frame/priority.jl @@ -1,4 +1,4 @@ -immutable PriorityFrame +struct PriorityFrame stream_identifier::UInt32 exclusive::Bool dependent_stream_identifier::UInt32 diff --git a/src/Frame/push_promise.jl b/src/Frame/push_promise.jl index c07923d..c9e9431 100644 --- a/src/Frame/push_promise.jl +++ b/src/Frame/push_promise.jl @@ -1,4 +1,4 @@ -immutable PushPromiseFrame +struct PushPromiseFrame is_end_headers::Bool stream_identifier::UInt32 promised_stream_identifier::UInt32 diff --git a/src/Frame/rst_stream.jl b/src/Frame/rst_stream.jl index 4cb991a..820a4b5 100644 --- a/src/Frame/rst_stream.jl +++ b/src/Frame/rst_stream.jl @@ -1,4 +1,4 @@ -immutable RstStreamFrame +struct RstStreamFrame stream_identifier::UInt32 error_code::UInt32 end diff --git a/src/Frame/settings.jl b/src/Frame/settings.jl index 0c392e2..a057586 100644 --- a/src/Frame/settings.jl +++ b/src/Frame/settings.jl @@ -1,17 +1,17 @@ @enum SETTING_IDENTIFIER SETTINGS_HEADER_TABLE_SIZE=0x1 SETTINGS_ENABLE_PUSH=0x2 SETTINGS_MAX_CONCURRENT_STREAMS=0x3 SETTINGS_INITIAL_WINDOW_SIZE=0x4 SETTINGS_MAX_FRAME_SIZE=0x5 SETTINGS_MAX_HEADER_LIST_SIZE=0x6 -immutable SettingsFrame +struct SettingsFrame is_ack::Bool - parameters::Nullable{Array{Tuple{SETTING_IDENTIFIER, UInt32}, 1}} + parameters::Union{Nothing,Array{Tuple{SETTING_IDENTIFIER, UInt32}, 1}} end -SettingsFrame() = SettingsFrame(false, Nullable(Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}())) +SettingsFrame() = SettingsFrame(false, Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}()) ==(a::SettingsFrame, b::SettingsFrame) = a.is_ack == b.is_ack && - (isnull(a.parameters) || a.parameters.value == b.parameters.value) + ((a.parameters === nothing) || a.parameters == b.parameters) -type UnknownIdentifierError <: Exception end +struct UnknownIdentifierError <: Exception end function decode_settings(header, payload) @assert header.stream_identifier == 0x0 @@ -20,7 +20,7 @@ function decode_settings(header, payload) if is_ack @assert length(payload) == 0 - return SettingsFrame(is_ack, Nullable{Array{Tuple{SETTING_IDENTIFIER, UInt32}}}()) + return SettingsFrame(is_ack, nothing) else parameters = Array{Tuple{SETTING_IDENTIFIER, UInt32}, 1}() for i = 1:div(length(payload), 6) @@ -29,7 +29,7 @@ function decode_settings(header, payload) UInt32(payload[(i-1)*6+5]) << 8 + UInt32(payload[(i-1)*6+6]) push!(parameters, (SETTING_IDENTIFIER(identifier), value)) end - return SettingsFrame(is_ack, Nullable(parameters)) + return SettingsFrame(is_ack, parameters) end end @@ -38,7 +38,7 @@ function encode_settings(frame) return wrap_payload([], SETTINGS, 0x1, 0x0) else payload = Array{UInt8, 1}() - for val in frame.parameters.value + for val in frame.parameters append!(payload, [ UInt8(UInt16(val[1]) >> 8); UInt8(UInt16(val[1]) & 0x00ff); UInt8(val[2] >> 24); diff --git a/src/Frame/window_update.jl b/src/Frame/window_update.jl index f1c355f..f54b929 100644 --- a/src/Frame/window_update.jl +++ b/src/Frame/window_update.jl @@ -1,4 +1,4 @@ -immutable WindowUpdateFrame +struct WindowUpdateFrame stream_identifier::UInt32 window_size_increment::UInt32 end diff --git a/src/HTTP2.jl b/src/HTTP2.jl index 5fa96d8..b7cf05c 100644 --- a/src/HTTP2.jl +++ b/src/HTTP2.jl @@ -1,71 +1,41 @@ module HTTP2 -import HttpCommon: Headers +using MbedTLS + +const Headers = Vector{Tuple{String,String}} + +bytearr(a::Vector{UInt8}) = a +bytearr(cs::Base.CodeUnits{UInt8,String}) = convert(Vector{UInt8}, cs) +bytearr(s::String) = bytearr(codeunits(s)) + +readallbytes(s, nbytes) = read(s, nbytes) +function readallbytes(s::MbedTLS.SSLContext, nbytes) + finalbuf = Vector{UInt8}(undef, nbytes) + lfinal = 0 + while (lfinal < nbytes) && !eof(s) + toread = min(nbytes - lfinal, bytesavailable(s)) + if toread > 0 + buf = Vector{UInt8}(undef, toread) + @show("trying to read $toread bytes") + nread = readbytes!(s, buf, toread) + @show("read $nread bytes") + else + nread = 0 + end + if nread > 0 + copyto!(finalbuf, lfinal+1, buf) + lfinal += nread + else + @show nread + sleep(0.5) + yield() + end + end + finalbuf +end # package code goes here include("Frame.jl") include("Session.jl") -## Below we try to fire a HTTP2 request. This request is meant to be tested -## against a local nghttp2 server running on port 9000. -## -## The server is started by the command: `nghttpd --verbose --no-tls 9000` -import HTTP2.Session - -function request(dest, port, url) - ## Fire a HTTP connection to port 9000 - buffer = connect(dest, port) - - ## Create a HTTPConnection object - connection = Session.new_connection(buffer; isclient=true) - - ## Create a request with headers - headers = Headers(":method" => "GET", - ":path" => url, - ":scheme" => "http", - ":authority" => "127.0.0.1:9000", - "accept" => "*/*", - "accept-encoding" => "gzip, deflate", - "user-agent" => "HTTP2.jl") - - Session.put_act!(connection, Session.ActSendHeaders(Session.next_free_stream_identifier(connection), headers, true)) - - return (Session.take_evt!(connection).headers, Session.take_evt!(connection).data) -end - -function handle_util_frames_until(connection) - received = Session.recv_next(connection) - while typeof(received) == Frame.SettingsFrame || typeof(received) == Frame.PriorityFrame - # do nothing for now - received = Session.recv_next(connection) - end - return received -end - -function serve(port, body) - server = listen(port) - - println("Server started.") - while(true) - buffer = accept(server) - println("Processing a connection ...") - - connection = Session.new_connection(buffer; isclient=false) - ## Recv the client preface, and send an empty SETTING frame. - - headers_evt = Session.take_evt!(connection) - stream_identifier = headers_evt.stream_identifier - - sending_headers = Headers(":status" => "200", - "server" => "HTTP2.jl", - "date" => Dates.format(now(Dates.UTC), Dates.RFC1123Format), - "content-type" => "text/html; charset=UTF-8") - - Session.put_act!(connection, Session.ActSendHeaders(stream_identifier, sending_headers, false)) - Session.put_act!(connection, Session.ActSendData(stream_identifier, body, true)) - - ## We are done! - end -end - end # module diff --git a/src/Session.jl b/src/Session.jl index 8019187..2813fa5 100644 --- a/src/Session.jl +++ b/src/Session.jl @@ -2,32 +2,31 @@ module Session import HPack import HPack: DynamicTable -import HttpCommon: Headers -import HTTP2.Frame +import HTTP2: bytearr, Frame, Headers import HTTP2.Frame: ContinuationFrame, DataFrame, GoawayFrame, HeadersFrame, PingFrame, PriorityFrame, PushPromiseFrame, RstStreamFrame, SettingsFrame, WindowUpdateFrame @enum STREAM_STATE IDLE=1 RESERVED_LOCAL=2 RESERVED_REMOTE=3 OPEN=4 HALF_CLOSED_REMOTE=5 HALF_CLOSED_LOCAL=6 CLOSED=7 -type Priority +mutable struct Priority dependent_stream_identifier::UInt32 weight::UInt8 end ## Actions, which should be feeded in to `in` channel. -immutable ActPromise +struct ActPromise stream_identifier::UInt32 promised_stream_identifier::UInt32 headers::Headers end -immutable ActSendHeaders +struct ActSendHeaders stream_identifier::UInt32 headers::Headers is_end_stream::Bool end -immutable ActSendData +struct ActSendData stream_identifier::UInt32 data::Array{UInt8, 1} is_end_stream::Bool @@ -35,44 +34,45 @@ end ## Events, which should be fetched from `out` channel. -immutable EvtPromise +struct EvtPromise stream_identifier::UInt32 promised_stream_identifier::UInt32 headers::Headers end -immutable EvtRecvHeaders +struct EvtRecvHeaders stream_identifier::UInt32 headers::Headers is_end_stream::Bool end -immutable EvtRecvData +struct EvtRecvData stream_identifier::UInt32 data::Array{UInt8, 1} is_end_stream::Bool end -immutable EvtGoaway end +struct EvtGoaway end -type HTTPStream +mutable struct HTTPStream stream_identifier::UInt32 state::STREAM_STATE window_size::UInt32 - priority::Nullable{Priority} + priority::Union{Nothing,Priority} end -type HTTPSettings +mutable struct HTTPSettings push_enabled::Bool - max_concurrent_streams::Nullable{UInt} + max_concurrent_streams::Union{Nothing,UInt} initial_window_size::UInt max_frame_size::UInt - max_header_list_size::Nullable{UInt} + max_header_list_size::Union{Nothing,UInt} end -HTTPSettings() = HTTPSettings(true, Nullable(), 65535, 16384, Nullable()) +HTTPSettings() = HTTPSettings(true, nothing, 65535, 16384, nothing) -type HTTPConnection +const DEFAULT_CHANNEL_SZ = 1024 +mutable struct HTTPConnection dynamic_table::DynamicTable streams::Array{HTTPStream, 1} window_size::UInt32 @@ -88,20 +88,24 @@ type HTTPConnection ## actions -> channel_act -> channel_act_raw -> io ## io -> channel_evt_raw -> channel_evt -> events + act_processor::Union{Nothing,Task} + act_raw_processor::Union{Nothing,Task} + evt_raw_processor::Union{Nothing,Task} + evt_processor::Union{Nothing,Task} + + function HTTPConnection(isclient) + new(HPack.DynamicTable(), + Vector{HTTPStream}(), + 1024, + isclient, + isclient ? 1 : 2, + HTTPSettings(), + false, + Channel(DEFAULT_CHANNEL_SZ), Channel(DEFAULT_CHANNEL_SZ), Channel(DEFAULT_CHANNEL_SZ), Channel(DEFAULT_CHANNEL_SZ), + nothing, nothing, nothing, nothing) + end end -HTTPConnection(isclient) = HTTPConnection(HPack.new_dynamic_table(), - Array{HTTPStream, 1}(), - 65535, - isclient, - isclient ? 1 : 2, - HTTPSettings(), - false, - - Channel(), - Channel(), - Channel(), - Channel()) function next_free_stream_identifier(connection::HTTPConnection) return connection.last_stream_identifier + 2 diff --git a/src/Session/channels.jl b/src/Session/channels.jl index bd4927e..bbc7ea2 100644 --- a/src/Session/channels.jl +++ b/src/Session/channels.jl @@ -4,7 +4,7 @@ function initialize_raw_loop_async(connection::HTTPConnection, buffer; skip_pref channel_evt_raw = connection.channel_evt_raw ## Initialize the connection - CLIENT_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + CLIENT_PREFACE = bytearr("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") if !skip_preface if connection.isclient @@ -14,123 +14,134 @@ function initialize_raw_loop_async(connection::HTTPConnection, buffer; skip_pref end end - @async begin - while true - if connection.closed - break - end - - if eof(buffer) - connection.closed = true - continue - end + connection.evt_raw_processor = @async begin + try + while true + if connection.closed + break + end - frame = try - Frame.decode(buffer) - catch - goaway!(connection, ProtocolError("Decode error.")) - break - end + if eof(buffer) + connection.closed = true + continue + end - # Abstract atom headers frame away - if typeof(frame) == HeadersFrame || typeof(frame) == PushPromiseFrame - continuations = Array{ContinuationFrame, 1}() - while !(frame.is_end_headers || - (!frame.is_end_headers && length(continuations) == 0) || - continuations[length(continuations)].is_end_headers) - - continuation = try - Frame.decode(buffer) - catch - goaway!(connection, ProtocolError("Decode error.")) - break - end + frame = try + Frame.decode(buffer) + catch + goaway!(connection, ProtocolError("Decode error.")) + break + end - if !(typeof(continuation) == ContinuationFrame && - continuation.stream_identifier == frame.stream_identifier) - goaway!(connection, ProtocolError("Headers must be followed by continuations if it is not the end.")) + # Abstract atom headers frame away + if typeof(frame) == HeadersFrame || typeof(frame) == PushPromiseFrame + continuations = Array{ContinuationFrame, 1}() + while !(frame.is_end_headers || + (!frame.is_end_headers && length(continuations) == 0) || + continuations[length(continuations)].is_end_headers) + + continuation = try + Frame.decode(buffer) + catch + goaway!(connection, ProtocolError("Decode error.")) + break + end + + if !(typeof(continuation) == ContinuationFrame && + continuation.stream_identifier == frame.stream_identifier) + goaway!(connection, ProtocolError("Headers must be followed by continuations if it is not the end.")) + end + push!(continuations, continuation) end - push!(continuations, continuation) - end - fragment = copy(frame.fragment) - if length(continuations) > 0 - for i = 1:length(continuations) - append!(fragment, continuations[i].fragment) + fragment = copy(frame.fragment) + if length(continuations) > 0 + for i = 1:length(continuations) + append!(fragment, continuations[i].fragment) + end end - end - if typeof(frame) == HeadersFrame - put!(channel_evt_raw, HeadersFrame(frame.is_end_stream, - true, - frame.is_priority, - frame.stream_identifier, - frame.exclusive, - frame.dependent_stream_identifier, - frame.weight, - fragment)) - else - put!(channel_evt_raw, PushPromiseFrame(true, + if typeof(frame) == HeadersFrame + put!(channel_evt_raw, HeadersFrame(frame.is_end_stream, + true, + frame.is_priority, frame.stream_identifier, - frame.promised_stream_identifier, + frame.exclusive, + frame.dependent_stream_identifier, + frame.weight, fragment)) - end - else - put!(channel_evt_raw, frame) + else + put!(channel_evt_raw, PushPromiseFrame(true, + frame.stream_identifier, + frame.promised_stream_identifier, + fragment)) + end + else + put!(channel_evt_raw, frame) - if typeof(frame) == DataFrame && !frame.is_end_stream - put!(channel_act_raw, WindowUpdateFrame(0, length(frame.data))) - put!(channel_act_raw, WindowUpdateFrame(frame.stream_identifier, length(frame.data))) - end + if typeof(frame) == DataFrame && !frame.is_end_stream + put!(channel_act_raw, WindowUpdateFrame(0, length(frame.data))) + put!(channel_act_raw, WindowUpdateFrame(frame.stream_identifier, length(frame.data))) + end - if typeof(frame) == GoawayFrame - connection.closed = true + if typeof(frame) == GoawayFrame + connection.closed = true + end end end + catch ex + @info("Exception in connection.evt_raw_processor $ex") + rethrow(ex) end end - @async begin - while true - if connection.closed - break - end - - frame = take!(channel_act_raw) - - if typeof(frame) == HeadersFrame || typeof(frame) == PushPromiseFrame - write(buffer, Frame.encode(frame)) - while !frame.is_end_headers - continuation = take!(channel_act_raw) - if !(typeof(continuation) == ContinuationFrame && - continuation.stream_identifier == frame.stream_identifier) - goaway!(connection, InternalError("Headers must be followed by continuations if it is not the end.")) - end - write(buffer, Frame.encode(continuation)) + connection.act_raw_processor = @async begin + try + while true + if connection.closed + break end - else - encoded = Frame.encode(frame) - - if typeof(frame) == DataFrame - stream = get_stream(connection, frame.stream_identifier) - if stream.window_size < length(encoded) - put!(channel_act_raw, frame) - continue + + frame = take!(channel_act_raw) + + if typeof(frame) == HeadersFrame || typeof(frame) == PushPromiseFrame + write(buffer, Frame.encode(frame)) + while !frame.is_end_headers + continuation = take!(channel_act_raw) + if !(typeof(continuation) == ContinuationFrame && + continuation.stream_identifier == frame.stream_identifier) + goaway!(connection, InternalError("Headers must be followed by continuations if it is not the end.")) + end + write(buffer, Frame.encode(continuation)) end + else + encoded = Frame.encode(frame) - stream.window_size -= length(encoded) - end + if typeof(frame) == DataFrame + stream = get_stream(connection, frame.stream_identifier) + if stream.window_size < length(encoded) + @show("putting back as window_size=$(stream.window_size), len=$(length(encoded))") + put!(channel_act_raw, frame) + continue + end - write(buffer, encoded) + stream.window_size -= length(encoded) + end - if typeof(frame) == GoawayFrame - connection.closed = true + write(buffer, encoded) + + if typeof(frame) == GoawayFrame + connection.closed = true + end end end + catch ex + @info("Exception in connection.act_raw_processor $ex") + rethrow(ex) end end - put!(channel_act_raw, SettingsFrame(false, Nullable(Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}()))) + put!(channel_act_raw, SettingsFrame(false, Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}())) end function process_channel_act(connection::HTTPConnection) @@ -146,21 +157,21 @@ function process_channel_act(connection::HTTPConnection) end stream = get_stream(connection, act.stream_identifier) - if stream.state == IDLE && !isnull(connection.settings.max_concurrent_streams) && - concurrent_streams_count(connection) > get(connection.settings.max_concurrent_streams) + if stream.state == IDLE && (connection.settings.max_concurrent_streams !== nothing) && + concurrent_streams_count(connection) > connection.settings.max_concurrent_streams put!(channel_act, act) return end if typeof(act) == ActSendHeaders - if !isnull(connection.settings.max_header_list_size) + if (connection.settings.max_header_list_size !== nothing) sum = 0 for k in keys(act.headers) sum += length(k) + length(act.headers[k]) + 32 end - if sum > get(connection.settings.max_header_list_size) + if sum > connection.settings.max_header_list_size goaway!(connection, InternalError("Header list size exceeded.")) return end @@ -196,14 +207,13 @@ function process_channel_evt(connection::HTTPConnection) if typeof(frame) == SettingsFrame if !frame.is_ack - parameters = frame.parameters.value + parameters = frame.parameters if length(parameters) > 0 for i = 1:length(parameters) handle_setting!(connection, parameters[i][1], parameters[i][2]) end end - put!(channel_act_raw, - SettingsFrame(true, Nullable{Tuple{Frame.SETTING_IDENTIFIER, UInt32}}())) + put!(channel_act_raw, SettingsFrame(true, Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}())) end return end @@ -213,6 +223,7 @@ function process_channel_evt(connection::HTTPConnection) end if typeof(frame) == GoawayFrame @assert frame.error_code == 0x0 + put!(channel_evt, EvtGoaway()) return end if typeof(frame) == WindowUpdateFrame && frame.stream_identifier == 0x0 @@ -253,35 +264,33 @@ function process_channel_evt(connection::HTTPConnection) @assert false end -function select(waitset::Array) - c = Channel(length(waitset)) - for w in waitset - @async put!(c, (wait(w); w)) - end - take!(c) -end - -function initialize_loop_async(connection::HTTPConnection, buffer; skip_preface=false) - initialize_raw_loop_async(connection, buffer; skip_preface=skip_preface) - - channel_act_raw = connection.channel_act_raw - channel_act = connection.channel_act - channel_evt_raw = connection.channel_evt_raw - channel_evt = connection.channel_evt - - @async begin - while true - if connection.closed - put!(channel_evt, EvtGoaway()) - break +function initialize_in_loop_async(connection::HTTPConnection) + connection.act_processor = @async begin + try + while !connection.closed + process_channel_act(connection) end + put!(channel_evt, EvtGoaway()) + catch ex + @info("Exception in connection.act_processor $ex") + rethrow(ex) + end + end - c = select([channel_evt_raw, channel_act]) - if c == channel_evt_raw + connection.evt_raw_processor = @async begin + try + while !connection.closed # || (connection.closed && isready(connection.channel_evt_raw)) process_channel_evt(connection) - else - process_channel_act(connection) end + catch ex + @info("Exception in connection.evt_raw_processor $ex") + rethrow(ex) end end + nothing +end + +function initialize_loop_async(connection::HTTPConnection, buffer; skip_preface=false) + initialize_raw_loop_async(connection, buffer; skip_preface=skip_preface) + initialize_in_loop_async(connection) end diff --git a/src/Session/errors.jl b/src/Session/errors.jl index 8f38633..f5e8816 100644 --- a/src/Session/errors.jl +++ b/src/Session/errors.jl @@ -1,38 +1,40 @@ -@enum ERROR_CODE ERROR_NONE=0x0 - ERROR_PROTOCOL=0x1 - ERROR_INTERNAL=0x2 - ERROR_FLOW_CONTROL=0x3 - ERROR_SETTINGS_TIMEOUT=0x4 - ERROR_STREAM_CLOSED=0x5 - ERROR_FRAME_SIZE=0x6 - ERROR_REFUSED_STREAM=0x7 - ERROR_CANCEL=0x8 - ERROR_COMPRESSION=0x9 - ERROR_CONNECT=0xa - ERROR_ENHANCE_YOUR_CALM=0xb - ERROR_INADEQUATE_SECURITY=0xc - ERROR_HTTP_1_1_REQUIRED=0xd +@enum ERROR_CODE begin + ERROR_NONE=0x0 + ERROR_PROTOCOL=0x1 + ERROR_INTERNAL=0x2 + ERROR_FLOW_CONTROL=0x3 + ERROR_SETTINGS_TIMEOUT=0x4 + ERROR_STREAM_CLOSED=0x5 + ERROR_FRAME_SIZE=0x6 + ERROR_REFUSED_STREAM=0x7 + ERROR_CANCEL=0x8 + ERROR_COMPRESSION=0x9 + ERROR_CONNECT=0xa + ERROR_ENHANCE_YOUR_CALM=0xb + ERROR_INADEQUATE_SECURITY=0xc + ERROR_HTTP_1_1_REQUIRED=0xd +end -type ProtocolError - message::AbstractString +struct ProtocolError + message::String end -type InternalError - message::AbstractString +struct InternalError + message::String end -type NullError end +struct NullError end function goaway!(connection::HTTPConnection, error) error_code = if typeof(error) == NullError - 0x0 + ERROR_NONE elseif typeof(error) == InternalError - 0x2 + ERROR_INTERNAL else - 0x1 + ERROR_PROTOCOL end - frame = GoawayFrame(0x0, error_code, Array{UInt8, 1}()) + frame = GoawayFrame(0x0, UInt32(error_code), Array{UInt8, 1}()) put!(connection.channel_act_raw, frame) end diff --git a/src/Session/handlers.jl b/src/Session/handlers.jl index 2583869..7818e83 100644 --- a/src/Session/handlers.jl +++ b/src/Session/handlers.jl @@ -5,9 +5,9 @@ function recv_stream_headers(connection::HTTPConnection, frame::HeadersFrame) stream = get_stream(connection, stream_identifier) if frame.is_priority - handle_priority!(connection, stream_identifier, frame.exclusive.value, - frame.dependent_stream_identifier.value, - frame.weight.value) + handle_priority!(connection, stream_identifier, frame.exclusive, + frame.dependent_stream_identifier, + frame.weight) end block = copy(frame.fragment) @@ -25,8 +25,8 @@ function send_stream_headers(connection::HTTPConnection, act::ActSendHeaders) # We don't use padding in this implementation if connection.settings.max_frame_size < (length(block) + 6) splitLength = connection.settings.max_frame_size - 6 - header = HeadersFrame(is_end_stream, false, false, stream_identifier, Nullable{Bool}(), - Nullable{UInt32}(), Nullable{UInt8}(), getindex(block, 1:splitLength)) + header = HeadersFrame(is_end_stream, false, false, stream_identifier, nothing, + nothing, nothing, getindex(block, 1:splitLength)) put!(connection.channel_act_raw, header) curPos = splitLength + 1 @@ -37,8 +37,8 @@ function send_stream_headers(connection::HTTPConnection, act::ActSendHeaders) curPos = endPos + 1 end else - frame = HeadersFrame(is_end_stream, true, false, stream_identifier, Nullable{Bool}(), - Nullable{UInt32}(), Nullable{UInt8}(), block) + frame = HeadersFrame(is_end_stream, true, false, stream_identifier, nothing, + nothing, nothing, block) put!(connection.channel_act_raw, frame) end diff --git a/src/Session/settings.jl b/src/Session/settings.jl index a7b4476..55fa7be 100644 --- a/src/Session/settings.jl +++ b/src/Session/settings.jl @@ -4,7 +4,7 @@ function handle_setting!(connection::HTTPConnection, key::Frame.SETTING_IDENTIFI elseif key == Frame.SETTINGS_ENABLE_PUSH connection.settings.push_enabled = value != 0 elseif key == Frame.SETTINGS_MAX_CONCURRENT_STREAMS - connection.settings.max_concurrent_streams = Nullable(UInt(value)) + connection.settings.max_concurrent_streams = UInt(value) elseif key == Frame.SETTINGS_INITIAL_WINDOW_SIZE diff = UInt(value) - connection.settings.initial_window_size for stream in connection.streams @@ -14,7 +14,7 @@ function handle_setting!(connection::HTTPConnection, key::Frame.SETTING_IDENTIFI elseif key == Frame.SETTINGS_MAX_FRAME_SIZE connection.settings.max_frame_size = UInt(value) elseif key == Frame.SETTINGS_MAX_HEADER_LIST_SIZE - connection.settings.max_header_list_size = Nullable(UInt(value)) + connection.settings.max_header_list_size = UInt(value) else goaway!(connection, ProtocolError("Unknown settings key.")) end diff --git a/src/Session/states.jl b/src/Session/states.jl index 7b174bd..1a336da 100644 --- a/src/Session/states.jl +++ b/src/Session/states.jl @@ -39,7 +39,7 @@ function handle_stream_state!(connection::HTTPConnection, frame, issend::Bool) ## The endpoint can send a HEADERS frame. This causes the stream to open ## in a "half-closed (remote)" state. if typeof(frame) == HeadersFrame && issend - stream.state = HALC_CLOSED_REMOTE + stream.state = HALF_CLOSED_REMOTE return end diff --git a/src/Session/utils.jl b/src/Session/utils.jl index 06e2eda..9c14cd6 100644 --- a/src/Session/utils.jl +++ b/src/Session/utils.jl @@ -8,7 +8,7 @@ function get_stream(connection::HTTPConnection, stream_identifier::UInt32) end stream = HTTPStream(stream_identifier, IDLE, - connection.settings.initial_window_size, Nullable{Priority}()) + connection.settings.initial_window_size, nothing) push!(connection.streams, stream) return stream @@ -27,10 +27,10 @@ end function get_dependency_parent(connection::HTTPConnection, stream_identifier::UInt32) stream = get_stream(connection, stream_identifier) - if isnull(stream.priority) - return Nullable{Stream}() + if stream.priority === nothing + return nothing else - return Nullable(get_stream(stream.priority.value.dependent_stream_identifier)) + return get_stream(stream.priority.dependent_stream_identifier) end end @@ -40,8 +40,8 @@ function get_dependency_children(connection::HTTPConnection, stream_identifier:: for i = 1:length(connection.streams) stream = connection.streams[i] - if !isnull(stream.priority) && - stream.priority.value.dependent_stream_identifier == stream_identifier + if (stream.priority !== nothing) && + stream.priority.dependent_stream_identifier == stream_identifier push!(result, stream) end end @@ -57,11 +57,11 @@ function handle_priority!(connection::HTTPConnection, stream_identifier::UInt32, children = get_dependency_children(connection, stream_identifier) for i = 1:length(children) - children[i].priority.value.dependent_stream_identifier = stream_identifier + children[i].priority.dependent_stream_identifier = stream_identifier end end - stream.priority = Nullable(Priority(dependent_stream_identifier, weight)) + stream.priority = Priority(dependent_stream_identifier, weight) end function concurrent_streams_count(connection::HTTPConnection) diff --git a/test/.gitignore b/test/.gitignore new file mode 100644 index 0000000..4ee097b --- /dev/null +++ b/test/.gitignore @@ -0,0 +1,2 @@ +*.cert +*.key diff --git a/test/client.jl b/test/client.jl index 977c427..7a4eaf3 100644 --- a/test/client.jl +++ b/test/client.jl @@ -1,23 +1,65 @@ import HTTP2 using HTTP2.Frame -using Base.Test +using Test +using Sockets +using MbedTLS -## Run `nghttpd --verbose --no-tls 9000` to make this test pass -(headers, body) = HTTP2.request(ip"127.0.0.1", 9000, "/") +function sslconnect(dest, port, certhostname) + println("Connecting over SSL ...") + buffer = connect(dest, port) + sslconfig = MbedTLS.SSLConfig(false) + sslbuffer = MbedTLS.SSLContext() + MbedTLS.setup!(sslbuffer, sslconfig) + MbedTLS.set_bio!(sslbuffer, buffer) + MbedTLS.hostname!(sslbuffer, certhostname) + MbedTLS.handshake!(sslbuffer) + return sslbuffer +end + +function show_response(headers, body) + for header in headers + @info("Result header: " * String(header[1]) * ": " * String(header[2])) + end + @info("Result body: " * String(body)) +end + + +function test_request(dest, port, url, certhostname=nothing) + for conn_id in 1:3 + @info("Opening connection", conn_id) + buffer = (certhostname === nothing) ? connect(dest, port) : sslconnect(dest, port, certhostname) + connection = HTTP2.Session.new_connection(buffer; isclient=true) + headers = [(":method", "GET"), + (":path", url), + (":scheme", "http"), + (":authority", "127.0.0.1:9000"), + ("accept", "*/*"), + ("accept-encoding", "gzip, deflate"), + ("user-agent", "HTTP2.jl")] + + @sync begin + @async for req_id in 1:3 + @info("Sending request", req_id) + HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(HTTP2.Session.next_free_stream_identifier(connection), headers, true)) + end + @async for resp_id in 1:3 + resp_headers = HTTP2.Session.take_evt!(connection).headers + @show resp_headers + resp_data = HTTP2.Session.take_evt!(connection).data + @show resp_data + show_response(resp_headers, resp_data) + end + end + + @info("Closing connection", conn_id) + close(connection) + sleep(2) # wait for close message to percolate + close(buffer) + end +end -println() -println("Results of the request") -println("======================") -println() -println("Headers") -println("======================") -for header in headers - print(ascii(header[1])) - print(": ") - print(ascii(header[2])) - print("\n") +if length(ARGS) == 1 + test_request(ip"127.0.0.1", 8000, "/", ARGS[1]) +else + test_request(ip"127.0.0.1", 8000, "/") end -println() -println("Body") -println("======================") -println(ascii(body)) diff --git a/test/frames.jl b/test/frames.jl index eb7bc37..8dde11e 100644 --- a/test/frames.jl +++ b/test/frames.jl @@ -1,25 +1,13 @@ using HTTP2.Frame +import HTTP2: bytearr -@test decode(IOBuffer(encode(ContinuationFrame(false, 0x51, b"test")))) == - ContinuationFrame(false, 0x51, b"test") -@test decode(IOBuffer(encode(DataFrame(0x51, false, b"test")))) == - DataFrame(0x51, false, b"test") -@test decode(IOBuffer(encode(GoawayFrame(0x4f, 0x4, b"test")))) == - GoawayFrame(0x4f, 0x4, b"test") -@test decode(IOBuffer(encode(HeadersFrame(false, false, true, 0x51, - Nullable(false), Nullable(0x50), - Nullable(0x1), b"test")))) == - HeadersFrame(false, false, true, 0x51, - Nullable(false), Nullable(0x50), - Nullable(0x1), b"test") -@test decode(IOBuffer(encode(PingFrame(false, b"testtest")))) == - PingFrame(false, b"testtest") -@test decode(IOBuffer(encode(PriorityFrame(0x51, false, 0x50, 0x2)))) == - PriorityFrame(0x51, false, 0x50, 0x2) -@test decode(IOBuffer(encode(PushPromiseFrame(false, 0x51, 0x54, b"test")))) == - PushPromiseFrame(false, 0x51, 0x54, b"test") -@test decode(IOBuffer(encode(RstStreamFrame(0x51, 0x4)))) == - RstStreamFrame(0x51, 0x4) -@test decode(IOBuffer(encode(SettingsFrame(true, Nullable())))) == SettingsFrame(true, Nullable()) -@test decode(IOBuffer(encode(WindowUpdateFrame(0x51, 0x2)))) == - WindowUpdateFrame(0x51, 0x2) +@test decode(IOBuffer(encode(ContinuationFrame(false, 0x51, bytearr("test"))))) == ContinuationFrame(false, 0x51, bytearr("test")) +@test decode(IOBuffer(encode(DataFrame(0x51, false, bytearr("test"))))) == DataFrame(0x51, false, bytearr("test")) +@test decode(IOBuffer(encode(GoawayFrame(0x4f, 0x4, bytearr("test"))))) == GoawayFrame(0x4f, 0x4, bytearr("test")) +@test decode(IOBuffer(encode(HeadersFrame(false, false, true, 0x51, false, 0x50, 0x1, bytearr("test"))))) == HeadersFrame(false, false, true, 0x51, false, 0x50, 0x1, bytearr("test")) +@test decode(IOBuffer(encode(PingFrame(false, bytearr("testtest"))))) == PingFrame(false, bytearr("testtest")) +@test decode(IOBuffer(encode(PriorityFrame(0x51, false, 0x50, 0x2)))) == PriorityFrame(0x51, false, 0x50, 0x2) +@test decode(IOBuffer(encode(PushPromiseFrame(false, 0x51, 0x54, bytearr("test"))))) == PushPromiseFrame(false, 0x51, 0x54, bytearr("test")) +@test decode(IOBuffer(encode(RstStreamFrame(0x51, 0x4)))) == RstStreamFrame(0x51, 0x4) +@test decode(IOBuffer(encode(SettingsFrame(true, nothing)))) == SettingsFrame(true, nothing) +@test decode(IOBuffer(encode(WindowUpdateFrame(0x51, 0x2)))) == WindowUpdateFrame(0x51, 0x2) diff --git a/test/genkey.sh b/test/genkey.sh new file mode 100755 index 0000000..0f90696 --- /dev/null +++ b/test/genkey.sh @@ -0,0 +1,14 @@ +#! /usr/bin/env bash + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +HOSTNAME=$1 + +openssl req \ + -new \ + -newkey rsa:4096 \ + -days 3650 \ + -nodes \ + -x509 \ + -subj "/C=IN/ST=Karnataka/L=Bangalore/O=Julia/CN=$HOSTNAME" \ + -keyout ${DIR}/$HOSTNAME.key \ + -out ${DIR}/$HOSTNAME.cert diff --git a/test/runtests.jl b/test/runtests.jl index 723c043..aecd584 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,3 +1,40 @@ -using Base.Test +using Test +using Sockets include("frames.jl") + +const opts = Base.JLOptions() +const inline_flag = opts.can_inline == 1 ? `` : `--inline=no` +const cov_flag = (opts.code_coverage == 1) ? `--code-coverage=user` : + (opts.code_coverage == 2) ? `--code-coverage=all` : + `` + +function run_test(script, args...) + srvrscript = joinpath(dirname(@__FILE__), script) + if isempty(args) + srvrcmd = `$(joinpath(Sys.BINDIR, "julia")) $cov_flag $inline_flag $script` + elseif length(args) == 2 + srvrcmd = `$(joinpath(Sys.BINDIR, "julia")) $cov_flag $inline_flag $script $(args[1]) $(args[2])` + elseif length(args) == 1 + srvrcmd = `$(joinpath(Sys.BINDIR, "julia")) $cov_flag $inline_flag $script $(args[1])` + end + println("Running tests from ", script, "\n", "="^60) + ret = run(srvrcmd) + println("Finished ", script, "\n", "="^60) + nothing +end + +@async run_test("server.jl") +sleep(10) +run_test("client.jl") +sleep(10) + +GENKEYSCRIPT = joinpath(dirname(@__FILE__), "genkey.sh") +HOSTNAME = gethostname() +run(`$GENKEYSCRIPT $HOSTNAME`) + +keyfile = joinpath(dirname(@__FILE__), "$HOSTNAME.key") +certfile = joinpath(dirname(@__FILE__), "$HOSTNAME.cert") +@async run_test("server.jl", certfile, keyfile) +sleep(10) +run_test("client.jl", HOSTNAME) diff --git a/test/server.jl b/test/server.jl index 70e885d..6b3077a 100644 --- a/test/server.jl +++ b/test/server.jl @@ -1,6 +1,70 @@ -import HTTP2 +using HTTP2 +import HTTP2: bytearr using HTTP2.Frame -using Base.Test +using Test +using Dates +using Sockets +using MbedTLS + +function sslaccept(server, certfile, keyfile) + println("Expecting a SSL connection ...") + sslconfig = MbedTLS.SSLConfig(certfile, keyfile) + buffer = accept(server) + sslbuffer = MbedTLS.SSLContext() + MbedTLS.setup!(sslbuffer, sslconfig) + MbedTLS.associate!(sslbuffer, buffer) + MbedTLS.handshake!(sslbuffer) + return sslbuffer +end + +# test serve method +function test_serve(port, body, certfile=nothing, keyfile=nothing) + server = listen(port) + + println("Server started.") + connid = 1 + while connid < 4 + println("Waiting for a connection ...") + buffer = ((certfile === nothing) || (keyfile === nothing)) ? accept(server) : sslaccept(server, certfile, keyfile) + println("Processing a connection ...") + + connection = HTTP2.Session.new_connection(buffer; isclient=false) + @info("Connected", connection) + ## Recv the client preface, and send an empty SETTING frame. + + while true + headers_evt = HTTP2.Session.take_evt!(connection) + @info("Received headers", headers_evt) + if isa(headers_evt, HTTP2.Session.EvtGoaway) + close(buffer) + break + end + + stream_identifier = headers_evt.stream_identifier + @info("Stream ", stream_identifier) + + sending_headers = [(":status", "200"), + ("server", "HTTP2.jl"), + ("date", Dates.format(now(Dates.UTC), Dates.RFC1123Format)), + ("content-type", "text/html; charset=UTF-8")] + sending_body = isa(body, String) ? convert(Vector{UInt8}, codeunits(body)) : body + @info("Resopnding", sending_headers, sending_body) + + HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(stream_identifier, sending_headers, false)) + @info("sent headers") + HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendData(stream_identifier, sending_body, true)) + @info("sent body") + end + ## We are done! + connid += 1 + end +end # A server example -HTTP2.serve(8000, b"