diff --git a/lib/em-websocket/connection.rb b/lib/em-websocket/connection.rb index a55881d..6ebffc1 100644 --- a/lib/em-websocket/connection.rb +++ b/lib/em-websocket/connection.rb @@ -4,6 +4,7 @@ class Connection < EventMachine::Connection include Debugger attr_writer :max_frame_size + attr_accessor :f_permessage_deflate # define WebSocket callbacks def onopen(&blk); @onopen = blk; end @@ -116,7 +117,7 @@ def dispatch(data) send_flash_cross_domain_file else @handshake ||= begin - handshake = Handshake.new(@secure || @secure_proxy) + handshake = Handshake.new(@secure || @secure_proxy, self) handshake.callback { |upgrade_response, handler_klass| debug [:accepting_ws_version, handshake.protocol_version] diff --git a/lib/em-websocket/framing07.rb b/lib/em-websocket/framing07.rb index 58318e4..9eb644a 100644 --- a/lib/em-websocket/framing07.rb +++ b/lib/em-websocket/framing07.rb @@ -19,6 +19,8 @@ def process_data fin = (@data.getbyte(pointer) & 0b10000000) == 0b10000000 # Ignoring rsv1-3 for now opcode = @data.getbyte(pointer) & 0b00001111 + f_rsv1 = (@data.getbyte(pointer) & 0b01000000) == 0b01000000 + debug ['f_rsv1', f_rsv1] pointer += 1 mask = (@data.getbyte(pointer) & 0b10000000) == 0b10000000 @@ -111,11 +113,16 @@ def process_data # Message is complete if frame_type == :continuation @application_data_buffer << application_data + unpack_deflate!(@application_data_buffer) if f_rsv1 && @frame_type == :text message(@frame_type, '', @application_data_buffer) @application_data_buffer = '' @frame_type = nil else + unpack_deflate!(application_data) if f_rsv1 && frame_type == :text message(frame_type, '', application_data) + if frame_type == :close + on_close + end end end end # end while @@ -129,11 +136,33 @@ def send_frame(frame_type, application_data) end frame = '' + debug ['@connection.f_permessage_deflate', @connection.f_permessage_deflate] + f_do_message_deflate = frame_type == :text && @connection.f_permessage_deflate opcode = type_to_opcode(frame_type) byte1 = opcode | 0b10000000 # fin bit set, rsv1-3 are 0 + if f_do_message_deflate + byte1 |= 0b01000000 # set rsv1 + end frame << byte1 + if f_do_message_deflate + + @deflate ||= begin + min_window_bits = 9 + max_window_bits = 15 + level = Zlib::DEFAULT_COMPRESSION + mem_level = Zlib::DEF_MEM_LEVEL + strategy = Zlib::DEFAULT_STRATEGY + Zlib::Deflate.new(level, -max_window_bits, mem_level, strategy) + end + + debug ['application_data.length', application_data.length] + compressed_data = @deflate.deflate(application_data, Zlib::SYNC_FLUSH)[0...-4] + debug ['compressed_data.length', compressed_data.length] + application_data = compressed_data + end + length = application_data.size if length <= 125 byte2 = length # since rsv4 is 0 @@ -155,7 +184,48 @@ def send_text_frame(data) send_frame(:text, data) end - private + private + + + def unpack_deflate!(msg) + debug ['(unpack_deflate!) msg.length', msg.length] + + @inflate ||= begin + min_window_bits = 9 + max_window_bits = 15 + peer_window_bits = max_window_bits + window_bits = [peer_window_bits, min_window_bits].max + level = Zlib::DEFAULT_COMPRESSION + mem_level = Zlib::DEF_MEM_LEVEL + strategy = Zlib::DEFAULT_STRATEGY + Zlib::Inflate.new(-window_bits) + end + + msg.replace @inflate.inflate(msg) + @inflate.inflate([0x00, 0x00, 0xff, 0xff].pack('C*')) + debug ['unpacked data length', msg.length] + end + + def on_close + debug ['(framing) close zlib on close'] + if @inflate + close_zlib @inflate + @inflate = nil + end + if @deflate + close_zlib @deflate + @deflate = nil + end + end + + def on_unbind + on_close + end + + def close_zlib(zlib) + zlib.finish rescue nil + zlib.close + end + FRAME_TYPES = { :continuation => 0, diff --git a/lib/em-websocket/handler.rb b/lib/em-websocket/handler.rb index 98cdd4e..e8af95a 100644 --- a/lib/em-websocket/handler.rb +++ b/lib/em-websocket/handler.rb @@ -71,10 +71,15 @@ def fail_websocket(e) end def unbind + debug ['unbind'] @state = :closed @close_timer.cancel if @close_timer + if defined? on_unbind + on_unbind + end + @close_info = defined?(@close_info) ? @close_info : { :code => 1006, :was_clean => false, diff --git a/lib/em-websocket/handshake.rb b/lib/em-websocket/handshake.rb index 12b7904..6fab3f4 100644 --- a/lib/em-websocket/handshake.rb +++ b/lib/em-websocket/handshake.rb @@ -13,9 +13,10 @@ class Handshake # Unfortunately drafts 75 & 76 require knowledge of whether the # connection is being terminated as ws/wss in order to generate the # correct handshake response - def initialize(secure) + def initialize(secure, connection) @parser = Http::Parser.new @secure = secure + @connection = connection @parser.on_headers_complete = proc { |headers| @headers = Hash[headers.map { |k,v| [k.downcase, v] }] @@ -142,7 +143,7 @@ def process(headers, remains) raise HandshakeError, "Protocol version #{version} not supported" end - upgrade_response = handshake_klass.handshake(@headers, @parser.request_url, @secure) + upgrade_response = handshake_klass.handshake(@headers, @parser.request_url, @secure, @connection) handler_klass = Handler.klass_factory(version) diff --git a/lib/em-websocket/handshake04.rb b/lib/em-websocket/handshake04.rb index 7600787..d3f7604 100644 --- a/lib/em-websocket/handshake04.rb +++ b/lib/em-websocket/handshake04.rb @@ -4,7 +4,7 @@ module EventMachine module WebSocket module Handshake04 - def self.handshake(headers, _, __) + def self.handshake(headers, _, __, connection) # Required unless key = headers['sec-websocket-key'] raise HandshakeError, "sec-websocket-key header is required" @@ -22,6 +22,14 @@ def self.handshake(headers, _, __) upgrade << "Sec-WebSocket-Protocol: #{protocol}" end + if headers['sec-websocket-extensions'] \ + && headers['sec-websocket-extensions'].include?('permessage-deflate') \ + && (headers['accept-encoding'].include?('gzip') || headers['accept-encoding'].include?('deflate')) + + upgrade << "Sec-WebSocket-Extensions: permessage-deflate" + connection.f_permessage_deflate = true + end + # TODO: Support sec-websocket-protocol selection # TODO: sec-websocket-extensions