Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory allocationt. #2109

Open
wants to merge 54 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
a793a3a
Reduce memory allocation of DecodePublishPacket.
xljiulang Nov 18, 2024
6c7e0e5
Reduce memory allocation of MqttBufferReader.
xljiulang Nov 19, 2024
e7f4248
Change the type of the Packet field from ArraySegment<byte> to ReadOn…
xljiulang Nov 19, 2024
e578f0c
MqttBufferWriter.WriteBinary(byte[]) parameter type changed to ReadOn…
xljiulang Nov 19, 2024
9fa3337
Change the type of PayloadSegment to ReadOnlyMemory<byte>
xljiulang Nov 19, 2024
7edac8f
Change the type of PayloadSegment to ReadOnlyMemory<byte>
xljiulang Nov 19, 2024
0a31a61
Optimizing MqttBufferReader
xljiulang Nov 19, 2024
8b89a54
More byte[] parameters or properties are changed to ReadonlyMemory<byte>
xljiulang Nov 19, 2024
b26b4eb
MqttPacketInspector.FillReceiveBuffer: ReadOnlyMemory->ReadOnlySpan
xljiulang Nov 19, 2024
b1a1865
Add high performance payload to json deserialization extension
xljiulang Nov 19, 2024
53242ee
MqttBufferWriter: Rename WriteBinary(byte[],int,int) to Write(Readonl…
xljiulang Nov 20, 2024
42076f4
Using BinaryPrimitives to write length values.
xljiulang Nov 20, 2024
8f76c02
MqttBufferWriter: Use Span<byte> GetSpan(int) to get the buffer to wr…
xljiulang Nov 21, 2024
07772a1
MqttConnectPacket: Restore Password to byte[]? type.
xljiulang Nov 21, 2024
e2ee3f8
MqttConnectPacket: Restore Password to byte[]? type.
xljiulang Nov 21, 2024
f1593ff
MqttClientExtensions: Using ArrayPool makes PublishStringAsync reach …
xljiulang Nov 21, 2024
8f69184
Add 0 allocated MqttPayloadOwnerFactory to build mqtt Payload.
xljiulang Nov 21, 2024
b992a4f
MqttPayloadOwnerFactory: Improve implementation and add unit tests.
xljiulang Nov 21, 2024
7a61bc2
MqttServerExtensions: Add high-performance Inject related extension m…
xljiulang Nov 22, 2024
94f0f9e
Add support for payloadFactory extension methods
xljiulang Nov 22, 2024
41331c5
MqttPayloadOwnerFactory: CancellationToken support.
xljiulang Nov 22, 2024
f2367aa
Add PublishStreamAsync() and InjectStreamAsync()
xljiulang Nov 22, 2024
eb334f0
Improve ConvertPayloadToJson
xljiulang Nov 22, 2024
180b31b
Add CreateJsonReaderOptions
xljiulang Nov 22, 2024
58390e8
Reduce the use of unnecessary MqttApplicationMessageBuilder
xljiulang Nov 28, 2024
6664f15
Update Client_Publish_Samples.
xljiulang Nov 28, 2024
d1792a5
SingleSegmentPayloadOwner: use payloadFactory
xljiulang Nov 29, 2024
0ed0b48
Rpc.ExecuteAsync: Reduce memory allocation.
xljiulang Nov 29, 2024
c7ab618
Add ExecuteTimeOutAsync() extendsion methods.
xljiulang Nov 29, 2024
5a98f70
Merge branch 'dotnet:master' into buffer-reader
xljiulang Dec 1, 2024
a3ac26b
MqttRetainedMessageModel: copy the payload.
xljiulang Dec 1, 2024
0a25ad6
Merge branch 'main' into buffer-reader
xljiulang Dec 2, 2024
5b2a995
MqttBufferWriter: Add GetWrittenArraySegment method.
xljiulang Dec 3, 2024
6394733
Add the BufferSize property, which is used for the pool strategy.
xljiulang Dec 3, 2024
ee6b950
Do not use the PayloadSegment field in anticipation of its removal in…
xljiulang Dec 3, 2024
7ef0a2c
Merge branch 'buffer-reader' of https://github.com/xljiulang/MQTTnet …
xljiulang Dec 3, 2024
4025d18
ValidatingConnectionEventArgs: Restore RawPasswordto byte[]? type.
xljiulang Dec 3, 2024
8aea746
Some PublishAsync and InjectAsync methods remove await and return dir…
xljiulang Dec 3, 2024
c661764
Merge branch 'main' into buffer-reader
xljiulang Dec 4, 2024
a8b6fe9
ConfigureAwait(false)!
xljiulang Dec 4, 2024
22aae2d
MqttChannelAdapter: Use ArrayPool to reduce memory allocation.
xljiulang Dec 5, 2024
20d7866
Fixed the buffer size issue of PacketInspector.
xljiulang Dec 5, 2024
c7a16a9
Add more ExecuteTimeoutAsync overload methods;
xljiulang Dec 5, 2024
5ae8522
Update doc and unit test.
xljiulang Dec 5, 2024
d2c36ee
Enhance unit testing of MqttPayloadOwnerFactory.
xljiulang Dec 5, 2024
8df3a96
Update and fix MessageProcessingBenchmark.
xljiulang Dec 5, 2024
f2a81ea
MqttPacketInspector: add FillReceiveBuffer(ReadOnlySequence<byte>) me…
xljiulang Dec 6, 2024
759a816
Use Pipe to reduce the memory allocation of MqttPacketInspector.
xljiulang Dec 7, 2024
52daba1
InspectPacketAsync() is performed only in the Fill state.
xljiulang Dec 7, 2024
83138b9
Fixed the issue of writing 0-byte Binary.
xljiulang Dec 9, 2024
f82344a
MqttBufferWriter: Add WriteFourByteInteger(uint) method.
xljiulang Dec 9, 2024
cc09847
MqttBufferWriter: Added IBufferWriter<byte> AsLowLevelBufferWriter() …
xljiulang Dec 9, 2024
d0e1ce4
Fixed duplicate write of (MqttPropertyId id, ushort value).
xljiulang Dec 9, 2024
057365d
Merge branch 'dotnet:master' into buffer-reader
xljiulang Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
SingleSegmentPayloadOwner: use payloadFactory
xljiulang committed Nov 29, 2024
commit d1792a549ca27dc736115901f508c36e1ea449cf
3 changes: 1 addition & 2 deletions Source/MQTTnet.Server/MqttServerExtensions.cs
Original file line number Diff line number Diff line change
@@ -111,8 +111,7 @@ public static async Task InjectBinaryAsync(
bool retain = false,
CancellationToken cancellationToken = default)
{
await using var payloadOwner = MqttPayloadOwnerFactory.CreateSingleSegment(payloadSize, out var payloadMemory);
payloadFactory?.Invoke(payloadMemory);
await using var payloadOwner = MqttPayloadOwnerFactory.CreateSingleSegment(payloadSize, payloadFactory);
await server.InjectSequenceAsync(clientId, topic, payloadOwner.Payload, qualityOfServiceLevel, retain, cancellationToken);
}

11 changes: 8 additions & 3 deletions Source/MQTTnet.Tests/Internal/MqttPayloadOwnerFactory_Test.cs
Original file line number Diff line number Diff line change
@@ -19,11 +19,16 @@ public class MqttPayloadOwnerFactory_Test
public async Task CreateSingleSegmentTest()
{
var size = 10;
await using var owner = MqttPayloadOwnerFactory.CreateSingleSegment(size, out var payloadMemory);
Random.Shared.NextBytes(payloadMemory.Span);
var buffer = new byte[size];
Random.Shared.NextBytes(buffer);

await using var owner = MqttPayloadOwnerFactory.CreateSingleSegment(size, payload =>
{
buffer.AsSpan().CopyTo(payload.Span);
});

Assert.AreEqual(size, owner.Payload.Length);
Assert.IsTrue(payloadMemory.Span.SequenceEqual(owner.Payload.ToArray()));
Assert.IsTrue(buffer.AsSpan().SequenceEqual(owner.Payload.ToArray()));
}

[TestMethod]
2 changes: 1 addition & 1 deletion Source/MQTTnet.Tests/Server/Publishing_Tests.cs
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ public async Task Disconnect_While_Publishing()
server.InterceptingPublishAsync += ev => server.DisconnectClientAsync(ev.ClientId, MqttDisconnectReasonCode.NormalDisconnection);

var client = await testEnvironment.ConnectClient();
await client.PublishStringAsync("test", qualityOfServiceLevel: MqttQualityOfServiceLevel.AtLeastOnce);
await client.PublishStringAsync("test",null, qualityOfServiceLevel: MqttQualityOfServiceLevel.AtLeastOnce);
}
}

6 changes: 3 additions & 3 deletions Source/MQTTnet.Tests/Server/QoS_Tests.cs
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ public async Task Fire_Event_On_Client_Acknowledges_QoS_0()
await client1.SubscribeAsync("A");

var client2 = await testEnvironment.ConnectClient();
await client2.PublishStringAsync("A");
await client2.PublishStringAsync("A", null);

await LongTestDelay();

@@ -59,7 +59,7 @@ public async Task Fire_Event_On_Client_Acknowledges_QoS_1()
await client1.SubscribeAsync("A", MqttQualityOfServiceLevel.AtLeastOnce);

var client2 = await testEnvironment.ConnectClient();
await client2.PublishStringAsync("A", qualityOfServiceLevel: MqttQualityOfServiceLevel.AtLeastOnce);
await client2.PublishStringAsync("A",null, qualityOfServiceLevel: MqttQualityOfServiceLevel.AtLeastOnce);

await LongTestDelay();

@@ -94,7 +94,7 @@ public async Task Fire_Event_On_Client_Acknowledges_QoS_2()
await client1.SubscribeAsync("A", MqttQualityOfServiceLevel.ExactlyOnce);

var client2 = await testEnvironment.ConnectClient();
await client2.PublishStringAsync("A", qualityOfServiceLevel: MqttQualityOfServiceLevel.ExactlyOnce);
await client2.PublishStringAsync("A",null, qualityOfServiceLevel: MqttQualityOfServiceLevel.ExactlyOnce);

await LongTestDelay();

6 changes: 3 additions & 3 deletions Source/MQTTnet.Tests/Server/Security_Tests.cs
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ await validClient.ConnectAsync(
.WithClientId("CLIENT")
.Build());

await validClient.PublishStringAsync("HELLO 1");
await validClient.PublishStringAsync("HELLO 1", null);

// The following code tries to connect a new client with the same client ID but invalid
// credentials. This should block the second client but keep the first one up and running.
@@ -81,11 +81,11 @@ await invalidClient.ConnectAsync(

await LongTestDelay();

await validClient.PublishStringAsync("HELLO 2");
await validClient.PublishStringAsync("HELLO 2", null);

await LongTestDelay();

await validClient.PublishStringAsync("HELLO 3");
await validClient.PublishStringAsync("HELLO 3", null);

await LongTestDelay();

2 changes: 1 addition & 1 deletion Source/MQTTnet.Tests/Server/Session_Tests.cs
Original file line number Diff line number Diff line change
@@ -300,7 +300,7 @@ public async Task Set_Session_Item()
Assert.AreEqual(MqttClientSubscribeResultCode.GrantedQoS0, subscribeResult.Items.First().ResultCode);

var client2 = await testEnvironment.ConnectClient();
await client2.PublishStringAsync("x");
await client2.PublishStringAsync("x",null);

await Task.Delay(1000);

2 changes: 1 addition & 1 deletion Source/MQTTnet.Tests/Server/Status_Tests.cs
Original file line number Diff line number Diff line change
@@ -158,7 +158,7 @@ public async Task Track_Sent_Application_Messages()

for (var i = 1; i < 25; i++)
{
await c1.PublishStringAsync("a");
await c1.PublishStringAsync("a",null);
await Task.Delay(50);

var clientStatus = await server.GetClientsAsync();
14 changes: 7 additions & 7 deletions Source/MQTTnet.Tests/Server/Subscribe_Tests.cs
Original file line number Diff line number Diff line change
@@ -187,7 +187,7 @@ public async Task Intercept_Subscription()

await client.SubscribeAsync("b");

await client.PublishStringAsync("a");
await client.PublishStringAsync("a", null);

await Task.Delay(500);

@@ -323,15 +323,15 @@ public async Task Subscribe_Multiple_In_Multiple_Request()

var c2 = await testEnvironment.ConnectClient();

await c2.PublishStringAsync("a");
await c2.PublishStringAsync("a", null);
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 1);

await c2.PublishStringAsync("b");
await c2.PublishStringAsync("b", null);
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 2);

await c2.PublishStringAsync("c");
await c2.PublishStringAsync("c", null);
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 3);
}
@@ -357,15 +357,15 @@ public async Task Subscribe_Multiple_In_Single_Request()

var c2 = await testEnvironment.ConnectClient();

await c2.PublishStringAsync("a");
await c2.PublishStringAsync("a", null);
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 1);

await c2.PublishStringAsync("b");
await c2.PublishStringAsync("b", null);
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 2);

await c2.PublishStringAsync("c");
await c2.PublishStringAsync("c", null);
await Task.Delay(100);
Assert.AreEqual(receivedMessagesCount, 3);
}
38 changes: 27 additions & 11 deletions Source/MQTTnet/Internal/MqttPayloadOwnerFactory.cs
Original file line number Diff line number Diff line change
@@ -17,12 +17,12 @@ public static class MqttPayloadOwnerFactory
/// Create owner for a single segment payload
/// </summary>
/// <param name="payloadSize"></param>
/// <param name="payloadMemory"></param>
/// <param name="payloadFactory"></param>
/// <returns></returns>
public static MqttPayloadOwner CreateSingleSegment(int payloadSize, out Memory<byte> payloadMemory)
public static MqttPayloadOwner CreateSingleSegment(int payloadSize, Action<Memory<byte>> payloadFactory)
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(payloadSize);
return SingleSegmentPayloadOwner.Create(payloadSize, out payloadMemory);
ArgumentNullException.ThrowIfNull(payloadFactory);
return SingleSegmentPayloadOwner.Create(payloadSize, payloadFactory);
}

/// <summary>
@@ -42,22 +42,38 @@ private sealed class SingleSegmentPayloadOwner : MqttPayloadOwner
private readonly byte[] _buffer;
public override ReadOnlySequence<byte> Payload { get; }

private SingleSegmentPayloadOwner(byte[] buffer, ReadOnlyMemory<byte> payloadMemory)
private SingleSegmentPayloadOwner(byte[] buffer, ReadOnlyMemory<byte> payload)
{
_buffer = buffer;
Payload = new ReadOnlySequence<byte>(payloadMemory);
Payload = new ReadOnlySequence<byte>(payload);
}

public static MqttPayloadOwner Create(int payloadSize, out Memory<byte> payloadMemory)
public static MqttPayloadOwner Create(int payloadSize, Action<Memory<byte>> payloadFactory)
{
var buffer = ArrayPool<byte>.Shared.Rent(payloadSize);
payloadMemory = buffer.AsMemory(0, payloadSize);
return new SingleSegmentPayloadOwner(buffer, payloadMemory);
byte[] buffer;
Memory<byte> payload;

if (payloadSize <= 0)
{
buffer = Array.Empty<byte>();
payload = Memory<byte>.Empty;
}
else
{
buffer = ArrayPool<byte>.Shared.Rent(payloadSize);
payload = buffer.AsMemory(0, payloadSize);
}

payloadFactory.Invoke(payload);
return new SingleSegmentPayloadOwner(buffer, payload);
}

protected override ValueTask DisposeAsync(bool disposing)
{
ArrayPool<byte>.Shared.Return(_buffer);
if (_buffer.Length > 0)
{
ArrayPool<byte>.Shared.Return(_buffer);
}
return ValueTask.CompletedTask;
}
}
3 changes: 1 addition & 2 deletions Source/MQTTnet/MqttClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -97,8 +97,7 @@ public static async Task<MqttClientPublishResult> PublishBinaryAsync(
bool retain = false,
CancellationToken cancellationToken = default)
{
await using var payloadOwner = MqttPayloadOwnerFactory.CreateSingleSegment(payloadSize, out var payloadMemory);
payloadFactory?.Invoke(payloadMemory);
await using var payloadOwner = MqttPayloadOwnerFactory.CreateSingleSegment(payloadSize, payloadFactory);
return await mqttClient.PublishSequenceAsync(topic, payloadOwner.Payload, qualityOfServiceLevel, retain, cancellationToken);
}