Skip to content

Commit 46c36ee

Browse files
committed
Important fix using indices in events by tag.
1 parent 529ad05 commit 46c36ee

7 files changed

+44
-18
lines changed

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ publishLocal := {}
1212
lazy val commonSettings = Seq(
1313
name := "akka-reactivemongo-plugin",
1414
organization := "null-vector",
15-
version := s"1.5.1",
15+
version := s"1.5.2",
1616
scalaVersion := scala213,
1717
crossScalaVersions := supportedScalaVersions,
1818
scalacOptions := Seq(

core/src/main/scala/org/nullvector/Collections.scala

+19-6
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,15 @@ class Collections(system: ExtendedActorSystem) extends Actor with ActorLogging {
5555
verifiedNames.clear()
5656
promisedDone success Done
5757

58-
case GetJournals(response) =>
59-
val collections = database.flatMap(_.collectionNames.map(_.filter(_.startsWith(journalPrefix))).flatMap { names =>
58+
case GetJournals(response, collectionNames) =>
59+
val collections = database.flatMap(_
60+
.collectionNames.map { allNames =>
61+
val journalNames = allNames.filter(_.startsWith(journalPrefix))
62+
collectionNames match {
63+
case Nil => journalNames
64+
case _ => journalNames.filter(name => collectionNames.exists(colName => name.endsWith(colName)))
65+
}
66+
}.flatMap { names =>
6067
Future.traverse(names) { name =>
6168
val promisedCollection = Promise[BSONCollection]
6269
promisedCollection completeWith verifiedJournalCollection(name)
@@ -67,7 +74,7 @@ class Collections(system: ExtendedActorSystem) extends Actor with ActorLogging {
6774

6875
case CheckHealth(ack) =>
6976
val collections = Promise[List[BSONCollection]]
70-
context.self ! GetJournals(collections)
77+
context.self ! GetJournals(collections, Nil)
7178
val eventualDone = collections.future.map(_.headOption).flatMap {
7279
case Some(collection) => collection.find(BSONDocument.empty).one.map(_ => Done)
7380
case None => Future.successful(Done)
@@ -162,10 +169,16 @@ class Collections(system: ExtendedActorSystem) extends Actor with ActorLogging {
162169
}
163170

164171
private def ensureTagIndex(indexesManager: CollectionIndexesManager): Future[Unit] = {
165-
ensureIndex(index(Seq(
172+
val tagsById = ensureIndex(index(Seq(
166173
"_id" -> IndexType.Ascending,
167174
Fields.tags -> IndexType.Ascending,
168-
), Some("_tags"), unique = true, sparse = true), indexesManager)
175+
), Some("tags_by_id"), unique = true, sparse = true), indexesManager)
176+
177+
val allTags = ensureIndex(index(Seq(
178+
Fields.tags -> IndexType.Ascending,
179+
), Some("tags"), sparse = true), indexesManager)
180+
181+
tagsById flatMap (_ => allTags)
169182
}
170183

171184
private def ensureIndex(index: Aux[BSONSerializationPack.type], indexesManager: CollectionIndexesManager): Future[Unit] = {
@@ -209,7 +222,7 @@ object Collections {
209222

210223
case class GetSnapshotCollectionNameFor(persistentId: String, response: Promise[BSONCollection]) extends Command
211224

212-
case class GetJournals(response: Promise[List[BSONCollection]]) extends Command
225+
case class GetJournals(response: Promise[List[BSONCollection]], collectionNames: List[String]) extends Command
213226

214227
case class SetDatabaseProvider(databaseProvider: DatabaseProvider, ack: Promise[Done]) extends Command
215228

core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
5858
promise.future
5959
}
6060

61-
def journals(): Future[List[BSONCollection]] = {
61+
62+
def journals(collectionNames: List[String] = Nil): Future[List[BSONCollection]] = {
6263
val promise = Promise[List[BSONCollection]]()
63-
collections ! GetJournals(promise)
64+
collections ! GetJournals(promise, collectionNames)
6465
promise.future
6566
}
6667

core/src/main/scala/org/nullvector/query/EventsQueries.scala

+8-3
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ trait EventsQueries
108108
}
109109

110110
def eventsByTagQuery(tags: Seq[String], offset: Offset): Source[EventEnvelope, NotUsed] = {
111-
Source.future(rxDriver.journals())
111+
Source.future(rxDriver.journals(collectionNames))
112112
.withAttributes(ActorAttributes.dispatcher(ReactiveMongoPlugin.pluginDispatcherName))
113113
.mapConcat(identity)
114114
.splitWhen(_ => true)
@@ -127,12 +127,17 @@ trait EventsQueries
127127

128128
import collection.AggregationFramework._
129129

130+
val filterByOffsetExp = filterByOffset(offset)
130131
val stages: List[PipelineOperator] = List(
131-
Match(query(Fields.tags) ++ filterByOffset(offset)),
132+
Match(query(Fields.tags) ++ filterByOffsetExp),
132133
UnwindField(Fields.events),
133134
Match(query(s"${Fields.events}.${Fields.tags}")),
134135
)
135-
val hint = Some(collection.hint(BSONDocument("_id" -> 1, Fields.tags -> 1)))
136+
val hint = filterByOffsetExp match {
137+
case BSONDocument.empty => Some(collection.hint(BSONDocument(Fields.tags -> 1)))
138+
case _ => Some(collection.hint(BSONDocument("_id" -> 1, Fields.tags -> 1)))
139+
}
140+
136141
rxDriver.explainAgg(collection)(QueryType.EventsByTag, stages, hint)
137142

138143
def aggregate(implicit producer: CursorProducer[BSONDocument]): producer.ProducedCursor = {

core/src/main/scala/org/nullvector/query/ReactiveMongoJournalProvider.scala

+11-4
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,17 @@ class ReactiveMongoJournalProvider(system: ExtendedActorSystem) extends ReadJour
1616

1717
import akka.actor.typed.scaladsl.adapter._
1818

19-
override val scaladslReadJournal: ReactiveMongoScalaReadJournal = UnderlyingPersistenceFactory(
20-
new ReactiveMongoScalaReadJournalImpl(system),
21-
new FromMemoryReadJournal(system.toTyped)
22-
)(system)
19+
override val scaladslReadJournal: ReactiveMongoScalaReadJournal = createUnderlyingFactory(Nil)
20+
21+
def readJournalFor(collectionNames: List[String]) = createUnderlyingFactory(collectionNames)
22+
23+
private def createUnderlyingFactory(names: List[String]) = {
24+
UnderlyingPersistenceFactory(
25+
new ReactiveMongoScalaReadJournalImpl(system, names),
26+
new FromMemoryReadJournal(system.toTyped)
27+
)(system)
28+
}
29+
2330

2431
override val javadslReadJournal: ReactiveMongoJavaReadJournal = new ReactiveMongoJavaReadJournal(scaladslReadJournal)
2532
}

core/src/main/scala/org/nullvector/query/ReactiveMongoScalaReadJournalImpl.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit
1212
import scala.concurrent.ExecutionContext
1313
import scala.concurrent.duration._
1414

15-
class ReactiveMongoScalaReadJournalImpl(system: ExtendedActorSystem)
15+
class ReactiveMongoScalaReadJournalImpl(system: ExtendedActorSystem, protected val collectionNames: List[String])
1616
extends akka.persistence.query.scaladsl.ReadJournal
1717
with EventsQueries
1818
with PersistenceIdsQueries with ReactiveMongoScalaReadJournal {

core/src/test/scala/org/nullvector/queries/ReactiveMongoReadJournalSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ReactiveMongoReadJournalSpec() extends FlatSpec with TestKitBase with Impl
3737
val reactiveMongoJournalImpl: ReactiveMongoJournalImpl = new ReactiveMongoJournalImpl(ConfigFactory.load(), system)
3838

3939
implicit val materializer: Materializer = Materializer.matFromSystem(system)
40-
val readJournal: ReactiveMongoScalaReadJournal = ReactiveMongoJournalProvider(system).scaladslReadJournal
40+
val readJournal: ReactiveMongoScalaReadJournal = ReactiveMongoJournalProvider(system).readJournalFor(Nil)
4141
private val serializer = ReactiveMongoEventSerializer(system.toTyped)
4242
serializer.addAdapter(new SomeEventAdapter())
4343

0 commit comments

Comments
 (0)