Skip to content

Commit 7454ffd

Browse files
committed
added ReportingManager
1 parent 82b0655 commit 7454ffd

File tree

22 files changed

+297
-384
lines changed

22 files changed

+297
-384
lines changed

examples/CSharpDev/CustomReporting/CustomReportingExample.cs

-60
This file was deleted.

examples/CSharpDev/Program.cs

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System;
22
using CSharpDev.ClientFactory;
3-
using CSharpDev.CustomReporting;
43
using CSharpDev.DataFeed;
54
using CSharpDev.Features;
65
using CSharpDev.HelloWorld;

examples/FSharpDev/CustomReporting/CustomReporting.fs

-38
This file was deleted.

examples/FSharpDev/FSharpDev.fsproj

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
<None Include="HelloWorld\config.json">
2222
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
2323
</None>
24-
<Compile Include="CustomReporting\CustomReporting.fs" />
2524
<Compile Include="HttpTests\SimpleHttpTest.fs" />
2625
<None Include="HttpTests\infra-config.json">
2726
<CopyToOutputDirectory>Always</CopyToOutputDirectory>

src/NBomber.Contracts/Contracts.fs

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ open NBomber.Contracts.Stats
1515
type Response = {
1616
StatusCode: Nullable<int>
1717
IsError: bool
18-
Message: string
18+
mutable Message: string
1919
SizeBytes: int
2020
LatencyMs: float
21-
Payload: obj
21+
mutable Payload: obj
2222
}
2323

2424
type ScenarioInfo = {

src/NBomber/Api/CSharp.fs

-7
Original file line numberDiff line numberDiff line change
@@ -262,13 +262,6 @@ type NBomberRunner =
262262
static member WithReportingInterval(context: NBomberContext, interval: TimeSpan) =
263263
context |> FSharp.NBomberRunner.withReportingInterval interval
264264

265-
/// Sets reporting sinks.
266-
/// Reporting sink is used to save real-time metrics to correspond database.
267-
[<Extension>]
268-
static member WithReportingSinks(context: NBomberContext, [<ParamArray>]reportingSinks: IReportingSink[]) =
269-
let sinks = reportingSinks |> Seq.toList
270-
context |> FSharp.NBomberRunner.withReportingSinks sinks
271-
272265
/// Sets worker plugins.
273266
/// Worker plugin is a plugin that starts at the test start and works as a background worker.
274267
[<Extension>]

src/NBomber/Api/FSharp.fs

-6
Original file line numberDiff line numberDiff line change
@@ -268,12 +268,6 @@ module NBomberRunner =
268268
let report = { context.Reporting with ReportingInterval = interval }
269269
{ context with Reporting = report }
270270

271-
/// Sets reporting sinks.
272-
/// Reporting sink is used to save real-time metrics to correspond database
273-
let withReportingSinks (reportingSinks: IReportingSink list) (context: NBomberContext) =
274-
let report = { context.Reporting with Sinks = reportingSinks }
275-
{ context with Reporting = report }
276-
277271
/// Sets worker plugins.
278272
/// Worker plugin is a plugin that starts at the test start and works as a background worker.
279273
let withWorkerPlugins (plugins: IWorkerPlugin list) (context: NBomberContext) =

src/NBomber/Contracts.fs

+7
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type NBomberContext = {
5252

5353
namespace NBomber.Contracts.Internal
5454

55+
open System
5556
open CommandLine
5657
open NBomber.Contracts
5758

@@ -67,3 +68,9 @@ type StepResponse = {
6768
EndTimeMs: float
6869
LatencyMs: float
6970
}
71+
72+
type ScenarioRawStats = {
73+
ScenarioName: string
74+
Data: StepResponse list
75+
Timestamp: TimeSpan
76+
}

src/NBomber/Domain/Concurrency/Scheduler/ScenarioScheduler.fs

+17-6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ open FSharp.Control.Reactive
77

88
open NBomber
99
open NBomber.Contracts
10+
open NBomber.Contracts.Internal
1011
open NBomber.Contracts.Stats
1112
open NBomber.Domain
1213
open NBomber.Domain.DomainTypes
@@ -80,7 +81,7 @@ let emptyExec (dep: ActorDep) (actorPool: ScenarioActor list) (scheduledActorCou
8081

8182
type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
8283

83-
let log = dep.Logger.ForContext<ScenarioScheduler>()
84+
let _log = dep.Logger.ForContext<ScenarioScheduler>()
8485
let mutable _warmUp = false
8586
let mutable _scenario = dep.Scenario
8687
let mutable _currentSimulation = dep.Scenario.LoadTimeLine.Head.LoadSimulation
@@ -110,8 +111,15 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
110111
getOneTimeActorCount()
111112
)
112113

113-
let getRealtimeStats (executionTime) =
114-
let executedDuration = _scenario |> Scenario.getDuration |> correctExecutedDuration executionTime
114+
let addRawStats (rawStats: ScenarioRawStats) =
115+
dep.ScenarioStatsActor.Publish(AddResponses rawStats.Data)
116+
117+
let getRawStats (timestamp) =
118+
let executedDuration = _scenario |> Scenario.getDuration |> correctExecutedDuration timestamp
119+
dep.ScenarioStatsActor.GetRawStats executedDuration
120+
121+
let getRealtimeStats (timestamp) =
122+
let executedDuration = _scenario |> Scenario.getDuration |> correctExecutedDuration timestamp
115123
let simulationStats = getCurrentSimulationStats()
116124
dep.ScenarioStatsActor.GetRealtimeStats(simulationStats, executedDuration)
117125

@@ -198,13 +206,16 @@ type ScenarioScheduler(dep: ActorDep, scenarioClusterCount: int) =
198206

199207
member _.EventStream = _eventStream :> IObservable<_>
200208
member _.Scenario = dep.Scenario
201-
member _.PublishStatsToCoordinator() = dep.ScenarioStatsActor.Publish(ActorMessage.PublishStatsToCoordinator)
202-
member _.GetRealtimeStats(executionTime) = getRealtimeStats executionTime
209+
210+
member _.AddRawStats(rawStats) = addRawStats rawStats
211+
member _.GetRawStats(timestamp) = getRawStats timestamp
212+
member _.GetRealtimeStats(timestamp) = getRealtimeStats timestamp
203213
member _.GetFinalStats() = getFinalStats()
214+
member _.GetStatusMessages() = ()
204215

205216
interface IDisposable with
206217
member _.Dispose() =
207218
stop()
208219
_eventStream.Dispose()
209-
log.Verbose $"{nameof ScenarioScheduler} disposed"
220+
_log.Verbose $"{nameof ScenarioScheduler} disposed"
210221

src/NBomber/Domain/Stats/ScenarioStatsActor.fs

+40-26
Original file line numberDiff line numberDiff line change
@@ -13,42 +13,48 @@ open NBomber.Domain.DomainTypes
1313
open NBomber.Domain.Stats.Statistics
1414

1515
type ActorMessage =
16-
| AddResponse of StepResponse
17-
| AddResponses of StepResponse[]
18-
| PublishStatsToCoordinator
19-
| GetRealtimeStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
20-
| GetFinalStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
16+
| AddResponse of StepResponse
17+
| AddResponses of StepResponse list
18+
| GetRawStats of reply:TaskCompletionSource<ScenarioRawStats> * timestamp:TimeSpan
19+
| GetRealtimeStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
20+
| GetFinalStats of reply:TaskCompletionSource<ScenarioStats> * LoadSimulationStats * duration:TimeSpan
2121

2222
type IScenarioStatsActor =
2323
abstract Publish: ActorMessage -> unit
24+
abstract GetRawStats: timestamp:TimeSpan -> Task<ScenarioRawStats>
2425
abstract GetRealtimeStats: LoadSimulationStats * duration:TimeSpan -> Task<ScenarioStats>
2526
abstract GetFinalStats: LoadSimulationStats * duration:TimeSpan -> Task<ScenarioStats>
2627

27-
type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval: TimeSpan) =
28+
type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval: TimeSpan, keepRawStats: bool) =
2829

2930
let _allStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
3031
let mutable _intervalStepsData = Array.init scenario.Steps.Length (fun _ -> StepStatsRawData.createEmpty())
32+
let mutable _intervalRawStats = List.empty
3133

32-
let addResponse (allData: StepStatsRawData[]) (intervalData: StepStatsRawData[]) (resp: StepResponse) =
33-
let allStData = allData.[resp.StepIndex]
34-
let intervalStData = intervalData.[resp.StepIndex]
35-
allData.[resp.StepIndex] <- StepStatsRawData.addResponse allStData resp
36-
intervalData.[resp.StepIndex] <- StepStatsRawData.addResponse intervalStData resp
34+
let addResponse (resp: StepResponse) =
35+
let allStData = _allStepsData.[resp.StepIndex]
36+
let intervalStData = _intervalStepsData.[resp.StepIndex]
37+
_allStepsData.[resp.StepIndex] <- StepStatsRawData.addResponse allStData resp
38+
_intervalStepsData.[resp.StepIndex] <- StepStatsRawData.addResponse intervalStData resp
39+
40+
if keepRawStats then
41+
resp.ClientResponse.Payload <- null // to prevent sending in cluster mode
42+
resp.ClientResponse.Message <- null
43+
_intervalRawStats <- resp :: _intervalRawStats
3744

3845
let createScenarioStats (stepsData, simulationStats, operation, duration, interval) =
3946
ScenarioStats.create scenario stepsData simulationStats operation duration interval
4047

4148
let _actor = ActionBlock(fun msg ->
4249
try
4350
match msg with
44-
| AddResponse response ->
45-
addResponse _allStepsData _intervalStepsData response
46-
47-
| AddResponses responses ->
48-
responses |> Array.iter(addResponse _allStepsData _intervalStepsData)
51+
| AddResponse response -> addResponse response
52+
| AddResponses responses -> responses |> List.iter addResponse
4953

50-
| PublishStatsToCoordinator ->
51-
failwith "invalid operation" // it's only needed for cluster
54+
| GetRawStats (reply, timestamp) ->
55+
let stats = { ScenarioName = scenario.ScenarioName; Data = _intervalRawStats; Timestamp = timestamp }
56+
reply.TrySetResult(stats) |> ignore
57+
_intervalRawStats <- List.empty
5258

5359
| GetRealtimeStats (reply, simulationStats, duration) ->
5460
let scnStats = createScenarioStats(_intervalStepsData, simulationStats, OperationType.Bombing, duration, reportingInterval)
@@ -60,23 +66,31 @@ type ScenarioStatsActor(logger: ILogger, scenario: Scenario, reportingInterval:
6066
let scnStats = createScenarioStats(_allStepsData, simulationStats, OperationType.Complete, duration, duration)
6167
reply.TrySetResult(scnStats) |> ignore
6268
with
63-
| ex -> logger.Error(ex, "GlobalScenarioStatsActor failed")
69+
| ex -> logger.Error $"{nameof ScenarioStatsActor} failed: {ex.ToString()}"
6470
)
6571

6672
interface IScenarioStatsActor with
6773

6874
[<MethodImpl(MethodImplOptions.AggressiveInlining)>]
6975
member _.Publish(msg) = _actor.Post(msg) |> ignore
7076

77+
member _.GetRawStats(timestamp) =
78+
let reply = TaskCompletionSource<ScenarioRawStats>()
79+
GetRawStats(reply, timestamp) |> _actor.Post |> ignore
80+
reply.Task
81+
7182
member _.GetRealtimeStats(simulationStats, duration) =
72-
let tcs = TaskCompletionSource<ScenarioStats>()
73-
GetRealtimeStats(tcs, simulationStats, duration) |> _actor.Post |> ignore
74-
tcs.Task
83+
let reply = TaskCompletionSource<ScenarioStats>()
84+
GetRealtimeStats(reply, simulationStats, duration) |> _actor.Post |> ignore
85+
reply.Task
7586

7687
member _.GetFinalStats(simulationStats, duration) =
77-
let tcs = TaskCompletionSource<ScenarioStats>()
78-
GetFinalStats(tcs, simulationStats, duration) |> _actor.Post |> ignore
79-
tcs.Task
88+
let reply = TaskCompletionSource<ScenarioStats>()
89+
GetFinalStats(reply, simulationStats, duration) |> _actor.Post |> ignore
90+
reply.Task
8091

8192
let create (logger: ILogger) (scenario: Scenario) (reportingInterval: TimeSpan) =
82-
ScenarioStatsActor(logger, scenario, reportingInterval) :> IScenarioStatsActor
93+
ScenarioStatsActor(logger, scenario, reportingInterval, keepRawStats = false) :> IScenarioStatsActor
94+
95+
let createWithRawStats (logger: ILogger) (scenario: Scenario) (reportingInterval: TimeSpan) =
96+
ScenarioStatsActor(logger, scenario, reportingInterval, keepRawStats = true) :> IScenarioStatsActor

src/NBomber/Domain/Stats/Statistics.fs

+14-14
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ module internal NBomber.Domain.Stats.Statistics
22

33
open System
44
open System.Collections.Generic
5-
open System.Data
65

76
open HdrHistogram
87

@@ -206,19 +205,20 @@ module ScenarioStats =
206205
module NodeStats =
207206

208207
let create (testInfo: TestInfo) (nodeInfo: NodeInfo) (scnStats: ScenarioStats[]) =
209-
210-
let maxDuration = scnStats |> Array.maxBy(fun x -> x.Duration) |> fun scn -> scn.Duration
211-
212-
{ RequestCount = scnStats |> Array.sumBy(fun x -> x.RequestCount)
213-
OkCount = scnStats |> Array.sumBy(fun x -> x.OkCount)
214-
FailCount = scnStats |> Array.sumBy(fun x -> x.FailCount)
215-
AllBytes = scnStats |> Array.sumBy(fun x -> x.AllBytes)
216-
ScenarioStats = scnStats
217-
PluginStats = Array.empty
218-
NodeInfo = nodeInfo
219-
TestInfo = testInfo
220-
ReportFiles = Array.empty
221-
Duration = maxDuration }
208+
if Array.isEmpty scnStats then
209+
NodeStats.empty
210+
else
211+
let maxDuration = scnStats |> Array.maxBy(fun x -> x.Duration) |> fun scn -> scn.Duration
212+
{ RequestCount = scnStats |> Array.sumBy(fun x -> x.RequestCount)
213+
OkCount = scnStats |> Array.sumBy(fun x -> x.OkCount)
214+
FailCount = scnStats |> Array.sumBy(fun x -> x.FailCount)
215+
AllBytes = scnStats |> Array.sumBy(fun x -> x.AllBytes)
216+
ScenarioStats = scnStats
217+
PluginStats = Array.empty
218+
NodeInfo = nodeInfo
219+
TestInfo = testInfo
220+
ReportFiles = Array.empty
221+
Duration = maxDuration }
222222

223223
let round (stats: NodeStats) =
224224
{ stats with ScenarioStats = stats.ScenarioStats |> Array.map(ScenarioStats.round)

0 commit comments

Comments
 (0)