From 9822186ffd3fcbb83616d34179d750e95b5b7193 Mon Sep 17 00:00:00 2001 From: tan Date: Sun, 4 Nov 2018 22:32:35 +0530 Subject: [PATCH 01/19] updated for Julia 1.0 --- REQUIRE | 5 ++--- src/Frame.jl | 8 +++---- src/Frame/continuation.jl | 2 +- src/Frame/data.jl | 2 +- src/Frame/goaway.jl | 2 +- src/Frame/headers.jl | 32 ++++++++++++++-------------- src/Frame/ping.jl | 2 +- src/Frame/priority.jl | 2 +- src/Frame/push_promise.jl | 2 +- src/Frame/rst_stream.jl | 2 +- src/Frame/settings.jl | 16 +++++++------- src/Frame/window_update.jl | 2 +- src/HTTP2.jl | 8 ++++++- src/Session.jl | 43 +++++++++++++++++++------------------- src/Session/channels.jl | 16 +++++++------- src/Session/errors.jl | 10 ++++----- src/Session/handlers.jl | 14 ++++++------- src/Session/settings.jl | 4 ++-- src/Session/utils.jl | 16 +++++++------- test/frames.jl | 34 ++++++++++-------------------- test/runtests.jl | 2 +- test/server.jl | 5 +++-- 22 files changed, 111 insertions(+), 118 deletions(-) diff --git a/REQUIRE b/REQUIRE index 87a8582..6ae8081 100644 --- a/REQUIRE +++ b/REQUIRE @@ -1,3 +1,2 @@ -julia 0.4 -HPack 0.2.0 -HttpCommon 0.2.4 \ No newline at end of file +julia 1.0 +HPack diff --git a/src/Frame.jl b/src/Frame.jl index ef40719..4fe30e3 100644 --- a/src/Frame.jl +++ b/src/Frame.jl @@ -1,9 +1,9 @@ module Frame -import Base: ==, AbstractIOBuffer +import Base: == @enum FRAME_TYPES DATA=0x0 HEADERS=0x1 PRIORITY=0x2 RST_STREAM=0x3 SETTINGS=0x4 PUSH_PROMISE=0x5 PING=0x6 GOAWAY=0x7 WINDOW_UPDATE=0x8 CONTINUATION=0x9 -immutable FrameHeader +struct FrameHeader length::UInt32 typ::FRAME_TYPES flags::UInt8 @@ -37,11 +37,11 @@ function encode_header(header::FrameHeader) write(buf, UInt8(header.stream_identifier >> 24), UInt8((header.stream_identifier >> 16) & 0x000000ff), UInt8((header.stream_identifier >> 8) & 0x000000ff), UInt8(header.stream_identifier & 0x000000ff)) - return takebuf_array(buf) + return take!(buf) end -type UnimplementedError <: Exception end +struct UnimplementedError <: Exception end include("Frame/utils.jl") include("Frame/data.jl") diff --git a/src/Frame/continuation.jl b/src/Frame/continuation.jl index 58089cf..fc8ac74 100644 --- a/src/Frame/continuation.jl +++ b/src/Frame/continuation.jl @@ -1,4 +1,4 @@ -immutable ContinuationFrame +struct ContinuationFrame is_end_headers::Bool stream_identifier::UInt32 fragment::Array{UInt8, 1} diff --git a/src/Frame/data.jl b/src/Frame/data.jl index 89a1121..92eb91f 100644 --- a/src/Frame/data.jl +++ b/src/Frame/data.jl @@ -1,4 +1,4 @@ -immutable DataFrame +struct DataFrame stream_identifier::UInt32 is_end_stream::Bool data::Array{UInt8, 1} diff --git a/src/Frame/goaway.jl b/src/Frame/goaway.jl index a301ffb..195b4fc 100644 --- a/src/Frame/goaway.jl +++ b/src/Frame/goaway.jl @@ -1,4 +1,4 @@ -immutable GoawayFrame +struct GoawayFrame last_stream_identifier::UInt32 error_code::UInt32 debug_data::Array{UInt8, 1} diff --git a/src/Frame/headers.jl b/src/Frame/headers.jl index 1d625a3..2146a49 100644 --- a/src/Frame/headers.jl +++ b/src/Frame/headers.jl @@ -1,11 +1,11 @@ -immutable HeadersFrame +struct HeadersFrame is_end_stream::Bool is_end_headers::Bool is_priority::Bool stream_identifier::UInt32 - exclusive::Nullable{Bool} - dependent_stream_identifier::Nullable{UInt32} - weight::Nullable{UInt8} + exclusive::Union{Nothing,Bool} + dependent_stream_identifier::Union{Nothing,UInt32} + weight::Union{Nothing,UInt8} fragment::Array{UInt8, 1} end @@ -14,9 +14,9 @@ end a.is_end_headers == b.is_end_headers && a.is_priority == b.is_priority && a.stream_identifier == b.stream_identifier && - (isnull(a.exclusive) || a.exclusive.value == b.exclusive.value) && - (isnull(a.exclusive) || a.dependent_stream_identifier.value == b.dependent_stream_identifier.value) && - (isnull(a.exclusive) || a.weight.value == b.weight.value) && + ((a.exclusive === nothing) || a.exclusive == b.exclusive) && + ((a.exclusive === nothing) || a.dependent_stream_identifier == b.dependent_stream_identifier) && + ((a.exclusive === nothing) || a.weight == b.weight) && a.fragment == b.fragment function decode_headers(header, payload) @@ -33,11 +33,11 @@ function decode_headers(header, payload) weight = payload[5] return HeadersFrame(is_end_stream, is_end_headers, is_priority, header.stream_identifier, - Nullable(exclusive), Nullable(dependent_stream_identifier), - Nullable(weight), getindex(payload, 6:length(payload))) + exclusive, dependent_stream_identifier, + weight, getindex(payload, 6:length(payload))) else return HeadersFrame(is_end_stream, is_end_headers, is_priority, header.stream_identifier, - Nullable{Bool}(), Nullable{UInt32}(), Nullable{UInt8}(), payload) + nothing, nothing, nothing, payload) end end @@ -48,12 +48,12 @@ function encode_headers(frame) (frame.is_priority ? 0x20 : 0x0) if frame.is_priority - payload::Array{UInt8, 1} = [ UInt8(frame.dependent_stream_identifier.value >> 24) & 0x7f; - UInt8(frame.dependent_stream_identifier.value >> 16 & 0x000000ff); - UInt8(frame.dependent_stream_identifier.value >> 8 & 0x000000ff); - UInt8(frame.dependent_stream_identifier.value & 0x000000ff) ] - payload[1] = frame.exclusive.value ? (payload[1] | 0x80 ) : payload[1] - push!(payload, frame.weight.value) + payload::Array{UInt8, 1} = [ UInt8(frame.dependent_stream_identifier >> 24) & 0x7f; + UInt8(frame.dependent_stream_identifier >> 16 & 0x000000ff); + UInt8(frame.dependent_stream_identifier >> 8 & 0x000000ff); + UInt8(frame.dependent_stream_identifier & 0x000000ff) ] + payload[1] = frame.exclusive ? (payload[1] | 0x80 ) : payload[1] + push!(payload, frame.weight) append!(payload, frame.fragment) else payload = frame.fragment diff --git a/src/Frame/ping.jl b/src/Frame/ping.jl index 7992bff..e8548f6 100644 --- a/src/Frame/ping.jl +++ b/src/Frame/ping.jl @@ -1,4 +1,4 @@ -immutable PingFrame +struct PingFrame is_ack::Bool data::Array{UInt8, 1} end diff --git a/src/Frame/priority.jl b/src/Frame/priority.jl index fb68a41..0072aa0 100644 --- a/src/Frame/priority.jl +++ b/src/Frame/priority.jl @@ -1,4 +1,4 @@ -immutable PriorityFrame +struct PriorityFrame stream_identifier::UInt32 exclusive::Bool dependent_stream_identifier::UInt32 diff --git a/src/Frame/push_promise.jl b/src/Frame/push_promise.jl index c07923d..c9e9431 100644 --- a/src/Frame/push_promise.jl +++ b/src/Frame/push_promise.jl @@ -1,4 +1,4 @@ -immutable PushPromiseFrame +struct PushPromiseFrame is_end_headers::Bool stream_identifier::UInt32 promised_stream_identifier::UInt32 diff --git a/src/Frame/rst_stream.jl b/src/Frame/rst_stream.jl index 4cb991a..820a4b5 100644 --- a/src/Frame/rst_stream.jl +++ b/src/Frame/rst_stream.jl @@ -1,4 +1,4 @@ -immutable RstStreamFrame +struct RstStreamFrame stream_identifier::UInt32 error_code::UInt32 end diff --git a/src/Frame/settings.jl b/src/Frame/settings.jl index 0c392e2..a057586 100644 --- a/src/Frame/settings.jl +++ b/src/Frame/settings.jl @@ -1,17 +1,17 @@ @enum SETTING_IDENTIFIER SETTINGS_HEADER_TABLE_SIZE=0x1 SETTINGS_ENABLE_PUSH=0x2 SETTINGS_MAX_CONCURRENT_STREAMS=0x3 SETTINGS_INITIAL_WINDOW_SIZE=0x4 SETTINGS_MAX_FRAME_SIZE=0x5 SETTINGS_MAX_HEADER_LIST_SIZE=0x6 -immutable SettingsFrame +struct SettingsFrame is_ack::Bool - parameters::Nullable{Array{Tuple{SETTING_IDENTIFIER, UInt32}, 1}} + parameters::Union{Nothing,Array{Tuple{SETTING_IDENTIFIER, UInt32}, 1}} end -SettingsFrame() = SettingsFrame(false, Nullable(Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}())) +SettingsFrame() = SettingsFrame(false, Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}()) ==(a::SettingsFrame, b::SettingsFrame) = a.is_ack == b.is_ack && - (isnull(a.parameters) || a.parameters.value == b.parameters.value) + ((a.parameters === nothing) || a.parameters == b.parameters) -type UnknownIdentifierError <: Exception end +struct UnknownIdentifierError <: Exception end function decode_settings(header, payload) @assert header.stream_identifier == 0x0 @@ -20,7 +20,7 @@ function decode_settings(header, payload) if is_ack @assert length(payload) == 0 - return SettingsFrame(is_ack, Nullable{Array{Tuple{SETTING_IDENTIFIER, UInt32}}}()) + return SettingsFrame(is_ack, nothing) else parameters = Array{Tuple{SETTING_IDENTIFIER, UInt32}, 1}() for i = 1:div(length(payload), 6) @@ -29,7 +29,7 @@ function decode_settings(header, payload) UInt32(payload[(i-1)*6+5]) << 8 + UInt32(payload[(i-1)*6+6]) push!(parameters, (SETTING_IDENTIFIER(identifier), value)) end - return SettingsFrame(is_ack, Nullable(parameters)) + return SettingsFrame(is_ack, parameters) end end @@ -38,7 +38,7 @@ function encode_settings(frame) return wrap_payload([], SETTINGS, 0x1, 0x0) else payload = Array{UInt8, 1}() - for val in frame.parameters.value + for val in frame.parameters append!(payload, [ UInt8(UInt16(val[1]) >> 8); UInt8(UInt16(val[1]) & 0x00ff); UInt8(val[2] >> 24); diff --git a/src/Frame/window_update.jl b/src/Frame/window_update.jl index f1c355f..f54b929 100644 --- a/src/Frame/window_update.jl +++ b/src/Frame/window_update.jl @@ -1,4 +1,4 @@ -immutable WindowUpdateFrame +struct WindowUpdateFrame stream_identifier::UInt32 window_size_increment::UInt32 end diff --git a/src/HTTP2.jl b/src/HTTP2.jl index 5fa96d8..faa123a 100644 --- a/src/HTTP2.jl +++ b/src/HTTP2.jl @@ -1,6 +1,12 @@ module HTTP2 -import HttpCommon: Headers +using Sockets + +const Headers = Dict{String,String} + +bytearr(a::Vector{UInt8}) = a +bytearr(cs::Base.CodeUnits{UInt8,String}) = convert(Vector{UInt8}, cs) +bytearr(s::String) = bytearr(codeunits(s)) # package code goes here include("Frame.jl") diff --git a/src/Session.jl b/src/Session.jl index 8019187..6f53118 100644 --- a/src/Session.jl +++ b/src/Session.jl @@ -2,32 +2,31 @@ module Session import HPack import HPack: DynamicTable -import HttpCommon: Headers -import HTTP2.Frame +import HTTP2: bytearr, Frame, Headers import HTTP2.Frame: ContinuationFrame, DataFrame, GoawayFrame, HeadersFrame, PingFrame, PriorityFrame, PushPromiseFrame, RstStreamFrame, SettingsFrame, WindowUpdateFrame @enum STREAM_STATE IDLE=1 RESERVED_LOCAL=2 RESERVED_REMOTE=3 OPEN=4 HALF_CLOSED_REMOTE=5 HALF_CLOSED_LOCAL=6 CLOSED=7 -type Priority +mutable struct Priority dependent_stream_identifier::UInt32 weight::UInt8 end ## Actions, which should be feeded in to `in` channel. -immutable ActPromise +struct ActPromise stream_identifier::UInt32 promised_stream_identifier::UInt32 headers::Headers end -immutable ActSendHeaders +struct ActSendHeaders stream_identifier::UInt32 headers::Headers is_end_stream::Bool end -immutable ActSendData +struct ActSendData stream_identifier::UInt32 data::Array{UInt8, 1} is_end_stream::Bool @@ -35,44 +34,45 @@ end ## Events, which should be fetched from `out` channel. -immutable EvtPromise +struct EvtPromise stream_identifier::UInt32 promised_stream_identifier::UInt32 headers::Headers end -immutable EvtRecvHeaders +struct EvtRecvHeaders stream_identifier::UInt32 headers::Headers is_end_stream::Bool end -immutable EvtRecvData +struct EvtRecvData stream_identifier::UInt32 data::Array{UInt8, 1} is_end_stream::Bool end -immutable EvtGoaway end +struct EvtGoaway end -type HTTPStream +mutable struct HTTPStream stream_identifier::UInt32 state::STREAM_STATE window_size::UInt32 - priority::Nullable{Priority} + priority::Union{Nothing,Priority} end -type HTTPSettings +mutable struct HTTPSettings push_enabled::Bool - max_concurrent_streams::Nullable{UInt} + max_concurrent_streams::Union{Nothing,UInt} initial_window_size::UInt max_frame_size::UInt - max_header_list_size::Nullable{UInt} + max_header_list_size::Union{Nothing,UInt} end -HTTPSettings() = HTTPSettings(true, Nullable(), 65535, 16384, Nullable()) +HTTPSettings() = HTTPSettings(true, nothing, 65535, 16384, nothing) -type HTTPConnection +const DEFAULT_CHANNEL_SZ = 1024 +mutable struct HTTPConnection dynamic_table::DynamicTable streams::Array{HTTPStream, 1} window_size::UInt32 @@ -97,11 +97,10 @@ HTTPConnection(isclient) = HTTPConnection(HPack.new_dynamic_table(), isclient ? 1 : 2, HTTPSettings(), false, - - Channel(), - Channel(), - Channel(), - Channel()) + Channel(DEFAULT_CHANNEL_SZ), + Channel(DEFAULT_CHANNEL_SZ), + Channel(DEFAULT_CHANNEL_SZ), + Channel(DEFAULT_CHANNEL_SZ)) function next_free_stream_identifier(connection::HTTPConnection) return connection.last_stream_identifier + 2 diff --git a/src/Session/channels.jl b/src/Session/channels.jl index bd4927e..3488215 100644 --- a/src/Session/channels.jl +++ b/src/Session/channels.jl @@ -4,7 +4,7 @@ function initialize_raw_loop_async(connection::HTTPConnection, buffer; skip_pref channel_evt_raw = connection.channel_evt_raw ## Initialize the connection - CLIENT_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + CLIENT_PREFACE = bytearr("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") if !skip_preface if connection.isclient @@ -130,7 +130,7 @@ function initialize_raw_loop_async(connection::HTTPConnection, buffer; skip_pref end end - put!(channel_act_raw, SettingsFrame(false, Nullable(Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}()))) + put!(channel_act_raw, SettingsFrame(false, Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}())) end function process_channel_act(connection::HTTPConnection) @@ -146,21 +146,21 @@ function process_channel_act(connection::HTTPConnection) end stream = get_stream(connection, act.stream_identifier) - if stream.state == IDLE && !isnull(connection.settings.max_concurrent_streams) && - concurrent_streams_count(connection) > get(connection.settings.max_concurrent_streams) + if stream.state == IDLE && (connection.settings.max_concurrent_streams !== nothing) && + concurrent_streams_count(connection) > connection.settings.max_concurrent_streams put!(channel_act, act) return end if typeof(act) == ActSendHeaders - if !isnull(connection.settings.max_header_list_size) + if (connection.settings.max_header_list_size !== nothing) sum = 0 for k in keys(act.headers) sum += length(k) + length(act.headers[k]) + 32 end - if sum > get(connection.settings.max_header_list_size) + if sum > connection.settings.max_header_list_size goaway!(connection, InternalError("Header list size exceeded.")) return end @@ -196,14 +196,14 @@ function process_channel_evt(connection::HTTPConnection) if typeof(frame) == SettingsFrame if !frame.is_ack - parameters = frame.parameters.value + parameters = frame.parameters if length(parameters) > 0 for i = 1:length(parameters) handle_setting!(connection, parameters[i][1], parameters[i][2]) end end put!(channel_act_raw, - SettingsFrame(true, Nullable{Tuple{Frame.SETTING_IDENTIFIER, UInt32}}())) + SettingsFrame(true, Tuple{Frame.SETTING_IDENTIFIER, UInt32}())) end return end diff --git a/src/Session/errors.jl b/src/Session/errors.jl index 8f38633..ad36ddc 100644 --- a/src/Session/errors.jl +++ b/src/Session/errors.jl @@ -13,15 +13,15 @@ ERROR_INADEQUATE_SECURITY=0xc ERROR_HTTP_1_1_REQUIRED=0xd -type ProtocolError - message::AbstractString +struct ProtocolError + message::String end -type InternalError - message::AbstractString +struct InternalError + message::String end -type NullError end +struct NullError end function goaway!(connection::HTTPConnection, error) error_code = if typeof(error) == NullError diff --git a/src/Session/handlers.jl b/src/Session/handlers.jl index 2583869..7818e83 100644 --- a/src/Session/handlers.jl +++ b/src/Session/handlers.jl @@ -5,9 +5,9 @@ function recv_stream_headers(connection::HTTPConnection, frame::HeadersFrame) stream = get_stream(connection, stream_identifier) if frame.is_priority - handle_priority!(connection, stream_identifier, frame.exclusive.value, - frame.dependent_stream_identifier.value, - frame.weight.value) + handle_priority!(connection, stream_identifier, frame.exclusive, + frame.dependent_stream_identifier, + frame.weight) end block = copy(frame.fragment) @@ -25,8 +25,8 @@ function send_stream_headers(connection::HTTPConnection, act::ActSendHeaders) # We don't use padding in this implementation if connection.settings.max_frame_size < (length(block) + 6) splitLength = connection.settings.max_frame_size - 6 - header = HeadersFrame(is_end_stream, false, false, stream_identifier, Nullable{Bool}(), - Nullable{UInt32}(), Nullable{UInt8}(), getindex(block, 1:splitLength)) + header = HeadersFrame(is_end_stream, false, false, stream_identifier, nothing, + nothing, nothing, getindex(block, 1:splitLength)) put!(connection.channel_act_raw, header) curPos = splitLength + 1 @@ -37,8 +37,8 @@ function send_stream_headers(connection::HTTPConnection, act::ActSendHeaders) curPos = endPos + 1 end else - frame = HeadersFrame(is_end_stream, true, false, stream_identifier, Nullable{Bool}(), - Nullable{UInt32}(), Nullable{UInt8}(), block) + frame = HeadersFrame(is_end_stream, true, false, stream_identifier, nothing, + nothing, nothing, block) put!(connection.channel_act_raw, frame) end diff --git a/src/Session/settings.jl b/src/Session/settings.jl index a7b4476..55fa7be 100644 --- a/src/Session/settings.jl +++ b/src/Session/settings.jl @@ -4,7 +4,7 @@ function handle_setting!(connection::HTTPConnection, key::Frame.SETTING_IDENTIFI elseif key == Frame.SETTINGS_ENABLE_PUSH connection.settings.push_enabled = value != 0 elseif key == Frame.SETTINGS_MAX_CONCURRENT_STREAMS - connection.settings.max_concurrent_streams = Nullable(UInt(value)) + connection.settings.max_concurrent_streams = UInt(value) elseif key == Frame.SETTINGS_INITIAL_WINDOW_SIZE diff = UInt(value) - connection.settings.initial_window_size for stream in connection.streams @@ -14,7 +14,7 @@ function handle_setting!(connection::HTTPConnection, key::Frame.SETTING_IDENTIFI elseif key == Frame.SETTINGS_MAX_FRAME_SIZE connection.settings.max_frame_size = UInt(value) elseif key == Frame.SETTINGS_MAX_HEADER_LIST_SIZE - connection.settings.max_header_list_size = Nullable(UInt(value)) + connection.settings.max_header_list_size = UInt(value) else goaway!(connection, ProtocolError("Unknown settings key.")) end diff --git a/src/Session/utils.jl b/src/Session/utils.jl index 06e2eda..9c14cd6 100644 --- a/src/Session/utils.jl +++ b/src/Session/utils.jl @@ -8,7 +8,7 @@ function get_stream(connection::HTTPConnection, stream_identifier::UInt32) end stream = HTTPStream(stream_identifier, IDLE, - connection.settings.initial_window_size, Nullable{Priority}()) + connection.settings.initial_window_size, nothing) push!(connection.streams, stream) return stream @@ -27,10 +27,10 @@ end function get_dependency_parent(connection::HTTPConnection, stream_identifier::UInt32) stream = get_stream(connection, stream_identifier) - if isnull(stream.priority) - return Nullable{Stream}() + if stream.priority === nothing + return nothing else - return Nullable(get_stream(stream.priority.value.dependent_stream_identifier)) + return get_stream(stream.priority.dependent_stream_identifier) end end @@ -40,8 +40,8 @@ function get_dependency_children(connection::HTTPConnection, stream_identifier:: for i = 1:length(connection.streams) stream = connection.streams[i] - if !isnull(stream.priority) && - stream.priority.value.dependent_stream_identifier == stream_identifier + if (stream.priority !== nothing) && + stream.priority.dependent_stream_identifier == stream_identifier push!(result, stream) end end @@ -57,11 +57,11 @@ function handle_priority!(connection::HTTPConnection, stream_identifier::UInt32, children = get_dependency_children(connection, stream_identifier) for i = 1:length(children) - children[i].priority.value.dependent_stream_identifier = stream_identifier + children[i].priority.dependent_stream_identifier = stream_identifier end end - stream.priority = Nullable(Priority(dependent_stream_identifier, weight)) + stream.priority = Priority(dependent_stream_identifier, weight) end function concurrent_streams_count(connection::HTTPConnection) diff --git a/test/frames.jl b/test/frames.jl index eb7bc37..8dde11e 100644 --- a/test/frames.jl +++ b/test/frames.jl @@ -1,25 +1,13 @@ using HTTP2.Frame +import HTTP2: bytearr -@test decode(IOBuffer(encode(ContinuationFrame(false, 0x51, b"test")))) == - ContinuationFrame(false, 0x51, b"test") -@test decode(IOBuffer(encode(DataFrame(0x51, false, b"test")))) == - DataFrame(0x51, false, b"test") -@test decode(IOBuffer(encode(GoawayFrame(0x4f, 0x4, b"test")))) == - GoawayFrame(0x4f, 0x4, b"test") -@test decode(IOBuffer(encode(HeadersFrame(false, false, true, 0x51, - Nullable(false), Nullable(0x50), - Nullable(0x1), b"test")))) == - HeadersFrame(false, false, true, 0x51, - Nullable(false), Nullable(0x50), - Nullable(0x1), b"test") -@test decode(IOBuffer(encode(PingFrame(false, b"testtest")))) == - PingFrame(false, b"testtest") -@test decode(IOBuffer(encode(PriorityFrame(0x51, false, 0x50, 0x2)))) == - PriorityFrame(0x51, false, 0x50, 0x2) -@test decode(IOBuffer(encode(PushPromiseFrame(false, 0x51, 0x54, b"test")))) == - PushPromiseFrame(false, 0x51, 0x54, b"test") -@test decode(IOBuffer(encode(RstStreamFrame(0x51, 0x4)))) == - RstStreamFrame(0x51, 0x4) -@test decode(IOBuffer(encode(SettingsFrame(true, Nullable())))) == SettingsFrame(true, Nullable()) -@test decode(IOBuffer(encode(WindowUpdateFrame(0x51, 0x2)))) == - WindowUpdateFrame(0x51, 0x2) +@test decode(IOBuffer(encode(ContinuationFrame(false, 0x51, bytearr("test"))))) == ContinuationFrame(false, 0x51, bytearr("test")) +@test decode(IOBuffer(encode(DataFrame(0x51, false, bytearr("test"))))) == DataFrame(0x51, false, bytearr("test")) +@test decode(IOBuffer(encode(GoawayFrame(0x4f, 0x4, bytearr("test"))))) == GoawayFrame(0x4f, 0x4, bytearr("test")) +@test decode(IOBuffer(encode(HeadersFrame(false, false, true, 0x51, false, 0x50, 0x1, bytearr("test"))))) == HeadersFrame(false, false, true, 0x51, false, 0x50, 0x1, bytearr("test")) +@test decode(IOBuffer(encode(PingFrame(false, bytearr("testtest"))))) == PingFrame(false, bytearr("testtest")) +@test decode(IOBuffer(encode(PriorityFrame(0x51, false, 0x50, 0x2)))) == PriorityFrame(0x51, false, 0x50, 0x2) +@test decode(IOBuffer(encode(PushPromiseFrame(false, 0x51, 0x54, bytearr("test"))))) == PushPromiseFrame(false, 0x51, 0x54, bytearr("test")) +@test decode(IOBuffer(encode(RstStreamFrame(0x51, 0x4)))) == RstStreamFrame(0x51, 0x4) +@test decode(IOBuffer(encode(SettingsFrame(true, nothing)))) == SettingsFrame(true, nothing) +@test decode(IOBuffer(encode(WindowUpdateFrame(0x51, 0x2)))) == WindowUpdateFrame(0x51, 0x2) diff --git a/test/runtests.jl b/test/runtests.jl index 723c043..40c860b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,3 +1,3 @@ -using Base.Test +using Test include("frames.jl") diff --git a/test/server.jl b/test/server.jl index 70e885d..42d391a 100644 --- a/test/server.jl +++ b/test/server.jl @@ -1,6 +1,7 @@ import HTTP2 +import HTTP2: bytearr using HTTP2.Frame -using Base.Test +using Test # A server example -HTTP2.serve(8000, b"

Hello, world!

") +HTTP2.serve(8000, bytearr("

Hello, world!

")) From 7158019353c6b3255082b96cf91a4adcaf7a633a Mon Sep 17 00:00:00 2001 From: tan Date: Mon, 24 Dec 2018 14:24:51 +0530 Subject: [PATCH 02/19] get client-server tests working --- src/HTTP2.jl | 1 + src/Session/channels.jl | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/HTTP2.jl b/src/HTTP2.jl index faa123a..9a4f165 100644 --- a/src/HTTP2.jl +++ b/src/HTTP2.jl @@ -1,6 +1,7 @@ module HTTP2 using Sockets +using Dates const Headers = Dict{String,String} diff --git a/src/Session/channels.jl b/src/Session/channels.jl index 3488215..308cd04 100644 --- a/src/Session/channels.jl +++ b/src/Session/channels.jl @@ -203,7 +203,7 @@ function process_channel_evt(connection::HTTPConnection) end end put!(channel_act_raw, - SettingsFrame(true, Tuple{Frame.SETTING_IDENTIFIER, UInt32}())) + SettingsFrame(true, Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}())) end return end From c8d565208b2b251f4da4235fe9c151341bd4bb17 Mon Sep 17 00:00:00 2001 From: tan Date: Mon, 24 Dec 2018 16:10:45 +0530 Subject: [PATCH 03/19] update client-server tests --- src/HTTP2.jl | 65 -------------------------------------------------- test/client.jl | 53 ++++++++++++++++++++++++++-------------- test/server.jl | 45 +++++++++++++++++++++++++++++++++- 3 files changed, 79 insertions(+), 84 deletions(-) diff --git a/src/HTTP2.jl b/src/HTTP2.jl index 9a4f165..2966a70 100644 --- a/src/HTTP2.jl +++ b/src/HTTP2.jl @@ -1,8 +1,5 @@ module HTTP2 -using Sockets -using Dates - const Headers = Dict{String,String} bytearr(a::Vector{UInt8}) = a @@ -13,66 +10,4 @@ bytearr(s::String) = bytearr(codeunits(s)) include("Frame.jl") include("Session.jl") -## Below we try to fire a HTTP2 request. This request is meant to be tested -## against a local nghttp2 server running on port 9000. -## -## The server is started by the command: `nghttpd --verbose --no-tls 9000` -import HTTP2.Session - -function request(dest, port, url) - ## Fire a HTTP connection to port 9000 - buffer = connect(dest, port) - - ## Create a HTTPConnection object - connection = Session.new_connection(buffer; isclient=true) - - ## Create a request with headers - headers = Headers(":method" => "GET", - ":path" => url, - ":scheme" => "http", - ":authority" => "127.0.0.1:9000", - "accept" => "*/*", - "accept-encoding" => "gzip, deflate", - "user-agent" => "HTTP2.jl") - - Session.put_act!(connection, Session.ActSendHeaders(Session.next_free_stream_identifier(connection), headers, true)) - - return (Session.take_evt!(connection).headers, Session.take_evt!(connection).data) -end - -function handle_util_frames_until(connection) - received = Session.recv_next(connection) - while typeof(received) == Frame.SettingsFrame || typeof(received) == Frame.PriorityFrame - # do nothing for now - received = Session.recv_next(connection) - end - return received -end - -function serve(port, body) - server = listen(port) - - println("Server started.") - while(true) - buffer = accept(server) - println("Processing a connection ...") - - connection = Session.new_connection(buffer; isclient=false) - ## Recv the client preface, and send an empty SETTING frame. - - headers_evt = Session.take_evt!(connection) - stream_identifier = headers_evt.stream_identifier - - sending_headers = Headers(":status" => "200", - "server" => "HTTP2.jl", - "date" => Dates.format(now(Dates.UTC), Dates.RFC1123Format), - "content-type" => "text/html; charset=UTF-8") - - Session.put_act!(connection, Session.ActSendHeaders(stream_identifier, sending_headers, false)) - Session.put_act!(connection, Session.ActSendData(stream_identifier, body, true)) - - ## We are done! - end -end - end # module diff --git a/test/client.jl b/test/client.jl index 977c427..97ad9ec 100644 --- a/test/client.jl +++ b/test/client.jl @@ -1,23 +1,40 @@ import HTTP2 using HTTP2.Frame -using Base.Test +using Test +using Sockets -## Run `nghttpd --verbose --no-tls 9000` to make this test pass -(headers, body) = HTTP2.request(ip"127.0.0.1", 9000, "/") +function show_response(headers, body) + for header in headers + @info("Result header: " * String(header[1]) * ": " * String(header[2])) + end + @info("Result body: " * String(body)) +end + + +function test_request(dest, port, url) + for conn_id in 1:3 + @info("Opening connection", conn_id) + buffer = connect(dest, port) + connection = HTTP2.Session.new_connection(buffer; isclient=true) + headers = HTTP2.Headers(":method" => "GET", + ":path" => url, + ":scheme" => "http", + ":authority" => "127.0.0.1:9000", + "accept" => "*/*", + "accept-encoding" => "gzip, deflate", + "user-agent" => "HTTP2.jl") -println() -println("Results of the request") -println("======================") -println() -println("Headers") -println("======================") -for header in headers - print(ascii(header[1])) - print(": ") - print(ascii(header[2])) - print("\n") + for req_id in 1:5 + @info("Sending request", req_id) + HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(HTTP2.Session.next_free_stream_identifier(connection), headers, true)) + show_response(HTTP2.Session.take_evt!(connection).headers, HTTP2.Session.take_evt!(connection).data) + end + + @info("Closing connection", conn_id) + close(connection) + sleep(2) # wait for close message to percolate + close(buffer) + end end -println() -println("Body") -println("======================") -println(ascii(body)) + +test_request(ip"127.0.0.1", 8000, "/") diff --git a/test/server.jl b/test/server.jl index 42d391a..df8cef5 100644 --- a/test/server.jl +++ b/test/server.jl @@ -2,6 +2,49 @@ import HTTP2 import HTTP2: bytearr using HTTP2.Frame using Test +using Dates +using Sockets + +# test serve method +function test_serve(port, body) + server = listen(port) + + println("Server started.") + while true + println("Waiting for a connection ...") + buffer = accept(server) + println("Processing a connection ...") + + connection = HTTP2.Session.new_connection(buffer; isclient=false) + @info("Connected", connection) + ## Recv the client preface, and send an empty SETTING frame. + + while true + headers_evt = HTTP2.Session.take_evt!(connection) + @info("Received headers", headers_evt) + if isa(headers_evt, HTTP2.Session.EvtGoaway) + close(buffer) + break + end + + stream_identifier = headers_evt.stream_identifier + @info("Stream ", stream_identifier) + + sending_headers = HTTP2.Headers(":status" => "200", + "server" => "HTTP2.jl", + "date" => Dates.format(now(Dates.UTC), Dates.RFC1123Format), + "content-type" => "text/html; charset=UTF-8") + sending_body = isa(body, String) ? convert(Vector{UInt8}, codeunits(body)) : body + @info("Resopnding", sending_headers, sending_body) + + HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(stream_identifier, sending_headers, false)) + @info("sent headers") + HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendData(stream_identifier, sending_body, true)) + @info("sent body") + end + ## We are done! + end +end # A server example -HTTP2.serve(8000, bytearr("

Hello, world!

")) +test_serve(8000, bytearr("

Hello, world!

")) From 4f76540c0d7e5be10ed814caf83c442de8635113 Mon Sep 17 00:00:00 2001 From: tan Date: Mon, 24 Dec 2018 16:11:05 +0530 Subject: [PATCH 04/19] update README --- README.md | 62 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 17f6f2b..f131685 100644 --- a/README.md +++ b/README.md @@ -12,60 +12,64 @@ julia> using HTTP2 ## Simple Servers and Clients -The library can directly create simple servers and clients. For full support of -HTTP/2 Upgrade and HTTPS, use `HttpServer.jl` and `Requests.jl`. +The library can directly create simple servers and clients. You only use this library directly if you need low-level functionality. An example for the server is as follows. The code will be explained in the next section. ```julia +using HTTP2 +using Sockets +using Dates + +port = 8888 server = listen(port) -println("Server started.") -while(true) - buffer = accept(server) - println("Processing a connection ...") +println("Waiting for a connection ...") +buffer = accept(server) +println("Processing a connection ...") - connection = Session.new_connection(buffer; isclient=false) - ## Recv the client preface, and send an empty SETTING frame. +connection = HTTP2.Session.new_connection(buffer; isclient=false) - headers_evt = Session.take_evt!(connection) - stream_identifier = headers_evt.stream_identifier +## Recv the client preface, and send an empty SETTING frame. +headers_evt = HTTP2.Session.take_evt!(connection) +stream_identifier = headers_evt.stream_identifier - sending_headers = Headers(":status" => "200", - "server" => "HTTP2.jl", - "date" => Dates.format(now(Dates.UTC), Dates.RFC1123Format), - "content-type" => "text/html; charset=UTF-8") +sending_headers = HTTP2.Headers(":status" => "200", + "server" => "HTTP2.jl", + "date" => Dates.format(now(Dates.UTC), Dates.RFC1123Format), + "content-type" => "text/html; charset=UTF-8") +sending_body = convert(Vector{UInt8}, codeunits("hello")) - Session.put_act!(connection, Session.ActSendHeaders(stream_identifier, sending_headers, false)) - Session.put_act!(connection, Session.ActSendData(stream_identifier, body, true)) +@info("Resopnding", sending_headers, sending_body) - ## We are done! -end +HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(stream_identifier, sending_headers, false)) +HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendData(stream_identifier, sending_body, true)) +## We are done! ``` A client can be started in a similar way. Again the code will be explained in the next section. ```julia -buffer = connect(dest, port) - -## Create a HTTPConnection object -connection = Session.new_connection(buffer; isclient=true) - -## Create a request with headers -headers = Headers(":method" => "GET", - ":path" => url, +using HTTP2 +using Sockets + +@info("Opening connection", conn_id) +buffer = connect("127.0.0.1", 8888) +connection = HTTP2.Session.new_connection(buffer; isclient=true) +headers = HTTP2.Headers(":method" => "GET", + ":path" => "/", ":scheme" => "http", ":authority" => "127.0.0.1:9000", "accept" => "*/*", "accept-encoding" => "gzip, deflate", "user-agent" => "HTTP2.jl") -Session.put_act!(connection, Session.ActSendHeaders(Session.next_free_stream_identifier(connection), headers, true)) - -return (Session.take_evt!(connection).headers, Session.take_evt!(connection).data) +@info("Sending request", req_id) +HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(HTTP2.Session.next_free_stream_identifier(connection), headers, true)) +(rcvd_headers, rcvd_data) = (HTTP2.Session.take_evt!(connection).headers, HTTP2.Session.take_evt!(connection).data) ``` ## Connection Lifecycle From 8d05bcc57da4b4b14f0522d4eafd8d2a19538bfc Mon Sep 17 00:00:00 2001 From: tan Date: Mon, 24 Dec 2018 16:11:48 +0530 Subject: [PATCH 05/19] update .travis.yml for Julia 1.0 --- .travis.yml | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 2f57d12..ed1afe4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,12 +4,11 @@ os: - linux - osx julia: - - 0.4 - - 0.5 + - 1.0 - nightly notifications: email: false # uncomment the following lines to override the default test script -#script: -# - if [[ -a .git/shallow ]]; then git fetch --unshallow; fi -# - julia -e 'Pkg.clone(pwd()); Pkg.build("PromiseExtractor"); Pkg.test("PromiseExtractor"; coverage=true)' +script: # the default script is equivalent to the following + - if [[ -a .git/shallow ]]; then git fetch --unshallow; fi + - julia -e 'using Pkg; Pkg.clone("https://github.com/sorpaas/HPack.jl.git"); Pkg.clone(pwd()); Pkg.build("HTTP2"); Pkg.test("HTTP2"; coverage=true)'; From 44c082a64fa67476d1bce8c18fea397e590419eb Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 1 Jan 2019 11:27:23 +0530 Subject: [PATCH 06/19] use latest HPack, improve testing --- .travis.yml | 2 +- src/HTTP2.jl | 2 +- src/Session.jl | 2 +- test/client.jl | 14 +++++++------- test/runtests.jl | 19 +++++++++++++++++++ test/server.jl | 12 +++++++----- 6 files changed, 36 insertions(+), 15 deletions(-) diff --git a/.travis.yml b/.travis.yml index ed1afe4..50523e8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,4 +11,4 @@ notifications: # uncomment the following lines to override the default test script script: # the default script is equivalent to the following - if [[ -a .git/shallow ]]; then git fetch --unshallow; fi - - julia -e 'using Pkg; Pkg.clone("https://github.com/sorpaas/HPack.jl.git"); Pkg.clone(pwd()); Pkg.build("HTTP2"); Pkg.test("HTTP2"; coverage=true)'; + - julia -e 'using Pkg; Pkg.clone("https://github.com/tanmaykm/HPack.jl.git"); Pkg.clone(pwd()); Pkg.build("HTTP2"); Pkg.test("HTTP2"; coverage=true)'; diff --git a/src/HTTP2.jl b/src/HTTP2.jl index 2966a70..2bd5003 100644 --- a/src/HTTP2.jl +++ b/src/HTTP2.jl @@ -1,6 +1,6 @@ module HTTP2 -const Headers = Dict{String,String} +const Headers = Vector{Tuple{String,String}} bytearr(a::Vector{UInt8}) = a bytearr(cs::Base.CodeUnits{UInt8,String}) = convert(Vector{UInt8}, cs) diff --git a/src/Session.jl b/src/Session.jl index 6f53118..7d437ef 100644 --- a/src/Session.jl +++ b/src/Session.jl @@ -90,7 +90,7 @@ mutable struct HTTPConnection ## io -> channel_evt_raw -> channel_evt -> events end -HTTPConnection(isclient) = HTTPConnection(HPack.new_dynamic_table(), +HTTPConnection(isclient) = HTTPConnection(HPack.DynamicTable(), Array{HTTPStream, 1}(), 65535, isclient, diff --git a/test/client.jl b/test/client.jl index 97ad9ec..0249bcb 100644 --- a/test/client.jl +++ b/test/client.jl @@ -16,13 +16,13 @@ function test_request(dest, port, url) @info("Opening connection", conn_id) buffer = connect(dest, port) connection = HTTP2.Session.new_connection(buffer; isclient=true) - headers = HTTP2.Headers(":method" => "GET", - ":path" => url, - ":scheme" => "http", - ":authority" => "127.0.0.1:9000", - "accept" => "*/*", - "accept-encoding" => "gzip, deflate", - "user-agent" => "HTTP2.jl") + headers = [(":method", "GET"), + (":path", url), + (":scheme", "http"), + (":authority", "127.0.0.1:9000"), + ("accept", "*/*"), + ("accept-encoding", "gzip, deflate"), + ("user-agent", "HTTP2.jl")] for req_id in 1:5 @info("Sending request", req_id) diff --git a/test/runtests.jl b/test/runtests.jl index 40c860b..fd77d25 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,3 +1,22 @@ using Test include("frames.jl") + +const opts = Base.JLOptions() +const inline_flag = opts.can_inline == 1 ? `` : `--inline=no` +const cov_flag = (opts.code_coverage == 1) ? `--code-coverage=user` : + (opts.code_coverage == 2) ? `--code-coverage=all` : + `` + +function run_test(script) + srvrscript = joinpath(dirname(@__FILE__), script) + srvrcmd = `$(joinpath(Sys.BINDIR, "julia")) $cov_flag $inline_flag $script` + println("Running tests from ", script, "\n", "="^60) + ret = run(srvrcmd) + println("Finished ", script, "\n", "="^60) + nothing +end + +@async run_test("server.jl") +sleep(20) +run_test("client.jl") diff --git a/test/server.jl b/test/server.jl index df8cef5..b0d3cf3 100644 --- a/test/server.jl +++ b/test/server.jl @@ -10,7 +10,8 @@ function test_serve(port, body) server = listen(port) println("Server started.") - while true + connid = 1 + while connid < 4 println("Waiting for a connection ...") buffer = accept(server) println("Processing a connection ...") @@ -30,10 +31,10 @@ function test_serve(port, body) stream_identifier = headers_evt.stream_identifier @info("Stream ", stream_identifier) - sending_headers = HTTP2.Headers(":status" => "200", - "server" => "HTTP2.jl", - "date" => Dates.format(now(Dates.UTC), Dates.RFC1123Format), - "content-type" => "text/html; charset=UTF-8") + sending_headers = [(":status", "200"), + ("server", "HTTP2.jl"), + ("date", Dates.format(now(Dates.UTC), Dates.RFC1123Format)), + ("content-type", "text/html; charset=UTF-8")] sending_body = isa(body, String) ? convert(Vector{UInt8}, codeunits(body)) : body @info("Resopnding", sending_headers, sending_body) @@ -43,6 +44,7 @@ function test_serve(port, body) @info("sent body") end ## We are done! + connid += 1 end end From 45847026316883caff6aa6805800756f638d9eef Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 8 Jan 2019 10:40:34 +0530 Subject: [PATCH 07/19] update for HTTPS connections --- REQUIRE | 1 + src/Frame.jl | 11 ++++++----- src/HTTP2.jl | 21 +++++++++++++++++++++ test/client.jl | 31 ++++++++++++++++++++++++++----- test/server.jl | 26 ++++++++++++++++++++++---- 5 files changed, 76 insertions(+), 14 deletions(-) diff --git a/REQUIRE b/REQUIRE index 6ae8081..65cc198 100644 --- a/REQUIRE +++ b/REQUIRE @@ -1,2 +1,3 @@ julia 1.0 HPack +MbedTLS diff --git a/src/Frame.jl b/src/Frame.jl index 4fe30e3..0882d06 100644 --- a/src/Frame.jl +++ b/src/Frame.jl @@ -1,5 +1,6 @@ module Frame import Base: == +import ..HTTP2: readallbytes @enum FRAME_TYPES DATA=0x0 HEADERS=0x1 PRIORITY=0x2 RST_STREAM=0x3 SETTINGS=0x4 PUSH_PROMISE=0x5 PING=0x6 GOAWAY=0x7 WINDOW_UPDATE=0x8 CONTINUATION=0x9 @@ -11,12 +12,12 @@ struct FrameHeader end function decode_header(buf) - length_arr = read(buf, 3) + length_arr = readallbytes(buf, 3) length = UInt32(length_arr[1]) << 16 + UInt32(length_arr[2]) << 8 + UInt32(length_arr[3]) - typ = FRAME_TYPES(read(buf, 1)[1]) - flags = read(buf, 1)[1] - stream_identifier_arr = read(buf, 4) + typ = FRAME_TYPES(readallbytes(buf, 1)[1]) + flags = readallbytes(buf, 1)[1] + stream_identifier_arr = readallbytes(buf, 4) stream_identifier = UInt32(stream_identifier_arr[1]) << 24 + UInt32(stream_identifier_arr[2]) << 16 + UInt32(stream_identifier_arr[3]) << 8 + UInt32(stream_identifier_arr[4]) @@ -57,7 +58,7 @@ include("Frame/continuation.jl") function decode(buf) header = decode_header(buf) - payload = read(buf, header.length) + payload = readallbytes(buf, header.length) @assert length(payload) == header.length if header.typ == DATA diff --git a/src/HTTP2.jl b/src/HTTP2.jl index 2bd5003..9998e91 100644 --- a/src/HTTP2.jl +++ b/src/HTTP2.jl @@ -1,11 +1,32 @@ module HTTP2 +using MbedTLS + const Headers = Vector{Tuple{String,String}} bytearr(a::Vector{UInt8}) = a bytearr(cs::Base.CodeUnits{UInt8,String}) = convert(Vector{UInt8}, cs) bytearr(s::String) = bytearr(codeunits(s)) +readallbytes(s, nbytes) = read(s, nbytes) +function readallbytes(s::MbedTLS.SSLContext, nbytes) + finalbuf = Vector{UInt8}(undef, nbytes) + lfinal = 0 + while lfinal < nbytes + toread = nbytes - lfinal + buf = Vector{UInt8}(undef, toread) + readbytes!(s, buf, toread) + nread = length(buf) + if nread > 0 + copyto!(finalbuf, lfinal+1, buf) + lfinal += nread + else + yield() + end + end + finalbuf +end + # package code goes here include("Frame.jl") include("Session.jl") diff --git a/test/client.jl b/test/client.jl index 0249bcb..3bd2d52 100644 --- a/test/client.jl +++ b/test/client.jl @@ -2,6 +2,19 @@ import HTTP2 using HTTP2.Frame using Test using Sockets +using MbedTLS + +function sslconnect(dest, port, certhostname) + println("Connecting over SSL ...") + buffer = connect(dest, port) + sslconfig = MbedTLS.SSLConfig(false) + sslbuffer = MbedTLS.SSLContext() + MbedTLS.setup!(sslbuffer, sslconfig) + MbedTLS.set_bio!(sslbuffer, buffer) + MbedTLS.hostname!(sslbuffer, certhostname) + MbedTLS.handshake!(sslbuffer) + return sslbuffer +end function show_response(headers, body) for header in headers @@ -11,10 +24,10 @@ function show_response(headers, body) end -function test_request(dest, port, url) +function test_request(dest, port, url, certhostname=nothing) for conn_id in 1:3 @info("Opening connection", conn_id) - buffer = connect(dest, port) + buffer = (certhostname === nothing) ? connect(dest, port) : sslconnect(dest, port, certhostname) connection = HTTP2.Session.new_connection(buffer; isclient=true) headers = [(":method", "GET"), (":path", url), @@ -24,10 +37,14 @@ function test_request(dest, port, url) ("accept-encoding", "gzip, deflate"), ("user-agent", "HTTP2.jl")] - for req_id in 1:5 + for req_id in 1:3 @info("Sending request", req_id) HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(HTTP2.Session.next_free_stream_identifier(connection), headers, true)) - show_response(HTTP2.Session.take_evt!(connection).headers, HTTP2.Session.take_evt!(connection).data) + resp_headers = HTTP2.Session.take_evt!(connection).headers + @show resp_headers + resp_data = HTTP2.Session.take_evt!(connection).data + @show resp_data + show_response(resp_headers, resp_data) end @info("Closing connection", conn_id) @@ -37,4 +54,8 @@ function test_request(dest, port, url) end end -test_request(ip"127.0.0.1", 8000, "/") +if length(ARGS) == 1 + test_request(ip"127.0.0.1", 8000, "/", ARGS[1]) +else + test_request(ip"127.0.0.1", 8000, "/") +end diff --git a/test/server.jl b/test/server.jl index b0d3cf3..6b3077a 100644 --- a/test/server.jl +++ b/test/server.jl @@ -1,19 +1,31 @@ -import HTTP2 +using HTTP2 import HTTP2: bytearr using HTTP2.Frame using Test using Dates using Sockets +using MbedTLS + +function sslaccept(server, certfile, keyfile) + println("Expecting a SSL connection ...") + sslconfig = MbedTLS.SSLConfig(certfile, keyfile) + buffer = accept(server) + sslbuffer = MbedTLS.SSLContext() + MbedTLS.setup!(sslbuffer, sslconfig) + MbedTLS.associate!(sslbuffer, buffer) + MbedTLS.handshake!(sslbuffer) + return sslbuffer +end # test serve method -function test_serve(port, body) +function test_serve(port, body, certfile=nothing, keyfile=nothing) server = listen(port) println("Server started.") connid = 1 while connid < 4 println("Waiting for a connection ...") - buffer = accept(server) + buffer = ((certfile === nothing) || (keyfile === nothing)) ? accept(server) : sslaccept(server, certfile, keyfile) println("Processing a connection ...") connection = HTTP2.Session.new_connection(buffer; isclient=false) @@ -49,4 +61,10 @@ function test_serve(port, body) end # A server example -test_serve(8000, bytearr("

Hello, world!

")) +if length(ARGS) == 2 + certfile = ARGS[1] + keyfile = ARGS[2] + test_serve(8000, bytearr("

Hello, world!

"), certfile, keyfile) +else + test_serve(8000, bytearr("

Hello, world!

")) +end From 99a1ed60190d8f63e7b09686ce1bf254744bde69 Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 8 Jan 2019 11:24:00 +0530 Subject: [PATCH 08/19] add https tests --- test/.gitignore | 2 ++ test/genkey.sh | 13 +++++++++++++ test/runtests.jl | 21 +++++++++++++++++++-- 3 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 test/.gitignore create mode 100755 test/genkey.sh diff --git a/test/.gitignore b/test/.gitignore new file mode 100644 index 0000000..4ee097b --- /dev/null +++ b/test/.gitignore @@ -0,0 +1,2 @@ +*.cert +*.key diff --git a/test/genkey.sh b/test/genkey.sh new file mode 100755 index 0000000..cb430cf --- /dev/null +++ b/test/genkey.sh @@ -0,0 +1,13 @@ +#! /usr/bin/env bash + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +openssl req \ + -new \ + -newkey rsa:4096 \ + -days 3650 \ + -nodes \ + -x509 \ + -subj "/C=IN/ST=Karnataka/L=Bangalore/O=Julia/CN=www.example.com" \ + -keyout ${DIR}/www.example.com.key \ + -out ${DIR}/www.example.com.cert diff --git a/test/runtests.jl b/test/runtests.jl index fd77d25..798bfad 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -8,9 +8,15 @@ const cov_flag = (opts.code_coverage == 1) ? `--code-coverage=user` : (opts.code_coverage == 2) ? `--code-coverage=all` : `` -function run_test(script) +function run_test(script, args...) srvrscript = joinpath(dirname(@__FILE__), script) - srvrcmd = `$(joinpath(Sys.BINDIR, "julia")) $cov_flag $inline_flag $script` + if isempty(args) + srvrcmd = `$(joinpath(Sys.BINDIR, "julia")) $cov_flag $inline_flag $script` + elseif length(args) == 2 + srvrcmd = `$(joinpath(Sys.BINDIR, "julia")) $cov_flag $inline_flag $script $(args[1]) $(args[2])` + elseif length(args) == 1 + srvrcmd = `$(joinpath(Sys.BINDIR, "julia")) $cov_flag $inline_flag $script $(args[1])` + end println("Running tests from ", script, "\n", "="^60) ret = run(srvrcmd) println("Finished ", script, "\n", "="^60) @@ -20,3 +26,14 @@ end @async run_test("server.jl") sleep(20) run_test("client.jl") +sleep(20) + +GENKEYSCRIPT = joinpath(dirname(@__FILE__), "genkey.sh") +run(`$GENKEYSCRIPT`) + +keyfile = joinpath(dirname(@__FILE__), "www.example.com.key") +certfile = joinpath(dirname(@__FILE__), "www.example.com.cert") +@async run_test("server.jl", certfile, keyfile) +sleep(20) +run_test("client.jl", "www.example.com") +sleep(20) From c63a14b63f51feeb3904b058d1615bc846bc3848 Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 8 Jan 2019 11:38:56 +0530 Subject: [PATCH 09/19] use hostname for cert gen --- test/genkey.sh | 7 ++++--- test/runtests.jl | 10 ++++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/test/genkey.sh b/test/genkey.sh index cb430cf..0f90696 100755 --- a/test/genkey.sh +++ b/test/genkey.sh @@ -1,6 +1,7 @@ #! /usr/bin/env bash DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +HOSTNAME=$1 openssl req \ -new \ @@ -8,6 +9,6 @@ openssl req \ -days 3650 \ -nodes \ -x509 \ - -subj "/C=IN/ST=Karnataka/L=Bangalore/O=Julia/CN=www.example.com" \ - -keyout ${DIR}/www.example.com.key \ - -out ${DIR}/www.example.com.cert + -subj "/C=IN/ST=Karnataka/L=Bangalore/O=Julia/CN=$HOSTNAME" \ + -keyout ${DIR}/$HOSTNAME.key \ + -out ${DIR}/$HOSTNAME.cert diff --git a/test/runtests.jl b/test/runtests.jl index 798bfad..1ff0a3a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,4 +1,5 @@ using Test +using Sockets include("frames.jl") @@ -29,11 +30,12 @@ run_test("client.jl") sleep(20) GENKEYSCRIPT = joinpath(dirname(@__FILE__), "genkey.sh") -run(`$GENKEYSCRIPT`) +HOSTNAME = gethostname() +run(`$GENKEYSCRIPT $HOSTNAME`) -keyfile = joinpath(dirname(@__FILE__), "www.example.com.key") -certfile = joinpath(dirname(@__FILE__), "www.example.com.cert") +keyfile = joinpath(dirname(@__FILE__), "$HOSTNAME.key") +certfile = joinpath(dirname(@__FILE__), "$HOSTNAME.cert") @async run_test("server.jl", certfile, keyfile) sleep(20) -run_test("client.jl", "www.example.com") +run_test("client.jl", HOSTNAME) sleep(20) From d9fa8aba235dba3cc69d499448d63bc702ac54bb Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 8 Jan 2019 15:33:02 +0530 Subject: [PATCH 10/19] fixes --- src/Session/errors.jl | 38 ++++++++++++++++++++------------------ src/Session/states.jl | 2 +- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/Session/errors.jl b/src/Session/errors.jl index ad36ddc..f5e8816 100644 --- a/src/Session/errors.jl +++ b/src/Session/errors.jl @@ -1,17 +1,19 @@ -@enum ERROR_CODE ERROR_NONE=0x0 - ERROR_PROTOCOL=0x1 - ERROR_INTERNAL=0x2 - ERROR_FLOW_CONTROL=0x3 - ERROR_SETTINGS_TIMEOUT=0x4 - ERROR_STREAM_CLOSED=0x5 - ERROR_FRAME_SIZE=0x6 - ERROR_REFUSED_STREAM=0x7 - ERROR_CANCEL=0x8 - ERROR_COMPRESSION=0x9 - ERROR_CONNECT=0xa - ERROR_ENHANCE_YOUR_CALM=0xb - ERROR_INADEQUATE_SECURITY=0xc - ERROR_HTTP_1_1_REQUIRED=0xd +@enum ERROR_CODE begin + ERROR_NONE=0x0 + ERROR_PROTOCOL=0x1 + ERROR_INTERNAL=0x2 + ERROR_FLOW_CONTROL=0x3 + ERROR_SETTINGS_TIMEOUT=0x4 + ERROR_STREAM_CLOSED=0x5 + ERROR_FRAME_SIZE=0x6 + ERROR_REFUSED_STREAM=0x7 + ERROR_CANCEL=0x8 + ERROR_COMPRESSION=0x9 + ERROR_CONNECT=0xa + ERROR_ENHANCE_YOUR_CALM=0xb + ERROR_INADEQUATE_SECURITY=0xc + ERROR_HTTP_1_1_REQUIRED=0xd +end struct ProtocolError message::String @@ -25,14 +27,14 @@ struct NullError end function goaway!(connection::HTTPConnection, error) error_code = if typeof(error) == NullError - 0x0 + ERROR_NONE elseif typeof(error) == InternalError - 0x2 + ERROR_INTERNAL else - 0x1 + ERROR_PROTOCOL end - frame = GoawayFrame(0x0, error_code, Array{UInt8, 1}()) + frame = GoawayFrame(0x0, UInt32(error_code), Array{UInt8, 1}()) put!(connection.channel_act_raw, frame) end diff --git a/src/Session/states.jl b/src/Session/states.jl index 7b174bd..1a336da 100644 --- a/src/Session/states.jl +++ b/src/Session/states.jl @@ -39,7 +39,7 @@ function handle_stream_state!(connection::HTTPConnection, frame, issend::Bool) ## The endpoint can send a HEADERS frame. This causes the stream to open ## in a "half-closed (remote)" state. if typeof(frame) == HeadersFrame && issend - stream.state = HALC_CLOSED_REMOTE + stream.state = HALF_CLOSED_REMOTE return end From 9dc083b882fdf1bf486ff8be489d8e762f2c3ae1 Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 8 Jan 2019 16:40:59 +0530 Subject: [PATCH 11/19] minor cosmetic change --- src/Frame/goaway.jl | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/Frame/goaway.jl b/src/Frame/goaway.jl index 195b4fc..e881ab3 100644 --- a/src/Frame/goaway.jl +++ b/src/Frame/goaway.jl @@ -1,7 +1,7 @@ struct GoawayFrame last_stream_identifier::UInt32 error_code::UInt32 - debug_data::Array{UInt8, 1} + debug_data::Vector{UInt8} end ==(a::GoawayFrame, b::GoawayFrame) = @@ -22,14 +22,14 @@ function decode_goaway(header, payload) end function encode_goaway(frame) - payload::Array{UInt8, 1} = [ UInt8(frame.last_stream_identifier >> 24) & 0x7f; - UInt8(frame.last_stream_identifier >> 16 & 0x000000ff); - UInt8(frame.last_stream_identifier >> 8 & 0x000000ff); - UInt8(frame.last_stream_identifier & 0x000000ff); - UInt8(frame.error_code >> 24); - UInt8(frame.error_code >> 16 & 0x000000ff); - UInt8(frame.error_code >> 8 & 0x000000ff); - UInt8(frame.error_code & 0x000000ff) ] + payload::Vector{UInt8} = [ UInt8(frame.last_stream_identifier >> 24) & 0x7f; + UInt8(frame.last_stream_identifier >> 16 & 0x000000ff); + UInt8(frame.last_stream_identifier >> 8 & 0x000000ff); + UInt8(frame.last_stream_identifier & 0x000000ff); + UInt8(frame.error_code >> 24); + UInt8(frame.error_code >> 16 & 0x000000ff); + UInt8(frame.error_code >> 8 & 0x000000ff); + UInt8(frame.error_code & 0x000000ff) ] append!(payload, frame.debug_data) return wrap_payload(payload, GOAWAY, 0x0, 0x0) From 2ba38fbecfa11547e52e768ac9628491dfaee0be Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 8 Jan 2019 16:41:36 +0530 Subject: [PATCH 12/19] print exceptions from async blocks --- src/Session/channels.jl | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/Session/channels.jl b/src/Session/channels.jl index 308cd04..d4161ed 100644 --- a/src/Session/channels.jl +++ b/src/Session/channels.jl @@ -15,6 +15,7 @@ function initialize_raw_loop_async(connection::HTTPConnection, buffer; skip_pref end @async begin + try while true if connection.closed break @@ -88,9 +89,14 @@ function initialize_raw_loop_async(connection::HTTPConnection, buffer; skip_pref end end end + catch ex + @info("Got an exception $ex") + rethrow(ex) + end end @async begin + try while true if connection.closed break @@ -128,6 +134,10 @@ function initialize_raw_loop_async(connection::HTTPConnection, buffer; skip_pref end end end + catch ex + @info("Got an exception $ex") + rethrow(ex) + end end put!(channel_act_raw, SettingsFrame(false, Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}())) @@ -256,7 +266,14 @@ end function select(waitset::Array) c = Channel(length(waitset)) for w in waitset - @async put!(c, (wait(w); w)) + @async begin + try + put!(c, (wait(w); w)) + catch ex + @info("Got an exception $ex") + rethrow(ex) + end + end end take!(c) end @@ -270,6 +287,7 @@ function initialize_loop_async(connection::HTTPConnection, buffer; skip_preface= channel_evt = connection.channel_evt @async begin + try while true if connection.closed put!(channel_evt, EvtGoaway()) @@ -283,5 +301,9 @@ function initialize_loop_async(connection::HTTPConnection, buffer; skip_preface= process_channel_act(connection) end end + catch ex + @info("Got an exception $ex") + rethrow(ex) + end end end From d5f1b3f4b33ef19d5aec74944c289abeb4f3e7c9 Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 8 Jan 2019 21:20:09 +0530 Subject: [PATCH 13/19] more cleanups --- src/Session.jl | 27 +++-- src/Session/channels.jl | 243 +++++++++++++++++++--------------------- test/client.jl | 20 ++-- test/runtests.jl | 7 +- 4 files changed, 146 insertions(+), 151 deletions(-) diff --git a/src/Session.jl b/src/Session.jl index 7d437ef..2813fa5 100644 --- a/src/Session.jl +++ b/src/Session.jl @@ -88,19 +88,24 @@ mutable struct HTTPConnection ## actions -> channel_act -> channel_act_raw -> io ## io -> channel_evt_raw -> channel_evt -> events + act_processor::Union{Nothing,Task} + act_raw_processor::Union{Nothing,Task} + evt_raw_processor::Union{Nothing,Task} + evt_processor::Union{Nothing,Task} + + function HTTPConnection(isclient) + new(HPack.DynamicTable(), + Vector{HTTPStream}(), + 1024, + isclient, + isclient ? 1 : 2, + HTTPSettings(), + false, + Channel(DEFAULT_CHANNEL_SZ), Channel(DEFAULT_CHANNEL_SZ), Channel(DEFAULT_CHANNEL_SZ), Channel(DEFAULT_CHANNEL_SZ), + nothing, nothing, nothing, nothing) + end end -HTTPConnection(isclient) = HTTPConnection(HPack.DynamicTable(), - Array{HTTPStream, 1}(), - 65535, - isclient, - isclient ? 1 : 2, - HTTPSettings(), - false, - Channel(DEFAULT_CHANNEL_SZ), - Channel(DEFAULT_CHANNEL_SZ), - Channel(DEFAULT_CHANNEL_SZ), - Channel(DEFAULT_CHANNEL_SZ)) function next_free_stream_identifier(connection::HTTPConnection) return connection.last_stream_identifier + 2 diff --git a/src/Session/channels.jl b/src/Session/channels.jl index d4161ed..42c444c 100644 --- a/src/Session/channels.jl +++ b/src/Session/channels.jl @@ -14,128 +14,129 @@ function initialize_raw_loop_async(connection::HTTPConnection, buffer; skip_pref end end - @async begin + connection.evt_raw_processor = @async begin try - while true - if connection.closed - break - end - - if eof(buffer) - connection.closed = true - continue - end + while true + if connection.closed + break + end - frame = try - Frame.decode(buffer) - catch - goaway!(connection, ProtocolError("Decode error.")) - break - end + if eof(buffer) + connection.closed = true + continue + end - # Abstract atom headers frame away - if typeof(frame) == HeadersFrame || typeof(frame) == PushPromiseFrame - continuations = Array{ContinuationFrame, 1}() - while !(frame.is_end_headers || - (!frame.is_end_headers && length(continuations) == 0) || - continuations[length(continuations)].is_end_headers) - - continuation = try - Frame.decode(buffer) - catch - goaway!(connection, ProtocolError("Decode error.")) - break - end + frame = try + Frame.decode(buffer) + catch + goaway!(connection, ProtocolError("Decode error.")) + break + end - if !(typeof(continuation) == ContinuationFrame && - continuation.stream_identifier == frame.stream_identifier) - goaway!(connection, ProtocolError("Headers must be followed by continuations if it is not the end.")) + # Abstract atom headers frame away + if typeof(frame) == HeadersFrame || typeof(frame) == PushPromiseFrame + continuations = Array{ContinuationFrame, 1}() + while !(frame.is_end_headers || + (!frame.is_end_headers && length(continuations) == 0) || + continuations[length(continuations)].is_end_headers) + + continuation = try + Frame.decode(buffer) + catch + goaway!(connection, ProtocolError("Decode error.")) + break + end + + if !(typeof(continuation) == ContinuationFrame && + continuation.stream_identifier == frame.stream_identifier) + goaway!(connection, ProtocolError("Headers must be followed by continuations if it is not the end.")) + end + push!(continuations, continuation) end - push!(continuations, continuation) - end - fragment = copy(frame.fragment) - if length(continuations) > 0 - for i = 1:length(continuations) - append!(fragment, continuations[i].fragment) + fragment = copy(frame.fragment) + if length(continuations) > 0 + for i = 1:length(continuations) + append!(fragment, continuations[i].fragment) + end end - end - if typeof(frame) == HeadersFrame - put!(channel_evt_raw, HeadersFrame(frame.is_end_stream, - true, - frame.is_priority, - frame.stream_identifier, - frame.exclusive, - frame.dependent_stream_identifier, - frame.weight, - fragment)) - else - put!(channel_evt_raw, PushPromiseFrame(true, + if typeof(frame) == HeadersFrame + put!(channel_evt_raw, HeadersFrame(frame.is_end_stream, + true, + frame.is_priority, frame.stream_identifier, - frame.promised_stream_identifier, + frame.exclusive, + frame.dependent_stream_identifier, + frame.weight, fragment)) - end - else - put!(channel_evt_raw, frame) + else + put!(channel_evt_raw, PushPromiseFrame(true, + frame.stream_identifier, + frame.promised_stream_identifier, + fragment)) + end + else + put!(channel_evt_raw, frame) - if typeof(frame) == DataFrame && !frame.is_end_stream - put!(channel_act_raw, WindowUpdateFrame(0, length(frame.data))) - put!(channel_act_raw, WindowUpdateFrame(frame.stream_identifier, length(frame.data))) - end + if typeof(frame) == DataFrame && !frame.is_end_stream + put!(channel_act_raw, WindowUpdateFrame(0, length(frame.data))) + put!(channel_act_raw, WindowUpdateFrame(frame.stream_identifier, length(frame.data))) + end - if typeof(frame) == GoawayFrame - connection.closed = true + if typeof(frame) == GoawayFrame + connection.closed = true + end end end - end catch ex - @info("Got an exception $ex") + @info("Exception in connection.evt_raw_processor $ex") rethrow(ex) end end - @async begin + connection.act_raw_processor = @async begin try - while true - if connection.closed - break - end - - frame = take!(channel_act_raw) - - if typeof(frame) == HeadersFrame || typeof(frame) == PushPromiseFrame - write(buffer, Frame.encode(frame)) - while !frame.is_end_headers - continuation = take!(channel_act_raw) - if !(typeof(continuation) == ContinuationFrame && - continuation.stream_identifier == frame.stream_identifier) - goaway!(connection, InternalError("Headers must be followed by continuations if it is not the end.")) - end - write(buffer, Frame.encode(continuation)) + while true + if connection.closed + break end - else - encoded = Frame.encode(frame) - - if typeof(frame) == DataFrame - stream = get_stream(connection, frame.stream_identifier) - if stream.window_size < length(encoded) - put!(channel_act_raw, frame) - continue + + frame = take!(channel_act_raw) + + if typeof(frame) == HeadersFrame || typeof(frame) == PushPromiseFrame + write(buffer, Frame.encode(frame)) + while !frame.is_end_headers + continuation = take!(channel_act_raw) + if !(typeof(continuation) == ContinuationFrame && + continuation.stream_identifier == frame.stream_identifier) + goaway!(connection, InternalError("Headers must be followed by continuations if it is not the end.")) + end + write(buffer, Frame.encode(continuation)) end + else + encoded = Frame.encode(frame) - stream.window_size -= length(encoded) - end + if typeof(frame) == DataFrame + stream = get_stream(connection, frame.stream_identifier) + if stream.window_size < length(encoded) + put!(channel_act_raw, frame) + continue + end - write(buffer, encoded) + stream.window_size -= length(encoded) + end - if typeof(frame) == GoawayFrame - connection.closed = true + write(buffer, encoded) + + if typeof(frame) == GoawayFrame + @info("wrote a goaway frame") + connection.closed = true + end end end - end catch ex - @info("Got an exception $ex") + @info("Exception in connection.act_raw_processor $ex") rethrow(ex) end end @@ -212,8 +213,7 @@ function process_channel_evt(connection::HTTPConnection) handle_setting!(connection, parameters[i][1], parameters[i][2]) end end - put!(channel_act_raw, - SettingsFrame(true, Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}())) + put!(channel_act_raw, SettingsFrame(true, Array{Tuple{Frame.SETTING_IDENTIFIER, UInt32}, 1}())) end return end @@ -223,6 +223,7 @@ function process_channel_evt(connection::HTTPConnection) end if typeof(frame) == GoawayFrame @assert frame.error_code == 0x0 + put!(channel_evt, EvtGoaway()) return end if typeof(frame) == WindowUpdateFrame && frame.stream_identifier == 0x0 @@ -263,47 +264,33 @@ function process_channel_evt(connection::HTTPConnection) @assert false end -function select(waitset::Array) - c = Channel(length(waitset)) - for w in waitset - @async begin - try - put!(c, (wait(w); w)) - catch ex - @info("Got an exception $ex") - rethrow(ex) +function initialize_in_loop_async(connection::HTTPConnection) + connection.act_processor = @async begin + try + while !connection.closed + process_channel_act(connection) end + put!(channel_evt, EvtGoaway()) + catch ex + @info("Exception in connection.act_processor $ex") + rethrow(ex) end end - take!(c) -end - -function initialize_loop_async(connection::HTTPConnection, buffer; skip_preface=false) - initialize_raw_loop_async(connection, buffer; skip_preface=skip_preface) - channel_act_raw = connection.channel_act_raw - channel_act = connection.channel_act - channel_evt_raw = connection.channel_evt_raw - channel_evt = connection.channel_evt - - @async begin + connection.evt_raw_processor = @async begin try - while true - if connection.closed - put!(channel_evt, EvtGoaway()) - break - end - - c = select([channel_evt_raw, channel_act]) - if c == channel_evt_raw + while !connection.closed # || (connection.closed && isready(connection.channel_evt_raw)) process_channel_evt(connection) - else - process_channel_act(connection) end - end catch ex - @info("Got an exception $ex") + @info("Exception in connection.evt_raw_processor $ex") rethrow(ex) end end + nothing +end + +function initialize_loop_async(connection::HTTPConnection, buffer; skip_preface=false) + initialize_raw_loop_async(connection, buffer; skip_preface=skip_preface) + initialize_in_loop_async(connection) end diff --git a/test/client.jl b/test/client.jl index 3bd2d52..7a4eaf3 100644 --- a/test/client.jl +++ b/test/client.jl @@ -37,14 +37,18 @@ function test_request(dest, port, url, certhostname=nothing) ("accept-encoding", "gzip, deflate"), ("user-agent", "HTTP2.jl")] - for req_id in 1:3 - @info("Sending request", req_id) - HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(HTTP2.Session.next_free_stream_identifier(connection), headers, true)) - resp_headers = HTTP2.Session.take_evt!(connection).headers - @show resp_headers - resp_data = HTTP2.Session.take_evt!(connection).data - @show resp_data - show_response(resp_headers, resp_data) + @sync begin + @async for req_id in 1:3 + @info("Sending request", req_id) + HTTP2.Session.put_act!(connection, HTTP2.Session.ActSendHeaders(HTTP2.Session.next_free_stream_identifier(connection), headers, true)) + end + @async for resp_id in 1:3 + resp_headers = HTTP2.Session.take_evt!(connection).headers + @show resp_headers + resp_data = HTTP2.Session.take_evt!(connection).data + @show resp_data + show_response(resp_headers, resp_data) + end end @info("Closing connection", conn_id) diff --git a/test/runtests.jl b/test/runtests.jl index 1ff0a3a..aecd584 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -25,9 +25,9 @@ function run_test(script, args...) end @async run_test("server.jl") -sleep(20) +sleep(10) run_test("client.jl") -sleep(20) +sleep(10) GENKEYSCRIPT = joinpath(dirname(@__FILE__), "genkey.sh") HOSTNAME = gethostname() @@ -36,6 +36,5 @@ run(`$GENKEYSCRIPT $HOSTNAME`) keyfile = joinpath(dirname(@__FILE__), "$HOSTNAME.key") certfile = joinpath(dirname(@__FILE__), "$HOSTNAME.cert") @async run_test("server.jl", certfile, keyfile) -sleep(20) +sleep(10) run_test("client.jl", HOSTNAME) -sleep(20) From cf18e696d564ec78a6f39e46f3b73ddf243aba07 Mon Sep 17 00:00:00 2001 From: tan Date: Wed, 9 Jan 2019 10:06:44 +0530 Subject: [PATCH 14/19] fix --- src/HTTP2.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/HTTP2.jl b/src/HTTP2.jl index 9998e91..60ab73a 100644 --- a/src/HTTP2.jl +++ b/src/HTTP2.jl @@ -12,7 +12,7 @@ readallbytes(s, nbytes) = read(s, nbytes) function readallbytes(s::MbedTLS.SSLContext, nbytes) finalbuf = Vector{UInt8}(undef, nbytes) lfinal = 0 - while lfinal < nbytes + while (lfinal < nbytes) && !eof(s) toread = nbytes - lfinal buf = Vector{UInt8}(undef, toread) readbytes!(s, buf, toread) From 429e964ee3c0ea70e65171dcef469ee8027b88d1 Mon Sep 17 00:00:00 2001 From: tan Date: Wed, 9 Jan 2019 09:56:13 +0530 Subject: [PATCH 15/19] temp test --- src/HTTP2.jl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/HTTP2.jl b/src/HTTP2.jl index 60ab73a..139e823 100644 --- a/src/HTTP2.jl +++ b/src/HTTP2.jl @@ -15,12 +15,16 @@ function readallbytes(s::MbedTLS.SSLContext, nbytes) while (lfinal < nbytes) && !eof(s) toread = nbytes - lfinal buf = Vector{UInt8}(undef, toread) + @show("trying to read $toread bytes") readbytes!(s, buf, toread) + @show("read $(length(buf)) bytes") nread = length(buf) if nread > 0 copyto!(finalbuf, lfinal+1, buf) lfinal += nread else + @show nread + sleep(0.5) yield() end end From 8e577df91b309033cf17e332ea80dfa10fca2d6b Mon Sep 17 00:00:00 2001 From: tan Date: Wed, 9 Jan 2019 10:31:47 +0530 Subject: [PATCH 16/19] fix --- src/HTTP2.jl | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/HTTP2.jl b/src/HTTP2.jl index 139e823..b7cf05c 100644 --- a/src/HTTP2.jl +++ b/src/HTTP2.jl @@ -13,12 +13,15 @@ function readallbytes(s::MbedTLS.SSLContext, nbytes) finalbuf = Vector{UInt8}(undef, nbytes) lfinal = 0 while (lfinal < nbytes) && !eof(s) - toread = nbytes - lfinal - buf = Vector{UInt8}(undef, toread) - @show("trying to read $toread bytes") - readbytes!(s, buf, toread) - @show("read $(length(buf)) bytes") - nread = length(buf) + toread = min(nbytes - lfinal, bytesavailable(s)) + if toread > 0 + buf = Vector{UInt8}(undef, toread) + @show("trying to read $toread bytes") + nread = readbytes!(s, buf, toread) + @show("read $nread bytes") + else + nread = 0 + end if nread > 0 copyto!(finalbuf, lfinal+1, buf) lfinal += nread From fc1bbcc35dc071ccbc718af1110030afa03c559b Mon Sep 17 00:00:00 2001 From: tan Date: Wed, 9 Jan 2019 11:46:49 +0530 Subject: [PATCH 17/19] debug --- src/Frame.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Frame.jl b/src/Frame.jl index 0882d06..51b4f01 100644 --- a/src/Frame.jl +++ b/src/Frame.jl @@ -58,6 +58,8 @@ include("Frame/continuation.jl") function decode(buf) header = decode_header(buf) + @show header.typ + @show header.length payload = readallbytes(buf, header.length) @assert length(payload) == header.length From 6fd89316a39da610dac69f36dd3b86d70690c7e0 Mon Sep 17 00:00:00 2001 From: tan Date: Wed, 9 Jan 2019 12:01:01 +0530 Subject: [PATCH 18/19] fix --- src/Frame.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Frame.jl b/src/Frame.jl index 51b4f01..ed84648 100644 --- a/src/Frame.jl +++ b/src/Frame.jl @@ -57,6 +57,7 @@ include("Frame/window_update.jl") include("Frame/continuation.jl") function decode(buf) + @show "trying to read header" header = decode_header(buf) @show header.typ @show header.length From d04dd51f18deeba39b47b3931f0fa7f245627026 Mon Sep 17 00:00:00 2001 From: tan Date: Wed, 9 Jan 2019 12:20:33 +0530 Subject: [PATCH 19/19] debug --- src/Session/channels.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Session/channels.jl b/src/Session/channels.jl index 42c444c..bbc7ea2 100644 --- a/src/Session/channels.jl +++ b/src/Session/channels.jl @@ -120,6 +120,7 @@ function initialize_raw_loop_async(connection::HTTPConnection, buffer; skip_pref if typeof(frame) == DataFrame stream = get_stream(connection, frame.stream_identifier) if stream.window_size < length(encoded) + @show("putting back as window_size=$(stream.window_size), len=$(length(encoded))") put!(channel_act_raw, frame) continue end @@ -130,7 +131,6 @@ function initialize_raw_loop_async(connection::HTTPConnection, buffer; skip_pref write(buffer, encoded) if typeof(frame) == GoawayFrame - @info("wrote a goaway frame") connection.closed = true end end