1
1
namespace SqlStreamStore.FSharp
2
2
3
3
open System.Threading
4
- open SqlStreamStore
5
4
open SqlStreamStore.Streams
6
5
open Insurello.AsyncExtra
7
6
8
- [<RequireQualifiedAccessAttribute>]
9
- type ReadingDirection =
10
- | Forward
11
- | Backward
12
-
13
7
module Read =
14
- let readFromAllStreamAsync : IStreamStore -> ReadingDirection -> StartPositionInclusive -> MessageCount -> Async < ReadAllPage > =
8
+ let private fromReadVersion : uint -> int = fun readVersion -> int ( readVersion)
9
+
10
+ let readFromAllStreamAsync : SqlStreamStore.IStreamStore -> ReadingDirection -> StartPositionInclusive -> MessageCount -> Async < ReadAllPage > =
15
11
fun store readingDirection startPositionInclusive msgCount ->
16
12
match readingDirection with
17
13
| ReadingDirection.Forward -> store.ReadAllForwards( startPositionInclusive, msgCount)
18
14
| ReadingDirection.Backward -> store.ReadAllBackwards( startPositionInclusive, msgCount)
19
15
|> Async.AwaitTask
20
16
21
- let readFromAllStreamAsync ' : IStreamStore -> ReadingDirection -> StartPositionInclusive -> MessageCount -> CancellationToken -> Async < ReadAllPage > =
22
- fun store readingDirection startPositionInclusive msgCount cancellationToken ->
17
+ let readFromAllStreamAsync ' : SqlStreamStore. IStreamStore -> ReadingDirection -> StartPositionInclusive -> MessageCount -> bool -> CancellationToken -> Async < ReadAllPage > =
18
+ fun store readingDirection startPositionInclusive msgCount prefetchJson cancellationToken ->
23
19
match readingDirection with
24
- | ReadingDirection.Forward -> store.ReadAllForwards( startPositionInclusive, msgCount, cancellationToken)
25
- | ReadingDirection.Backward -> store.ReadAllBackwards( startPositionInclusive, msgCount, cancellationToken)
20
+ | ReadingDirection.Forward ->
21
+ store.ReadAllForwards( startPositionInclusive, msgCount, prefetchJson, cancellationToken)
22
+ | ReadingDirection.Backward ->
23
+ store.ReadAllBackwards( startPositionInclusive, msgCount, prefetchJson, cancellationToken)
26
24
|> Async.AwaitTask
27
25
28
- let readFromStreamAsync : IStreamStore -> ReadingDirection -> StreamDetails -> MessageCount -> Async < ReadStreamPage > =
29
- fun store readingDirection streamDetails msgCount ->
26
+ let readFromStreamAsync : SqlStreamStore. IStreamStore -> ReadingDirection -> ReadStreamDetails -> MessageCount -> Async < ReadStreamPage > =
27
+ fun store readingDirection readStreamDetails msgCount ->
30
28
match readingDirection with
31
29
| ReadingDirection.Forward ->
32
- store.ReadStreamForwards( streamDetails.streamName, Helpers.toVersion streamDetails.version, msgCount)
30
+ store.ReadStreamForwards
31
+ ( StreamId( readStreamDetails.streamName), fromReadVersion readStreamDetails.version, msgCount)
33
32
| ReadingDirection.Backward ->
34
- store.ReadStreamBackwards( streamDetails.streamName, Helpers.toVersion streamDetails.version, msgCount)
33
+ store.ReadStreamBackwards
34
+ ( StreamId( readStreamDetails.streamName), fromReadVersion readStreamDetails.version, msgCount)
35
35
|> Async.AwaitTask
36
36
37
- let readFromStreamAsync ' : IStreamStore -> ReadingDirection -> StreamDetails -> MessageCount -> CancellationToken -> Async < ReadStreamPage > =
38
- fun store readingDirection streamDetails msgCount cancellationToken ->
37
+ let readFromStreamAsync ' : SqlStreamStore. IStreamStore -> ReadingDirection -> ReadStreamDetails -> MessageCount -> bool -> CancellationToken -> Async < ReadStreamPage > =
38
+ fun store readingDirection readStreamDetails msgCount prefetchJson cancellationToken ->
39
39
match readingDirection with
40
40
| ReadingDirection.Forward ->
41
41
store.ReadStreamForwards
42
- ( streamDetails.streamName, Helpers.toVersion streamDetails.version, msgCount, cancellationToken)
42
+ ( StreamId( readStreamDetails.streamName),
43
+ fromReadVersion readStreamDetails.version,
44
+ msgCount,
45
+ prefetchJson,
46
+ cancellationToken)
43
47
| ReadingDirection.Backward ->
44
48
store.ReadStreamBackwards
45
- ( streamDetails.streamName, Helpers.toVersion streamDetails.version, msgCount, cancellationToken)
49
+ ( StreamId( readStreamDetails.streamName),
50
+ fromReadVersion readStreamDetails.version,
51
+ msgCount,
52
+ prefetchJson,
53
+ cancellationToken)
46
54
|> Async.AwaitTask
47
55
48
56
module ReadExtras =
49
- let readAllStreamMessages : IStreamStore -> ReadingDirection -> StartPositionInclusive -> MessageCount -> AsyncResult < List < StreamMessage >, string > =
57
+ let readAllStreamMessages : SqlStreamStore. IStreamStore -> ReadingDirection -> StartPositionInclusive -> MessageCount -> AsyncResult < List < StreamMessage >, string > =
50
58
fun store readingDirection startPositionInclusive msgCount ->
51
59
Read.readFromAllStreamAsync store readingDirection startPositionInclusive msgCount
52
60
|> Async.bind ( fun readAllPage ->
@@ -60,9 +68,9 @@ module ReadExtras =
60
68
( sprintf " Failed to retrieve all messages. Retrieved messages count: %d " messageList.Length)
61
69
|> AsyncResult.fromResult)
62
70
63
- let readStreamMessages : IStreamStore -> ReadingDirection -> StreamDetails -> MessageCount -> AsyncResult < List < StreamMessage >, string > =
64
- fun store readingDirection streamDetails msgCount ->
65
- Read.readFromStreamAsync store readingDirection streamDetails msgCount
71
+ let readStreamMessages : SqlStreamStore. IStreamStore -> ReadingDirection -> ReadStreamDetails -> MessageCount -> AsyncResult < List < StreamMessage >, string > =
72
+ fun store readingDirection readStreamDetails msgCount ->
73
+ Read.readFromStreamAsync store readingDirection readStreamDetails msgCount
66
74
|> Async.bind ( fun readStreamPage ->
67
75
readStreamPage.Messages
68
76
|> Seq.toList
0 commit comments