-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCountersAggregator.cs
103 lines (81 loc) · 3.37 KB
/
CountersAggregator.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
using System;
using System.Linq;
using System.Threading;
using Composite;
using Hangfire.Common;
using Hangfire.CompositeC1.Types;
using Hangfire.Logging;
using Hangfire.Server;
namespace Hangfire.CompositeC1
{
/// <summary>
/// Background component that folds records of the 'Counter' table into
/// <see cref="IAggregatedCounter"/> records, one bounded batch at a time,
/// deleting the raw <see cref="ICounter"/> records it has processed.
/// </summary>
public class CountersAggregator : IServerComponent, IBackgroundProcess
{
    // Upper bound on the number of counter records read (and deleted) per pass.
    private const int NumberOfRecordsInSinglePass = 10000;

    // Pause between two consecutive passes within a single Execute call.
    private static readonly TimeSpan DelayBetweenPasses = TimeSpan.FromMilliseconds(500);

    private readonly ILog _logger = LogProvider.GetCurrentClassLogger();
    private readonly CompositeC1Storage _storage;
    private readonly TimeSpan _interval;

    /// <summary>
    /// Creates an aggregator working against the given storage.
    /// </summary>
    /// <param name="storage">Storage whose counters are aggregated; checked with <c>Verify.ArgumentNotNull</c>.</param>
    /// <param name="interval">Time to wait at the end of each <see cref="Execute(CancellationToken)"/> call.</param>
    public CountersAggregator(CompositeC1Storage storage, TimeSpan interval)
    {
        Verify.ArgumentNotNull(storage, "storage");

        _storage = storage;
        _interval = interval;
    }

    /// <summary>
    /// Runs one aggregation cycle using the cancellation token carried by <paramref name="context"/>.
    /// </summary>
    /// <param name="context">Process context supplying the cancellation token.</param>
    public void Execute(BackgroundProcessContext context)
    {
        Execute(context.CancellationToken);
    }

    /// <summary>
    /// Aggregates counter records in batches of at most <see cref="NumberOfRecordsInSinglePass"/>
    /// until a pass no longer fills a whole batch, then waits for the configured interval.
    /// </summary>
    /// <param name="cancellationToken">Token used for the waits and checked between passes.</param>
    /// <exception cref="OperationCanceledException">
    /// Thrown between passes when cancellation has been requested.
    /// </exception>
    public void Execute(CancellationToken cancellationToken)
    {
        _logger.Debug("Aggregating records in 'Counter' table...");

        int removedCount;

        do
        {
            removedCount = _storage.UseConnection(connection =>
            {
                var counters = connection.Get<ICounter>().Take(NumberOfRecordsInSinglePass).ToList();

                // Nothing to aggregate: skip the grouping and the no-op delete call.
                if (counters.Count == 0)
                {
                    return 0;
                }

                // Collapse the batch to one entry per key: summed value, latest expiration.
                var groupedCounters = counters.GroupBy(c => c.Key).Select(g => new
                {
                    g.Key,
                    Value = g.Sum(c => c.Value),
                    ExpireAt = g.Max(c => c.ExpireAt)
                });

                foreach (var counter in groupedCounters)
                {
                    var aggregate = connection.Get<IAggregatedCounter>().SingleOrDefault(a => a.Key == counter.Key);

                    if (aggregate == null)
                    {
                        aggregate = connection.CreateNew<IAggregatedCounter>();

                        aggregate.Id = Guid.NewGuid();
                        aggregate.Key = counter.Key;
                        aggregate.Value = counter.Value;
                        aggregate.ExpireAt = counter.ExpireAt;
                    }
                    else
                    {
                        aggregate.Value += counter.Value;

                        // NOTE(review): if ExpireAt is nullable, this comparison is false whenever
                        // aggregate.ExpireAt is null, so an existing aggregate without an expiration
                        // never receives one - confirm that this is intended.
                        if (counter.ExpireAt > aggregate.ExpireAt)
                        {
                            aggregate.ExpireAt = counter.ExpireAt;
                        }
                    }

                    connection.AddOrUpdate(aggregate);
                }

                connection.Delete<ICounter>(counters);

                return counters.Count;
            });

            // A pass that returned fewer records than the batch size has already drained the
            // table, so only a full batch warrants another pass. Re-querying after every
            // non-empty pass would cost an extra round trip per run and, under a steady
            // trickle of new counters, would keep this loop from ever reaching the
            // interval wait below.
            if (removedCount >= NumberOfRecordsInSinglePass)
            {
                cancellationToken.Wait(DelayBetweenPasses);
                cancellationToken.ThrowIfCancellationRequested();
            }
        } while (removedCount >= NumberOfRecordsInSinglePass);

        _logger.Trace("Records from the 'Counter' table aggregated.");

        cancellationToken.Wait(_interval);
    }

    /// <summary>
    /// Returns the string representation of this instance's runtime type.
    /// </summary>
    public override string ToString()
    {
        return GetType().ToString();
    }
}
}