Skip to content

Commit 49f8762

Browse files
committed
Upgrade libs versions
1 parent 63ad04c commit 49f8762

21 files changed

+373
-151
lines changed

Diff for: api/src/main/scala/org/nullvector/package.scala

+10-11
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
11
package org
22

33
import reactivemongo.api.bson.{BSONDocumentHandler, BSONDocumentReader, BSONDocumentWriter, BSONReader, BSONWriter}
4-
import reactivemongo.api.commands.{MultiBulkWriteResult, WriteResult}
4+
import reactivemongo.api.commands.WriteResult
55

66
import scala.concurrent.{ExecutionContext, Future}
77
import scala.util.{Failure, Success, Try}
88

99
package object nullvector {
1010
type BSONDocumentMapping[T] = BSONDocumentReader[T] with BSONDocumentWriter[T]
1111

12-
implicit def futureWriteResult2Try(futureResult: Future[WriteResult])(implicit ec: ExecutionContext): Future[Try[Unit]] = {
13-
futureResult.map(result =>
14-
if (result.ok) Success({}) else Failure(new Exception(result.writeErrors.map(_.toString).mkString("\n")))
15-
)
16-
}
12+
// implicit def futureWriteResult2Try(futureResult: Future[WriteResult])(implicit ec: ExecutionContext): Future[Unit] = {
13+
// futureResult
14+
// .map(result => if (result.writeErrors.isEmpty) () else new Exception(result.writeErrors.map(_.toString).mkString("\n")))
15+
// }
1716

18-
implicit def futureBulkWriteResult2Try(futureResult: Future[MultiBulkWriteResult])(implicit ec: ExecutionContext): Future[Try[Unit]] = {
19-
futureResult.map(result =>
20-
if (result.ok) Success({}) else Failure(new Exception(result.writeErrors.map(_.toString).mkString("\n")))
21-
)
22-
}
17+
// implicit def futureBulkWriteResult2Try(futureResult: Future[MultiBulkWriteResult])(implicit ec: ExecutionContext): Future[Try[Unit]] = {
18+
// futureResult.map(result =>
19+
// if (result.ok) Success({}) else Failure(new Exception(result.writeErrors.map(_.toString).mkString("\n")))
20+
// )
21+
// }
2322

2423
}

Diff for: build.sbt

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
lazy val scala212 = "2.12.12"
22
lazy val scala213 = "2.13.3"
33
lazy val supportedScalaVersions = List(scala212, scala213)
4-
lazy val akkaVersion = "2.6.9"
5-
lazy val rxmongoVersion = "0.20.11"
4+
lazy val akkaVersion = "2.6.10"
5+
lazy val rxmongoVersion = "1.0.0"
66

77
lazy val commonSettings = Seq(
88
name := "akka-reactivemongo-plugin",
99
organization := "null-vector",
10-
version := "1.3.17",
10+
version := "1.4.0-SNAPSHOT",
1111
scalaVersion := scala213,
1212
crossScalaVersions := supportedScalaVersions,
1313
scalacOptions := Seq(

Diff for: core/src/main/scala/org/nullvector/PersistInMemory.scala

+33-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package org.nullvector
22

3-
import akka.Done
3+
import akka.{Done, NotUsed}
44
import akka.actor.typed.scaladsl.Behaviors
55
import akka.actor.typed.{ActorRef, Behavior, Extension, ExtensionId, ActorSystem => TypedActorSystem}
6+
import akka.stream.scaladsl.Source
67
import org.nullvector.query.ObjectIdOffset
78
import reactivemongo.api.bson.BSONDocument
89

9-
import scala.collection.mutable
10+
import scala.collection.{View, mutable}
1011
import scala.collection.mutable.ListBuffer
1112
import scala.concurrent.{Future, Promise}
1213

@@ -20,6 +21,8 @@ object PersistInMemory extends ExtensionId[PersistInMemory] {
2021

2122
case class EventsOf(persistenceId: String, replyEvents: Either[ActorRef[Seq[EventEntry]], Promise[Seq[EventEntry]]]) extends Command
2223

24+
case class AllEvents(replyEvents: Either[ActorRef[Source[(String, EventEntry), NotUsed]], Promise[Source[(String, EventEntry), NotUsed]]]) extends Command
25+
2326
case class SnapshotsOf(persistenceId: String, replySnapshot: Either[ActorRef[Seq[SnapshotEntry]], Promise[Seq[SnapshotEntry]]]) extends Command
2427

2528
case class HighestSequenceOf(persistenceId: String, replyMaxSeq: Either[ActorRef[Long], Promise[Long]]) extends Command
@@ -28,8 +31,10 @@ object PersistInMemory extends ExtensionId[PersistInMemory] {
2831

2932
case class RemoveSnapshotsOf(persistenceId: String, sequences: SequenceRange, replyDone: Either[ActorRef[Done], Promise[Done]]) extends Command
3033

34+
case class InvalidateAll(replyDone: Either[ActorRef[Done], Promise[Done]]) extends Command
35+
3136
case class EventEntry(sequence: Long, manifest: String, event: BSONDocument, tags: Set[String], offset: Option[ObjectIdOffset] = None) {
32-
def withOffset(): EventEntry = copy(offset = Some(ObjectIdOffset.newOffset()))
37+
def withOffset(): EventEntry = copy(offset = offset.orElse(Some(ObjectIdOffset.newOffset())))
3338
}
3439

3540
case class SnapshotEntry(sequence: Long, manifest: String, event: BSONDocument, timestamp: Long)
@@ -82,7 +87,6 @@ object PersistInMemory extends ExtensionId[PersistInMemory] {
8287
}
8388
Behaviors.same
8489

85-
8690
case HighestSequenceOf(persistenceId, replyMaxSeq) =>
8791
val maxEventSeq = eventsById.getOrElse(persistenceId, Nil).lastOption.map(_.sequence).getOrElse(0L)
8892
val maxSnapshotSeq = snapshotById.getOrElse(persistenceId, Nil).lastOption.map(_.sequence).getOrElse(0L)
@@ -96,6 +100,18 @@ object PersistInMemory extends ExtensionId[PersistInMemory] {
96100
}
97101
reply(replyDone, Done)
98102
Behaviors.same
103+
104+
case AllEvents(replyEvents) =>
105+
val flatten = eventsById.view.flatMap(entry => entry._2.map(event => entry._1 -> event)).iterator
106+
val source = Source.fromIterator(() => flatten)
107+
reply(replyEvents, source)
108+
Behaviors.same
109+
110+
case InvalidateAll(replyDone) =>
111+
eventsById.clear()
112+
snapshotById.clear()
113+
reply(replyDone, Done)
114+
Behaviors.same
99115
}
100116
}
101117

@@ -153,4 +169,17 @@ class PersistInMemory(system: TypedActorSystem[_]) extends Extension {
153169
promisedDone.future
154170
}
155171

172+
def allEvents(): Future[Source[(String, EventEntry), NotUsed]] = {
173+
val promisedDone = Promise[Source[(String, EventEntry), NotUsed]]()
174+
persistInMemory.tell(AllEvents(Right(promisedDone)))
175+
promisedDone.future
176+
}
177+
178+
def invalidateAll(): Future[Done] = {
179+
val promisedDone = Promise[Done]()
180+
persistInMemory.tell(InvalidateAll(Right(promisedDone)))
181+
promisedDone.future
182+
}
183+
184+
156185
}

Diff for: core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala

+7-8
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import akka.actor.{Actor, ActorLogging, ActorRef, ExtendedActorSystem, Extension
44
import akka.util.Timeout
55
import reactivemongo.api.bson.BSONDocument
66
import reactivemongo.api.bson.collection.{BSONCollection, BSONSerializationPack}
7-
import reactivemongo.api.commands.CommandError
7+
import reactivemongo.api.commands.CommandException
88
import reactivemongo.api.indexes.{CollectionIndexesManager, Index, IndexType}
9-
import reactivemongo.api.{AsyncDriver, DefaultDB, MongoConnection}
9+
import reactivemongo.api.{AsyncDriver, DB, MongoConnection}
1010

1111
import scala.collection.mutable
1212
import scala.concurrent.duration._
@@ -26,9 +26,9 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
2626
protected implicit val dispatcher: ExecutionContext = system.dispatchers.lookup("akka-persistence-reactivemongo-dispatcher")
2727

2828
private implicit val timeout: Timeout = Timeout(5.seconds)
29-
private val collections: ActorRef = system.actorOf(Props(new Collections()))
29+
private val collections: ActorRef = system.systemActorOf(Props(new Collections()), "ReactiveMongoDriverCollections")
3030

31-
private val database: DefaultDB = {
31+
private val database: DB = {
3232
val mongoUri = system.settings.config.getString("akka-persistence-reactivemongo.mongo-uri")
3333
Await.result(
3434
MongoConnection.fromString(mongoUri).flatMap { parsedUri =>
@@ -82,7 +82,7 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
8282
if (!verifiedNames.contains(collectionName)) {
8383
val collection = database.collection[BSONCollection](collectionName)
8484
(for {
85-
_ <- collection.create().recover { case e: CommandError if e.code.contains(48) => () }
85+
_ <- collection.create().recover { case CommandException.Code(48) => () }
8686
_ <- ensurePidSeqIndex(collection.indexesManager)
8787
_ <- ensureTagIndex(collection.indexesManager)
8888
_ <- Future.successful(self ! AddVerified(collectionName))
@@ -97,7 +97,7 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
9797
if (!verifiedNames.contains(collectionName)) {
9898
val collection = database.collection[BSONCollection](collectionName)
9999
(for {
100-
_ <- collection.create().recover { case e: CommandError if e.code.contains(48) => () }
100+
_ <- collection.create().recover { case CommandException.Code(48) => () }
101101
_ <- ensureSnapshotIndex(collection.indexesManager)
102102
_ <- Future.successful(self ! AddVerified(collectionName))
103103
} yield ())
@@ -148,9 +148,8 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
148148
key = key,
149149
name = name,
150150
unique = unique,
151-
sparse = sparse,
152151
background = false,
153-
dropDups = false,
152+
sparse = false,
154153
expireAfterSeconds = None,
155154
storageEngine = None,
156155
weights = None,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.nullvector
2+
3+
import akka.actor.ActorSystem
4+
5+
object UnderlyingPersistenceFactory {
6+
7+
def apply[T](persistInMongo: => T, persistInMemory: => T)(implicit system: ActorSystem): T = {
8+
val mustPersistInMemory = system.settings.config.getBoolean("akka-persistence-reactivemongo.persist-in-memory")
9+
if (mustPersistInMemory) persistInMemory else persistInMongo
10+
}
11+
12+
}

Diff for: core/src/main/scala/org/nullvector/journal/ReactiveMongoAsyncDeleteMessages.scala

+7-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package org.nullvector.journal
22

33
import org.nullvector._
44
import reactivemongo.api.bson.BSONDocument
5+
import reactivemongo.api.commands.CollectionCommand
56

67
import scala.concurrent.Future
78
import scala.util.{Failure, Success, Try}
@@ -17,7 +18,11 @@ trait ReactiveMongoAsyncDeleteMessages {
1718
Fields.persistenceId -> persistenceId,
1819
Fields.to_sn -> BSONDocument("$lte" -> toSequenceNr),
1920
), None, None
20-
).flatMap(el => deleteBuilder.many(Seq(el)): Future[Try[Unit]])
21-
}.transform(_.flatMap(identity))
21+
).flatMap(el => deleteBuilder.many(Seq(el)).map(result => result.errmsg match {
22+
case Some(error) => throw new Exception(error)
23+
case None => ()
24+
}))
25+
26+
}
2227
}
2328
}

Diff for: core/src/main/scala/org/nullvector/journal/ReactiveMongoAsyncWrite.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import org.nullvector.{Fields, _}
55
import reactivemongo.api.bson._
66

77
import scala.concurrent.Future
8-
import scala.util.Try
8+
import scala.util.{Failure, Success, Try}
99

1010
trait ReactiveMongoAsyncWrite {
1111
this: ReactiveMongoJournalImpl =>
@@ -28,7 +28,9 @@ trait ReactiveMongoAsyncWrite {
2828
)
2929
}
3030
}
31-
results <- Future.traverse(atomicDocs)(doc => collection.insert(ordered = true).one(doc): Future[Try[Unit]])
31+
results <- Future.traverse(atomicDocs)(doc => collection.insert(ordered = true).one(doc)
32+
.map(result => if (result.writeErrors.isEmpty) Success() else Failure(new Exception(result.writeErrors.map(_.toString).mkString("\n"))))
33+
)
3234
} yield results
3335
}
3436

Diff for: core/src/main/scala/org/nullvector/journal/ReactiveMongoJournal.scala

+5-6
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,16 @@ import akka.actor.ActorSystem
44
import akka.persistence.{AtomicWrite, PersistentRepr}
55
import akka.persistence.journal.AsyncWriteJournal
66
import com.typesafe.config.Config
7+
import org.nullvector.{PersistInMemory, UnderlyingPersistenceFactory}
78

89
import scala.concurrent.Future
910
import scala.util.Try
1011

1112
class ReactiveMongoJournal(val aConfig: Config) extends AsyncWriteJournal {
12-
private val persistInMemory: Boolean = context.system.settings.config.getBoolean("akka-persistence-reactivemongo.persist-in-memory")
13-
private val asyncWriteJournalOps: AsyncWriteJournalOps =
14-
if (!persistInMemory)
15-
new ReactiveMongoJournalImpl(aConfig, context.system)
16-
else
17-
new InMemoryAsyncWriteJournal(context.system)
13+
14+
private val asyncWriteJournalOps: AsyncWriteJournalOps = UnderlyingPersistenceFactory(
15+
new ReactiveMongoJournalImpl(aConfig, context.system),new InMemoryAsyncWriteJournal(context.system)
16+
)(context.system)
1817

1918
override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] =
2019
asyncWriteJournalOps.asyncWriteMessages(messages)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package org.nullvector.query
2+
3+
import akka.NotUsed
4+
import akka.persistence.query.{EventEnvelope, Offset}
5+
import akka.stream.scaladsl.Source
6+
7+
trait CustomReadOps {
8+
9+
/*
10+
* Same as [[EventsQueries#currentEventsByTag]] but events aren't serialized, instead
11+
* the `EventEnvelope` will contain the raw `BSONDocument`
12+
*/
13+
def currentRawEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
14+
15+
def currentRawEventsByTag(tags: Seq[String], offset: Offset): Source[EventEnvelope, NotUsed]
16+
17+
def currentEventsByTags(tags: Seq[String], offset: Offset): Source[EventEnvelope, NotUsed]
18+
}

Diff for: core/src/main/scala/org/nullvector/query/EventsQueries.scala

+20-24
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ trait EventsQueries
1313
extends akka.persistence.query.scaladsl.EventsByTagQuery
1414
with akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
1515
with akka.persistence.query.scaladsl.CurrentEventsByTagQuery
16-
with akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery {
16+
with akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery with CustomReadOps {
1717

18-
this: ReactiveMongoScalaReadJournal =>
18+
this: ReactiveMongoScalaReadJournalImpl =>
1919

2020
private val amountOfCores: Int = Runtime.getRuntime.availableProcessors()
2121

@@ -69,16 +69,16 @@ trait EventsQueries
6969
* Same as [[EventsQueries#currentEventsByTag]] but events aren't serialized, instead
7070
* the `EventEnvelope` will contain the raw `BSONDocument`
7171
*/
72-
def currentRawEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
72+
override def currentRawEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
7373
currentRawEventsByTag(Seq(tag), offset)
7474
}
7575

76-
def currentRawEventsByTag(tags: Seq[String], offset: Offset): Source[EventEnvelope, NotUsed] = {
76+
override def currentRawEventsByTag(tags: Seq[String], offset: Offset): Source[EventEnvelope, NotUsed] = {
7777
implicit val raw: (BSONDocument, BSONDocument) => Future[BSONDocument] = (_, rawPayload) => Future(rawPayload)
7878
eventsByTagQuery(tags, offset)
7979
}
8080

81-
def currentEventsByTags(tags: Seq[String], offset: Offset): Source[EventEnvelope, NotUsed] = {
81+
override def currentEventsByTags(tags: Seq[String], offset: Offset): Source[EventEnvelope, NotUsed] = {
8282
eventsByTagQuery(tags, offset)
8383
}
8484

@@ -109,33 +109,29 @@ trait EventsQueries
109109
}
110110

111111
private def buildFindEventsByTagsQuery(coll: collection.BSONCollection, offset: Offset, tags: Seq[String]) = {
112-
113112
def query(field: String) = BSONDocument(field -> BSONDocument("$in" -> tags))
114113

115-
import coll.aggregationFramework._
116-
117-
val $1stMatch = Match(query(Fields.tags) ++ filterByOffset(offset))
118-
val $unwind = UnwindField(Fields.events)
119-
val $2ndMatch = Match(query(s"${Fields.events}.${Fields.tags}"))
120-
121114
coll
122-
.aggregateWith[BSONDocument]()(_ => ($1stMatch, List($unwind, $2ndMatch)))
115+
.aggregateWith[BSONDocument]()(framework =>
116+
List(
117+
framework.Match(query(Fields.tags) ++ filterByOffset(offset)),
118+
framework.UnwindField(Fields.events),
119+
framework.Match(query(s"${Fields.events}.${Fields.tags}")),
120+
))
123121
.documentSource()
124122
}
125123

126124
private def buildFindEventsByIdQuery(coll: collection.BSONCollection, persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long) = {
127-
import coll.aggregationFramework._
128-
129-
val $match = Match(BSONDocument(
130-
Fields.persistenceId -> persistenceId,
131-
Fields.from_sn -> BSONDocument("$gt" -> fromSequenceNr),
132-
Fields.to_sn -> BSONDocument("$lte" -> toSequenceNr)
133-
))
134-
val $unwind = UnwindField(Fields.events)
135-
val $sort = Sort(Ascending(s"${Fields.events}.${Fields.sequence}"))
136-
137125
coll
138-
.aggregateWith[BSONDocument]()(_ => ($match, List($unwind, $sort)))
126+
.aggregateWith[BSONDocument]()(framework => List(
127+
framework.Match(BSONDocument(
128+
Fields.persistenceId -> persistenceId,
129+
Fields.from_sn -> BSONDocument("$gt" -> fromSequenceNr),
130+
Fields.to_sn -> BSONDocument("$lte" -> toSequenceNr)
131+
)),
132+
framework.UnwindField(Fields.events),
133+
framework.Sort(framework.Ascending(s"${Fields.events}.${Fields.sequence}"))
134+
))
139135
.documentSource()
140136
}
141137

0 commit comments

Comments
 (0)