Skip to content
This repository was archived by the owner on Apr 12, 2021. It is now read-only.

Commit b2ced5d

Browse files
jeremy001181Scooletz
authored andcommitted
Feature/can mark processed message complete when exception occurred (#16)
* added support for marking message as proccessed when exception occurred * Added another test for success or fail as batch * added comment * Updated readme file on how to use MarkAsProccess * updated comment and correct readme format * Update src/QueueBatch/Impl/Listener.cs Co-Authored-By: jeremy001181 <[email protected]> * Update src/QueueBatch.Tests/Helpers.cs Co-Authored-By: jeremy001181 <[email protected]> * refactor according the review
1 parent bb61337 commit b2ced5d

8 files changed

+86
-10
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ public static void MyFunc([QueueBatchTrigger("myqueue")] IMessageBatch batch)
3535
}
3636
```
3737

38-
You can also acknowledge only some of the messages. The rest, will be retried in a similar manner to the regural `[QueueTrigger]`
38+
With `SuccessOrFailAsBatch` set to `false`, you can also acknowledge only some of the messages. The rest, will be retried in a similar manner to the regural `[QueueTrigger]`
3939

4040
```c#
41-
public static void MyFunc([QueueBatchTrigger("myqueue")] IMessageBatch batch)
41+
public static void MyFunc([QueueBatchTrigger("myqueue", SuccessOrFailAsBatch = false)] IMessageBatch batch)
4242
{
4343
foreach (var msg in batch.Messages)
4444
{

src/QueueBatch.Tests/Helpers.cs

+1-2
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,10 @@ public static async Task<List<CloudQueueMessage>> Drain(this CloudQueue queue, i
3939

4040
return received;
4141
}
42-
4342
public static async Task AssertIsEmpty(this CloudQueue queue)
4443
{
4544
var messages = await queue.GetMessagesAsync(32);
4645
CollectionAssert.IsEmpty(messages);
4746
}
4847
}
49-
}
48+
}

src/QueueBatch.Tests/IntegrationTests.cs

+67
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,82 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Diagnostics;
34
using System.Linq;
5+
using System.Text;
46
using System.Threading.Tasks;
57
using Microsoft.Azure.WebJobs;
68
using Microsoft.Azure.WebJobs.Host;
79
using Microsoft.WindowsAzure.Storage.Queue;
10+
using Newtonsoft.Json;
811
using NUnit.Framework;
912

1013
namespace QueueBatch.Tests
1114
{
1215
public class IntegrationTests : BaseTest
1316
{
17+
[Test]
18+
public async Task Messages_failed_as_batch_when_SuccessOrFailAsBatch_set_true()
19+
{
20+
const int count = 4;
21+
await SendUnique(count);
22+
await Batch.AddMessageAsync(new CloudQueueMessage("bad-guid"));
23+
24+
await RunHost<SuccessOrFailAsBatchTrue>(async () =>
25+
{
26+
await Poison.Drain(5);
27+
await Batch.AssertIsEmpty();
28+
});
29+
}
30+
31+
[Test]
32+
public async Task Can_mark_message_as_processed_while_other_cause_exception_when_SuccessOrFailAsBatch_set_false()
33+
{
34+
const int count = 4;
35+
await SendUnique(count);
36+
await Batch.AddMessageAsync(new CloudQueueMessage("bad-guid"));
37+
38+
await RunHost<SuccessOrFailAsBatchFalse>(async () =>
39+
{
40+
await Output.Drain(4);
41+
await Poison.Drain(1);
42+
await Batch.AssertIsEmpty();
43+
});
44+
}
45+
46+
public class SuccessOrFailAsBatchTrue
47+
{
48+
public static async Task Do(
49+
[QueueBatchTrigger(InputQueue, MaxBackOffInSeconds = 1, SuccessOrFailAsBatch = true)] IMessageBatch batch
50+
, [Queue(OutputQueue)] CloudQueue output)
51+
{
52+
await SomeMessageInBatchCauseException(batch, output);
53+
}
54+
}
55+
56+
public class SuccessOrFailAsBatchFalse
57+
{
58+
public static async Task Do(
59+
[QueueBatchTrigger(InputQueue, MaxBackOffInSeconds = 1, SuccessOrFailAsBatch = false)] IMessageBatch batch
60+
, [Queue(OutputQueue)] CloudQueue output)
61+
{
62+
await SomeMessageInBatchCauseException(batch, output);
63+
}
64+
}
65+
66+
private static async Task SomeMessageInBatchCauseException(IMessageBatch batch, CloudQueue output)
67+
{
68+
var messages = batch.Messages.ToList();
69+
70+
foreach (var m in messages)
71+
{
72+
var content = Encoding.UTF8.GetString(m.Payload.Span);
73+
var guid = Guid.Parse(content);
74+
await output.AddMessageAsync(new CloudQueueMessage(m.Id));
75+
76+
batch.MarkAsProcessed(m);
77+
}
78+
}
79+
1480
[Test]
1581
public async Task Simple_batch_dispatch()
1682
{
@@ -105,5 +171,6 @@ public static Task Do([QueueBatchTrigger(InputQueue, UseFasterQueues = true)] IM
105171
return Task.CompletedTask;
106172
}
107173
}
174+
108175
}
109176
}

src/QueueBatch.Tests/approved_api.txt

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ namespace QueueBatch
3131
public int ParallelGets { get; set; }
3232
public string QueueName { get; }
3333
public bool RunWithEmptyBatch { get; set; }
34+
public bool SuccessOrFailAsBatch { get; set; }
3435
public bool UseFasterQueues { get; set; }
3536
}
3637
}

src/QueueBatch/Impl/BindingProvider.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ await Task.WhenAll(
5454
? new QueueFunctionLogic(SdkQueue.CreateFast(messageQueue, cache), SdkQueue.CreateFast(poisonQueue, cache))
5555
: new QueueFunctionLogic(new SdkQueue(messageQueue), new SdkQueue(poisonQueue));
5656

57-
return new TriggerBinding(context.Parameter, queue, TimeSpan.FromSeconds(attr.MaxBackOffInSeconds), attr.ParallelGets, attr.RunWithEmptyBatch, loggerFactory);
57+
return new TriggerBinding(context.Parameter, queue, TimeSpan.FromSeconds(attr.MaxBackOffInSeconds), attr.ParallelGets, attr.SuccessOrFailAsBatch, attr.RunWithEmptyBatch, loggerFactory);
5858
}
5959
CloudQueue CreatePoisonQueue(CloudQueueClient queueClient, string name)
6060
{

src/QueueBatch/Impl/Listener.cs

+5-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class Listener : IListener
1717
readonly int maxRetries;
1818
readonly QueueFunctionLogic queue;
1919
readonly TimeSpan visibilityTimeout;
20+
readonly bool successOrFailAsBatch;
2021
readonly bool shouldRunOnEmptyBatch;
2122
readonly ILoggerFactory loggerFactory;
2223
readonly Task<IRetrievedMessages>[] gets;
@@ -26,13 +27,14 @@ class Listener : IListener
2627
static readonly TimeSpan VisibilityTimeout = TimeSpan.FromMinutes(10.0);
2728

2829
public Listener(ITriggeredFunctionExecutor executor, QueueFunctionLogic queue,
29-
TimeSpan maxBackOff, int maxRetries, TimeSpan visibilityTimeout, int parallelGets,
30+
TimeSpan maxBackOff, int maxRetries, TimeSpan visibilityTimeout, int parallelGets, bool successOrFailAsBatch,
3031
bool shouldRunOnEmptyBatch, ILoggerFactory loggerFactory)
3132
{
3233
this.executor = executor;
3334
this.queue = queue;
3435
this.maxRetries = maxRetries;
3536
this.visibilityTimeout = visibilityTimeout;
37+
this.successOrFailAsBatch = successOrFailAsBatch;
3638
this.shouldRunOnEmptyBatch = shouldRunOnEmptyBatch;
3739
this.loggerFactory = loggerFactory;
3840
gets = new Task<IRetrievedMessages>[parallelGets];
@@ -94,7 +96,7 @@ async Task Process()
9496

9597
try
9698
{
97-
if (result.Succeeded)
99+
if (result.Succeeded || !successOrFailAsBatch)
98100
{
99101
await batch.Complete(CancellationToken.None);
100102
}
@@ -131,4 +133,4 @@ async Task Process()
131133

132134
Task Delay(bool executionSucceeded, CancellationToken ct) => Task.Delay(backOff.GetNextDelay(executionSucceeded), ct);
133135
}
134-
}
136+
}

src/QueueBatch/Impl/TriggerBinding.cs

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ class TriggerBinding : ITriggerBinding
1919
readonly ILoggerFactory loggerFactory;
2020
readonly ParameterInfo param;
2121
readonly QueueFunctionLogic queue;
22+
private bool successOrFailAsBatch;
2223

23-
public TriggerBinding(ParameterInfo param, QueueFunctionLogic queue, TimeSpan maxBackOff, int parallelGets, bool shouldRunOnEmptyBatch, ILoggerFactory loggerFactory)
24+
public TriggerBinding(ParameterInfo param, QueueFunctionLogic queue, TimeSpan maxBackOff, int parallelGets, bool successOrFailAsBatch, bool shouldRunOnEmptyBatch, ILoggerFactory loggerFactory)
2425
{
2526
this.param = param;
2627
this.queue = queue;
2728
this.maxBackOff = maxBackOff;
2829
this.parallelGets = parallelGets;
30+
this.successOrFailAsBatch = successOrFailAsBatch;
2931
this.shouldRunOnEmptyBatch = shouldRunOnEmptyBatch;
3032
this.loggerFactory = loggerFactory;
3133
BindingDataContract = new Dictionary<string, Type>(StringComparer.OrdinalIgnoreCase)
@@ -47,7 +49,7 @@ public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
4749

4850
public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
4951
{
50-
return Task.FromResult<IListener>(new Listener(context.Executor, queue, maxBackOff, 5, TimeSpan.FromSeconds(1), parallelGets, shouldRunOnEmptyBatch, loggerFactory));
52+
return Task.FromResult<IListener>(new Listener(context.Executor, queue, maxBackOff, 5, TimeSpan.FromSeconds(1), parallelGets, successOrFailAsBatch, shouldRunOnEmptyBatch, loggerFactory));
5153
}
5254

5355
public ParameterDescriptor ToParameterDescriptor()

src/QueueBatch/QueueBatchTriggerAttribute.cs

+5
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,10 @@ public QueueBatchTriggerAttribute(string queueName)
5252
/// Gets or sets the app setting name that contains the Azure Storage connection string.
5353
/// </summary>
5454
public string Connection { get; set; }
55+
56+
/// <summary>
57+
/// If set to false, each message in same batch can be marked as processed independently and they will not affected when other messages cause exception, default to true
58+
/// </summary>
59+
public bool SuccessOrFailAsBatch { get; set; } = true;
5560
}
5661
}

0 commit comments

Comments
 (0)