diff --git a/src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs b/src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs index f165cfe5..e17e9e3b 100644 --- a/src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs +++ b/src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs @@ -24,5 +24,6 @@ public interface INmsBytesMessageFacade : INmsMessageFacade BinaryWriter GetDataWriter(); void Reset(); long BodyLength { get; } + byte[] Content { get; set; } } } \ No newline at end of file diff --git a/src/NMS.AMQP/Message/NmsBytesMessage.cs b/src/NMS.AMQP/Message/NmsBytesMessage.cs index 62eaa5bd..ed3c56d5 100644 --- a/src/NMS.AMQP/Message/NmsBytesMessage.cs +++ b/src/NMS.AMQP/Message/NmsBytesMessage.cs @@ -37,11 +37,14 @@ public byte[] Content { get { - byte[] buffer = new byte [BodyLength]; - ReadBytes(buffer); - return buffer; + CheckWriteOnlyBody(); + return this.facade.Content; + } + set + { + CheckReadOnlyBody(); + this.facade.Content = value; } - set => WriteBytes(value); } public byte ReadByte() @@ -292,7 +295,11 @@ public int ReadBytes(byte[] value) public int ReadBytes(byte[] value, int length) { InitializeReading(); + return ReadBytes(dataIn, value, length); + } + private int ReadBytes(BinaryReader binaryReader, byte[] value, int length) + { if (length < 0 || value.Length < length) { throw new IndexOutOfRangeException("length must not be negative or larger than the size of the provided array"); @@ -300,7 +307,7 @@ public int ReadBytes(byte[] value, int length) try { - return dataIn.Read(value, 0, length); + return binaryReader.Read(value, 0, length); } catch (EndOfStreamException e) { diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs index e61ddfb6..b55f17da 100644 --- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs +++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs @@ -31,10 +31,11 @@ public class AmqpNmsBytesMessageFacade : AmqpNmsMessageFacade, INmsBytesMessageF private EndianBinaryReader byteIn = null; private EndianBinaryWriter byteOut = null; - private static readonly Data EMPTY_DATA = new Data { Binary = new byte[0] }; + private static readonly byte[] EMPTY_BINARY = new byte[0]; + private static readonly Data EMPTY_DATA = new Data { Binary = EMPTY_BINARY }; public override sbyte? JmsMsgType => MessageSupport.JMS_TYPE_BYTE; - public long BodyLength => GetBinaryFromBody().Binary.LongLength; + public long BodyLength => Content.Length; public BinaryReader GetDataReader() { @@ -45,58 +46,60 @@ public BinaryReader GetDataReader() if (byteIn == null) { - Data body = GetBinaryFromBody(); - Stream dataStream = new MemoryStream(body.Binary, false); + byte[] body = Content; + Stream dataStream = new MemoryStream(body, false); byteIn = new EndianBinaryReader(dataStream); } return byteIn; } - private Data GetBinaryFromBody() + public byte[] Content { - RestrictedDescribed body = Message.BodySection; - Data result = EMPTY_DATA; - if (body == null) + get { - return result; - } - else if (body is Data) - { - byte[] binary = (body as Data).Binary; - if (binary != null && binary.Length != 0) + RestrictedDescribed body = Message.BodySection; + byte[] result = EMPTY_BINARY; + if (body == null) { - return body as Data; + return result; } - } - else if (body is AmqpValue) - { - object value = (body as AmqpValue).Value; - if (value == null) + else if (body is Data) { - return result; + byte[] binary = (body as Data).Binary; + if (binary != null && binary.Length != 0) + { + return binary; + } } - - if (value is byte[]) + else if (body is AmqpValue) { - byte[] dataValue = value as byte[]; - if (dataValue.Length > 0) + object value = (body as AmqpValue).Value; + if (value == null) { - result = new Data(); - result.Binary = dataValue; + return result; + } + if (value is byte[]) + { + byte[] dataValue = value as byte[]; + if (dataValue.Length > 0) + { + return dataValue; + } + } + else + { + throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName); } } else { - throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName); + throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName); } - } - else - { - throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName); - } - return result; + return result; + } + set => Message.BodySection = new Data { Binary = value }; } public BinaryWriter GetDataWriter() diff --git a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestBytesMessageFacade.cs b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestBytesMessageFacade.cs index 1b5f2a1a..d86e9dc9 100644 --- a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestBytesMessageFacade.cs +++ b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestBytesMessageFacade.cs @@ -26,16 +26,15 @@ public class NmsTestBytesMessageFacade : NmsTestMessageFacade, INmsBytesMessageF { private BinaryWriter bytesOut = null; private BinaryReader bytesIn = null; - private byte[] content = null; public NmsTestBytesMessageFacade() { - content = new byte[0]; + Content = new byte[0]; } public NmsTestBytesMessageFacade(byte[] content) { - this.content = content; + this.Content = content; } public BinaryReader GetDataReader() @@ -45,7 +44,7 @@ public BinaryReader GetDataReader() throw new IllegalStateException("Body is being written to, cannot perform a read."); } - return bytesIn ?? (bytesIn = new BinaryReader(new MemoryStream(content))); + return bytesIn ?? (bytesIn = new BinaryReader(new MemoryStream(Content))); } public BinaryWriter GetDataWriter() @@ -65,7 +64,7 @@ public void Reset() bytesOut.BaseStream.Position = 0; bytesOut.BaseStream.CopyTo(byteStream); - content = byteStream.ToArray(); + Content = byteStream.ToArray(); byteStream.Close(); bytesOut.Close(); @@ -81,9 +80,10 @@ public void Reset() public override void ClearBody() { this.Reset(); - content = new byte[0]; + Content = new byte[0]; } - public long BodyLength => content?.LongLength ?? 0; + public long BodyLength => Content?.LongLength ?? 0; + public byte[] Content { get; set; } } } \ No newline at end of file diff --git a/test/Apache-NMS-AMQP-Test/Message/NmsBytesMessageTest.cs b/test/Apache-NMS-AMQP-Test/Message/NmsBytesMessageTest.cs index c54d5b6d..cae2831a 100644 --- a/test/Apache-NMS-AMQP-Test/Message/NmsBytesMessageTest.cs +++ b/test/Apache-NMS-AMQP-Test/Message/NmsBytesMessageTest.cs @@ -534,5 +534,43 @@ public void TestMessageCopy() NmsBytesMessage copy = message.Copy() as NmsBytesMessage; Assert.IsNotNull(copy); } + + [Test] + public void TestMessageContentCanBeObtainedMultipleTimesWithoutReset() + { + byte[] bytes = Encoding.UTF8.GetBytes("myBytes"); + NmsBytesMessage message = factory.CreateBytesMessage(); + message.Content = bytes; + message.Reset(); + + CollectionAssert.AreEqual(bytes, message.Content); + CollectionAssert.AreEqual(bytes, message.Content); + } + + [Test] + public void TestConsecutiveReadBytes() + { + byte[] bytes = CreateBytesArrayOfSize(24); + NmsBytesMessage message = factory.CreateBytesMessage(bytes); + message.Reset(); + + CollectionAssert.AreEqual(bytes.Take(8), ReadBytes(message, 8)); + CollectionAssert.AreEqual(bytes.Skip(8).Take(8), ReadBytes(message, 8)); + CollectionAssert.AreEqual(bytes.Skip(16).Take(8), ReadBytes(message, 8)); + CollectionAssert.AreEqual(new byte[8], ReadBytes(message, 8)); + } + + private static byte[] CreateBytesArrayOfSize(int size) + { + var random = new Random(); + return Enumerable.Range(0, size).Select(x => (byte) random.Next(byte.MaxValue)).ToArray(); + } + + private static byte[] ReadBytes(IBytesMessage message, int count) + { + byte[] bytes = new byte[count]; + message.ReadBytes(bytes); + return bytes; + } } } \ No newline at end of file