Skip to content

Commit a6edb07

Browse files
committed
Add abstraction Payload to use in Publish
1 parent eb66e5b commit a6edb07

File tree

6 files changed

+187
-13
lines changed

6 files changed

+187
-13
lines changed

spec/io_spec.cr

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ describe MQTT::Protocol::IO do
8484
mio = IO::Memory.new
8585
io = MQTT::Protocol::IO.new(mio)
8686

87-
io.write_bytes_raw bytes
87+
io.write bytes
8888
mio.rewind
8989

9090
res = Bytes.new(3)

spec/packets_spec.cr

+9-9
Original file line numberDiff line numberDiff line change
@@ -254,13 +254,13 @@ describe MQTT::Protocol::Packet do
254254
io = MQTT::Protocol::IO.new(mio)
255255

256256
topic = "a/b/c"
257-
payload = "foobar and barfoo".to_slice
257+
payload = MQTT::Protocol::Payload.new("foobar and barfoo".to_slice)
258258
remaining_length = topic.bytesize + payload.size + 2 # 2 = sizeof topic len
259259

260260
io.write_byte 0b00110000u8
261261
io.write_remaining_length remaining_length
262262
io.write_string topic
263-
io.write_bytes_raw payload
263+
io.write_bytes payload
264264

265265
mio.rewind
266266

@@ -282,7 +282,7 @@ describe MQTT::Protocol::Packet do
282282
io.write_byte 0b00111000u8
283283
io.write_remaining_length remaining_length
284284
io.write_string topic
285-
io.write_bytes_raw payload
285+
io.write payload
286286

287287
mio.rewind
288288

@@ -302,7 +302,7 @@ describe MQTT::Protocol::Packet do
302302
io.write_byte 0b00111000u8
303303
io.write_remaining_length remaining_length
304304
io.write_string topic
305-
io.write_bytes_raw payload
305+
io.write payload
306306

307307
mio.rewind
308308

@@ -318,7 +318,7 @@ describe MQTT::Protocol::Packet do
318318
io = MQTT::Protocol::IO.new(mio)
319319

320320
topic = "a/b/c"
321-
payload = "foobar and barfoo".to_slice
321+
payload = MQTT::Protocol::Payload.new("foobar and barfoo".to_slice)
322322
packet_id = 100u16
323323
publish = MQTT::Protocol::Publish.new(topic, payload, packet_id, false, 1, false)
324324
publish.to_io(io)
@@ -335,7 +335,7 @@ describe MQTT::Protocol::Packet do
335335

336336
it "raises error if dup is set for QoS 0 messages" do
337337
topic = "a/b/c"
338-
payload = "foobar and barfoo".to_slice
338+
payload = MQTT::Protocol::Payload.new("foobar and barfoo".to_slice)
339339
packet_id = 100u16
340340
expect_raises(ArgumentError) do
341341
MQTT::Protocol::Publish.new(topic, payload, packet_id, true, 0, false)
@@ -347,7 +347,7 @@ describe MQTT::Protocol::Packet do
347347
io = MQTT::Protocol::IO.new(mio)
348348

349349
topic = "a/b/c"
350-
payload = "foobar and barfoo".to_slice
350+
payload = MQTT::Protocol::Payload.new("foobar and barfoo".to_slice)
351351
packet_id = 100u16
352352
publish = MQTT::Protocol::Publish.new(topic, payload, packet_id, false, 0, false)
353353
publish.to_io(io)
@@ -365,7 +365,7 @@ describe MQTT::Protocol::Packet do
365365
describe "#initialize" do
366366
it "raises an error if QoS is 3" do
367367
topic = "a/b/c"
368-
payload = "foobar and barfoo".to_slice
368+
payload = MQTT::Protocol::Payload.new("foobar and barfoo".to_slice)
369369
packet_id = 100u16
370370
expect_raises(ArgumentError) do
371371
MQTT::Protocol::Publish.new(topic, payload, packet_id, false, 3, false)
@@ -375,7 +375,7 @@ describe MQTT::Protocol::Packet do
375375
describe "with wildcard in topic" do
376376
it "should raise ArguementError" do
377377
topic = "a/#"
378-
payload = "foobar and barfoo".to_slice
378+
payload = MQTT::Protocol::Payload.new("foobar and barfoo".to_slice)
379379
packet_id = 100u16
380380

381381
expect_raises(ArgumentError) do

spec/payload_spec.cr

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
require "./spec_helper"
2+
3+
describe MQTT::Protocol::Payload do
4+
it ".new(Bytes) returns a BytesPayload" do
5+
obj = MQTT::Protocol::Payload.new("foo".to_slice)
6+
obj.should be_a(MQTT::Protocol::BytesPayload)
7+
end
8+
9+
it ".new(IO) returns a IOPayload" do
10+
io = IO::Memory.new
11+
io.write "foo".to_slice
12+
obj = MQTT::Protocol::Payload.new(io, 3)
13+
obj.should be_a(MQTT::Protocol::IOPayload)
14+
end
15+
16+
describe "#==" do
17+
it "should return true for two BytePayload with same bytes" do
18+
one = MQTT::Protocol::BytesPayload.new("foo".to_slice)
19+
two = MQTT::Protocol::BytesPayload.new("foo".to_slice)
20+
21+
(one == two).should be_true
22+
end
23+
24+
it "should return false for two BytePayload with different bytes" do
25+
one = MQTT::Protocol::BytesPayload.new("foo".to_slice)
26+
two = MQTT::Protocol::BytesPayload.new("bar".to_slice)
27+
28+
(one == two).should be_false
29+
end
30+
31+
it "should return true for two IOPayload with same content" do
32+
io_one = IO::Memory.new("foo".to_slice)
33+
io_two = IO::Memory.new("foo".to_slice)
34+
35+
io_one.rewind
36+
io_two.rewind
37+
38+
one = MQTT::Protocol::IOPayload.new(io_one, 3)
39+
two = MQTT::Protocol::IOPayload.new(io_two, 3)
40+
41+
(one == two).should be_true
42+
end
43+
44+
it "should return false for two IOPayload with different content" do
45+
io_one = IO::Memory.new("foo".to_slice)
46+
io_two = IO::Memory.new("bar".to_slice)
47+
48+
io_one.rewind
49+
io_two.rewind
50+
51+
one = MQTT::Protocol::IOPayload.new(io_one, 3)
52+
two = MQTT::Protocol::IOPayload.new(io_two, 3)
53+
54+
(one == two).should be_false
55+
end
56+
57+
it "should return true for one BytesPayload and one IOPayload with same content" do
58+
io_two = IO::Memory.new("foo".to_slice)
59+
io_two.rewind
60+
61+
one = MQTT::Protocol::BytesPayload.new("foo".to_slice)
62+
two = MQTT::Protocol::IOPayload.new(io_two, 3)
63+
64+
(one == two).should be_true
65+
end
66+
67+
it "should return false for one BytesPayload and one IOPayload with different content" do
68+
io_two = IO::Memory.new("bar".to_slice)
69+
io_two.rewind
70+
71+
one = MQTT::Protocol::BytesPayload.new("foo".to_slice)
72+
two = MQTT::Protocol::IOPayload.new(io_two, 3)
73+
74+
(one == two).should be_false
75+
end
76+
end
77+
end

src/mqtt/protocol/io.cr

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ module MQTT
6868
@io.write bytes
6969
end
7070

71+
@[Deprecated("Use write_bytes instead")]
7172
def write_bytes_raw(bytes : Bytes)
7273
@io.write bytes
7374
end

src/mqtt/protocol/packets/publish.cr

+17-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
require "../payload"
12
require "./packets"
23

34
module MQTT
@@ -8,7 +9,7 @@ module MQTT
89
getter topic, payload, qos, packet_id, remaining_length
910
getter? dup, retain
1011

11-
def initialize(@topic : String, @payload : Bytes, @packet_id : UInt16?, @dup : Bool, @qos : UInt8, @retain : Bool)
12+
def initialize(@topic : String, @payload : Payload, @packet_id : UInt16?, @dup : Bool, @qos : UInt8, @retain : Bool)
1213
raise ArgumentError.new("QoS must be 0, 1 or 2") if @qos > 2
1314
raise ArgumentError.new("Topic cannot contain wildcard") if @topic.matches?(/[#+]/)
1415
raise ArgumentError.new("Topic must be between atleast 1 char long") if @topic.size < 1
@@ -19,6 +20,19 @@ module MQTT
1920
@remaining_length += 2 if qos.positive? # packet_id
2021
end
2122

23+
@[Deprecated("Use Payload instead of Bytes for @payload")]
24+
def initialize(@topic : String, payload : Bytes, @packet_id : UInt16?, @dup : Bool, @qos : UInt8, @retain : Bool)
25+
raise ArgumentError.new("QoS must be 0, 1 or 2") if @qos > 2
26+
raise ArgumentError.new("Topic cannot contain wildcard") if @topic.matches?(/[#+]/)
27+
raise ArgumentError.new("Topic must be between atleast 1 char long") if @topic.size < 1
28+
raise ArgumentError.new("Topic cannot be larger than 65535 bytes") if @topic.bytesize > 65535
29+
raise ArgumentError.new("DUP must be 0 for QoS 0 messages") if dup? && qos.zero?
30+
@payload = BytesPayload.new(payload)
31+
@remaining_length = 0
32+
@remaining_length += (2 + topic.bytesize) + payload.bytesize
33+
@remaining_length += 2 if qos.positive? # packet_id
34+
end
35+
2236
def self.from_io(io : MQTT::Protocol::IO, flags : Flags, remaining_length : UInt32)
2337
dup = flags.bit(3) > 0
2438
retain = flags.bit(0) > 0
@@ -32,7 +46,7 @@ module MQTT
3246
else
3347
decode_assert dup == false, "DUP must be 0 for QoS 0 messages"
3448
end
35-
payload = io.read_bytes(remaining_length.to_u16)
49+
payload = IOPayload.new(io, remaining_length.to_i32)
3650
self.new(topic, payload, packet_id, dup, qos, retain)
3751
rescue ex : ArgumentError
3852
raise MQTT::Protocol::Error::PacketDecode.new(ex.message)
@@ -47,7 +61,7 @@ module MQTT
4761
io.write_remaining_length remaining_length
4862
io.write_string topic
4963
io.write_int packet_id.not_nil! if qos.positive?
50-
io.write_bytes_raw(payload)
64+
io.write_bytes payload
5165
end
5266
end
5367
end

src/mqtt/protocol/payload.cr

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
require "./io"
2+
3+
module MQTT
4+
module Protocol
5+
abstract struct Payload
6+
def self.new(bytes : Bytes)
7+
BytesPayload.new(bytes)
8+
end
9+
10+
def self.new(io : ::IO, bytesize : Int32)
11+
IOPayload.new(MQTT::Protocol::IO.new(io), bytesize)
12+
end
13+
14+
def self.new(io : MQTT::Protocol::IO, bytesize : Int32)
15+
IOPayload.new(io, bytesize)
16+
end
17+
18+
def size
19+
bytesize
20+
end
21+
22+
abstract def bytesize : Int32
23+
abstract def to_slice : Bytes
24+
abstract def to_io(io, format : ::IO::ByteFormat = IO::ByteFormat::SystemEndian)
25+
26+
def ==(other)
27+
return false unless other.is_a?(Payload)
28+
to_slice == other.to_slice
29+
end
30+
end
31+
32+
struct BytesPayload < Payload
33+
def initialize(@bytes : Bytes)
34+
end
35+
36+
def bytesize : Int32
37+
@bytes.bytesize
38+
end
39+
40+
def to_slice : Bytes
41+
@bytes
42+
end
43+
44+
def to_io(io, format : ::IO::ByteFormat = IO::ByteFormat::SystemEndian)
45+
io.write @bytes
46+
end
47+
end
48+
49+
struct IOPayload < Payload
50+
getter bytesize : Int32
51+
52+
@data : Bytes? = nil
53+
54+
def initialize(@io : MQTT::Protocol::IO, @bytesize : Int32)
55+
end
56+
57+
def initialize(io : ::IO, @bytesize : Int32)
58+
@io = MQTT::Protocol::IO.new(io)
59+
end
60+
61+
def to_slice : Bytes
62+
if peeked = @io.peek.try &.[0, bytesize]?
63+
return peeked
64+
end
65+
return @data || begin
66+
data = Bytes.new(bytesize)
67+
@io.read(data)
68+
data
69+
end
70+
end
71+
72+
def to_io(io, format : ::IO::ByteFormat = IO::ByteFormat::SystemEndian)
73+
if data = @data
74+
io.write data
75+
else
76+
copied = ::IO.copy(@io, io, bytesize)
77+
raise "Failed to copy payload" if copied != bytesize
78+
end
79+
end
80+
end
81+
end
82+
end

0 commit comments

Comments
 (0)