Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/em-websocket/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ class Connection < EventMachine::Connection
include Debugger

attr_writer :max_frame_size
attr_accessor :f_permessage_deflate

# define WebSocket callbacks
def onopen(&blk); @onopen = blk; end
Expand Down Expand Up @@ -116,7 +117,7 @@ def dispatch(data)
send_flash_cross_domain_file
else
@handshake ||= begin
handshake = Handshake.new(@secure || @secure_proxy)
handshake = Handshake.new(@secure || @secure_proxy, self)

handshake.callback { |upgrade_response, handler_klass|
debug [:accepting_ws_version, handshake.protocol_version]
Expand Down
72 changes: 71 additions & 1 deletion lib/em-websocket/framing07.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ def process_data
fin = (@data.getbyte(pointer) & 0b10000000) == 0b10000000
# Ignoring rsv1-3 for now
opcode = @data.getbyte(pointer) & 0b00001111
f_rsv1 = (@data.getbyte(pointer) & 0b01000000) == 0b01000000
debug ['f_rsv1', f_rsv1]
pointer += 1

mask = (@data.getbyte(pointer) & 0b10000000) == 0b10000000
Expand Down Expand Up @@ -111,11 +113,16 @@ def process_data
# Message is complete
if frame_type == :continuation
@application_data_buffer << application_data
unpack_deflate!(@application_data_buffer) if f_rsv1 && @frame_type == :text
message(@frame_type, '', @application_data_buffer)
@application_data_buffer = ''
@frame_type = nil
else
unpack_deflate!(application_data) if f_rsv1 && frame_type == :text
message(frame_type, '', application_data)
if frame_type == :close
on_close
end
end
end
end # end while
Expand All @@ -129,11 +136,33 @@ def send_frame(frame_type, application_data)
end

frame = ''
debug ['@connection.f_permessage_deflate', @connection.f_permessage_deflate]
f_do_message_deflate = frame_type == :text && @connection.f_permessage_deflate

opcode = type_to_opcode(frame_type)
byte1 = opcode | 0b10000000 # fin bit set, rsv1-3 are 0
if f_do_message_deflate
byte1 |= 0b01000000 # set rsv1
end
frame << byte1

if f_do_message_deflate

@deflate ||= begin
min_window_bits = 9
max_window_bits = 15
level = Zlib::DEFAULT_COMPRESSION
mem_level = Zlib::DEF_MEM_LEVEL
strategy = Zlib::DEFAULT_STRATEGY
Zlib::Deflate.new(level, -max_window_bits, mem_level, strategy)
end

debug ['application_data.length', application_data.length]
compressed_data = @deflate.deflate(application_data, Zlib::SYNC_FLUSH)[0...-4]
debug ['compressed_data.length', compressed_data.length]
application_data = compressed_data
end

length = application_data.size
if length <= 125
byte2 = length # since rsv4 is 0
Expand All @@ -155,7 +184,48 @@ def send_text_frame(data)
send_frame(:text, data)
end

private
private


def unpack_deflate!(msg)
debug ['(unpack_deflate!) msg.length', msg.length]

@inflate ||= begin
min_window_bits = 9
max_window_bits = 15
peer_window_bits = max_window_bits
window_bits = [peer_window_bits, min_window_bits].max
level = Zlib::DEFAULT_COMPRESSION
mem_level = Zlib::DEF_MEM_LEVEL
strategy = Zlib::DEFAULT_STRATEGY
Zlib::Inflate.new(-window_bits)
end

msg.replace @inflate.inflate(msg) + @inflate.inflate([0x00, 0x00, 0xff, 0xff].pack('C*'))
debug ['unpacked data length', msg.length]
end

def on_close
debug ['(framing) close zlib on close']
if @inflate
close_zlib @inflate
@inflate = nil
end
if @deflate
close_zlib @deflate
@deflate = nil
end
end

def on_unbind
on_close
end

def close_zlib(zlib)
zlib.finish rescue nil
zlib.close
end


FRAME_TYPES = {
:continuation => 0,
Expand Down
5 changes: 5 additions & 0 deletions lib/em-websocket/handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,15 @@ def fail_websocket(e)
end

def unbind
debug ['unbind']
@state = :closed

@close_timer.cancel if @close_timer

if defined? on_unbind
on_unbind
end

@close_info = defined?(@close_info) ? @close_info : {
:code => 1006,
:was_clean => false,
Expand Down
5 changes: 3 additions & 2 deletions lib/em-websocket/handshake.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ class Handshake
# Unfortunately drafts 75 & 76 require knowledge of whether the
# connection is being terminated as ws/wss in order to generate the
# correct handshake response
def initialize(secure)
def initialize(secure, connection)
@parser = Http::Parser.new
@secure = secure
@connection = connection

@parser.on_headers_complete = proc { |headers|
@headers = Hash[headers.map { |k,v| [k.downcase, v] }]
Expand Down Expand Up @@ -142,7 +143,7 @@ def process(headers, remains)
raise HandshakeError, "Protocol version #{version} not supported"
end

upgrade_response = handshake_klass.handshake(@headers, @parser.request_url, @secure)
upgrade_response = handshake_klass.handshake(@headers, @parser.request_url, @secure, @connection)

handler_klass = Handler.klass_factory(version)

Expand Down
10 changes: 9 additions & 1 deletion lib/em-websocket/handshake04.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
module EventMachine
module WebSocket
module Handshake04
def self.handshake(headers, _, __)
def self.handshake(headers, _, __, connection)
# Required
unless key = headers['sec-websocket-key']
raise HandshakeError, "sec-websocket-key header is required"
Expand All @@ -22,6 +22,14 @@ def self.handshake(headers, _, __)
upgrade << "Sec-WebSocket-Protocol: #{protocol}"
end

if headers['sec-websocket-extensions'] \
&& headers['sec-websocket-extensions'].include?('permessage-deflate') \
&& (headers['accept-encoding'].include?('gzip') || headers['accept-encoding'].include?('deflate'))

upgrade << "Sec-WebSocket-Extensions: permessage-deflate"
connection.f_permessage_deflate = true
end

# TODO: Support sec-websocket-protocol selection
# TODO: sec-websocket-extensions

Expand Down