@@ -7,8 +7,9 @@ import akka.persistence.state.scaladsl.{DurableStateStore, DurableStateUpdateSto
7
7
import org .nullvector .ReactiveMongoDriver
8
8
import org .nullvector .crud .ReactiveMongoCrud .Schema
9
9
import org .nullvector .typed .ReactiveMongoEventSerializer
10
- import reactivemongo .api .bson .BSONDocument
10
+ import reactivemongo .api .bson .{ BSONDateTime , BSONDocument }
11
11
12
+ import java .time .{Clock , Instant }
12
13
import scala .concurrent .{ExecutionContext , Future }
13
14
14
15
object ReactiveMongoCrud {
@@ -18,15 +19,18 @@ object ReactiveMongoCrud {
18
19
val payload = " payload"
19
20
val manifest = " manifest"
20
21
val revision = " revision"
22
+ val created = " created"
23
+ val updated = " updated"
21
24
val tags = " tags"
22
25
}
23
26
}
24
27
25
- class ReactiveMongoCrud (system : ActorSystem [_ ]) extends DurableStateStore [Any ] with DurableStateUpdateStore [Any ] {
28
+ class ReactiveMongoCrud (system : ActorSystem [? ]) extends DurableStateStore [Any ] with DurableStateUpdateStore [Any ] {
26
29
private implicit lazy val dispatcher : ExecutionContext =
27
30
system.dispatchers.lookup(DispatcherSelector .fromConfig(" akka-persistence-reactivemongo-dispatcher" ))
28
31
private val driver : ReactiveMongoDriver = ReactiveMongoDriver (system)
29
32
private val serializer : ReactiveMongoEventSerializer = ReactiveMongoEventSerializer (system)
33
+ private val utcClock : Clock = Clock .systemUTC()
30
34
31
35
override def getObject (persistenceId : String ): Future [GetObjectResult [Any ]] = {
32
36
for {
@@ -39,24 +43,26 @@ class ReactiveMongoCrud(system: ActorSystem[_]) extends DurableStateStore[Any] w
39
43
serializer
40
44
.deserialize(PersistentRepr (payload = payload, manifest = manifest))
41
45
.map(rep => Some (rep.payload) -> revision)
42
- case None => Future .successful(None , 1L )
46
+ case None => Future .successful(None , 0L )
43
47
}
44
48
} yield GetObjectResult (found, revision)
45
49
}
46
50
override def upsertObject (persistenceId : String , revision : Long , value : Any , tag : String ): Future [Done ] = {
51
+ val nowBsonDateTime = BSONDateTime (Instant .now(utcClock).toEpochMilli)
47
52
for {
48
53
coll <- driver.crudCollection(persistenceId)
49
54
rep <- serializer.serialize(PersistentRepr (value))
50
55
_ <- coll
51
56
.findAndUpdate(
52
57
BSONDocument (Schema .persistenceId -> persistenceId, Schema .revision -> (revision - 1 )),
53
58
BSONDocument (
54
- " $set" -> BSONDocument (
59
+ " $set" -> ( BSONDocument (
55
60
Schema .payload -> rep._1.payload.asInstanceOf [BSONDocument ],
56
61
Schema .manifest -> rep._1.manifest,
57
62
Schema .revision -> revision,
58
- Schema .tags -> rep._2
59
- )
63
+ Schema .tags -> rep._2,
64
+ Schema .updated -> nowBsonDateTime
65
+ ) ++ (if (revision == 1 ) BSONDocument (Schema .created -> nowBsonDateTime) else BSONDocument ()))
60
66
),
61
67
upsert = true
62
68
)
@@ -68,4 +74,11 @@ class ReactiveMongoCrud(system: ActorSystem[_]) extends DurableStateStore[Any] w
68
74
_ <- coll.findAndRemove(BSONDocument (Schema .persistenceId -> persistenceId))
69
75
} yield Done
70
76
}
77
+
78
+ override def deleteObject (persistenceId : String , revision : Long ): Future [Done ] = {
79
+ for {
80
+ coll <- driver.crudCollection(persistenceId)
81
+ _ <- coll.findAndRemove(BSONDocument (Schema .persistenceId -> persistenceId, Schema .revision -> revision))
82
+ } yield Done
83
+ }
71
84
}
0 commit comments