Skip to content

Commit c900e1c

Browse files
committed
Using hint for recovery
1 parent 087adde commit c900e1c

File tree

5 files changed

+35
-16
lines changed

5 files changed

+35
-16
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ Add in your `build.sbt` the following lines:
1212
resolvers += "null-vector" at "https://nullvector.jfrog.io/artifactory/releases"
1313
```
1414
```scala
15-
libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.4.5"
15+
libraryDependencies += "null-vector" %% "akka-reactivemongo-plugin" % "1.4.7"
1616
```
1717

1818
## Configuration

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ publishLocal := {}
1212
lazy val commonSettings = Seq(
1313
name := "akka-reactivemongo-plugin",
1414
organization := "null-vector",
15-
version := s"1.4.6",
15+
version := s"1.4.7",
1616
scalaVersion := scala213,
1717
crossScalaVersions := supportedScalaVersions,
1818
scalacOptions := Seq(

core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,15 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
119119
}
120120

121121
def explain(collection: BSONCollection)(queryType: QueryType.QueryType, queryBuilder: collection.QueryBuilder) = {
122-
if (shoudExplain(queryType)) {
122+
if (shouldExplain(queryType)) {
123123
queryBuilder.explain().cursor().collect[List]()
124124
.map(docs => Try(Json.parse(BsonTextNormalizer(docs.head))).foreach(println))
125125
}
126126
}
127127

128128
def explainAgg(collection: BSONCollection)
129129
(queryType: QueryType.QueryType, stages: List[collection.PipelineOperator], hint: Option[collection.Hint]) = {
130-
if (shoudExplain(queryType)) {
130+
if (shouldExplain(queryType)) {
131131
collection
132132
.aggregatorContext[BSONDocument](stages, explain = true, hint = hint)
133133
.prepared
@@ -138,7 +138,7 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
138138
}
139139

140140

141-
private def shoudExplain(queryType: QueryType) = {
141+
private def shouldExplain(queryType: QueryType) = {
142142
explainOptions.exists(shouldType => shouldType == QueryType.All || shouldType == queryType)
143143
}
144144
}

core/src/main/scala/org/nullvector/journal/ReactiveMongoAsyncReplay.scala

+9-1
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,16 @@ trait ReactiveMongoAsyncReplay {
2323
Fields.to_sn -> BSONDocument("$gte" -> fromSequenceNr),
2424
Fields.from_sn -> BSONDocument("$lte" -> toSequenceNr),
2525
)
26-
val queryBuilder = collection.find(query)
26+
val queryBuilder = collection
27+
.find(query)
28+
.hint(collection.hint(BSONDocument(
29+
Fields.persistenceId -> 1,
30+
Fields.to_sn -> 1,
31+
Fields.from_sn -> 1,
32+
)))
33+
2734
rxDriver.explain(collection)(QueryType.Recovery, queryBuilder)
35+
2836
queryBuilder
2937
.cursor[BSONDocument]()
3038
.documentSource(if (max >= Int.MaxValue) Int.MaxValue else max.intValue())

core/src/test/scala/org/nullvector/journal/PersistentActorSpec.scala

+21-10
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,19 @@ class PersistentActorSpec() extends TestKitBase with ImplicitSender
4040
actorRef ! Command("get_state") //Will recover Nothing
4141
expectMsg(13.seconds, None)
4242

43-
actorRef ! Command("Command1")
44-
actorRef ! Command("Command2")
45-
actorRef ! Command("Command3")
46-
actorRef ! Command("Command4")
47-
actorRef ! Command("Command5")
48-
actorRef ! Command("Command6")
49-
actorRef ! Command("Command7")
50-
receiveN(7, 15.seconds)
43+
actorRef ! Command("A")
44+
actorRef ! CommandAll(Seq("B","C","D"))
45+
actorRef ! Command("E")
46+
actorRef ! CommandAll(Seq("F","G","H"))
47+
actorRef ! Command("I")
48+
receiveN(4, 15.seconds)
49+
50+
actorRef ! Kill
51+
52+
Thread.sleep(1000)
5153

5254
actorRef ! Command("get_state")
53-
expectMsg(Some("Command7"))
55+
expectMsg(Some("I"))
5456
}
5557

5658
"PersistAll" in {
@@ -116,6 +118,8 @@ class PersistentActorSpec() extends TestKitBase with ImplicitSender
116118

117119
case class Command(action: Any)
118120

121+
case class CommandAll(actions: Seq[Any])
122+
119123
case class MultiCommand(action1: String, action2: String, action3: String)
120124

121125
case class AnEvent(string: String)
@@ -134,13 +138,20 @@ class PersistentActorSpec() extends TestKitBase with ImplicitSender
134138
sender() ! state
135139

136140
case Command(action) =>
137-
138141
persist(AnEvent(action.toString)) { event =>
139142
state = Some(event.string)
140143
sender() ! Done
141144
if (lastSequenceNr % 13 == 0) saveSnapshot(AnEvent(action.toString))
142145
}
143146

147+
case CommandAll(actions) =>
148+
persistAll(actions.map(a => AnEvent(a.toString))) { event =>
149+
}
150+
defer(actions){ _ =>
151+
sender() ! Done
152+
state = Some(actions.last.toString)
153+
}
154+
144155
case MultiCommand(action1, action2, action3) =>
145156
persistAll(Seq(AnEvent(action1), AnEvent(action2), AnEvent(action3))) { _ => }
146157
deferAsync(()) { _ =>

0 commit comments

Comments
 (0)