Skip to content

Commit 087adde

Browse files
committed
driver connection
1 parent 79ef367 commit 087adde

File tree

3 files changed

+36
-19
lines changed

3 files changed

+36
-19
lines changed

build.sbt

+1-4
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,11 @@ publishArtifact := false
88
publish := {}
99
publishLocal := {}
1010

11-
publishArtifact := false
12-
publish := {}
13-
publishLocal := {}
1411

1512
lazy val commonSettings = Seq(
1613
name := "akka-reactivemongo-plugin",
1714
organization := "null-vector",
18-
version := s"1.4.5",
15+
version := s"1.4.6",
1916
scalaVersion := scala213,
2017
crossScalaVersions := supportedScalaVersions,
2118
scalacOptions := Seq(

core/src/main/scala/org/nullvector/Collections.scala

+12-7
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ class Collections(databaseProvider: DatabaseProvider, system: ExtendedActorSyste
2929
config.getString("akka-persistence-reactivemongo.collection-name-mapping")
3030
).get.getDeclaredConstructor(classOf[Config]).newInstance(config)
3131

32-
def database = currentDatabaseProvider.database
32+
private def database = currentDatabaseProvider.database
3333

3434
override def receive: Receive = {
35-
case SetDatabaseProvider(databaseProvider, ack) =>
36-
currentDatabaseProvider = databaseProvider
35+
case SetDatabaseProvider(aDatabaseProvider, ack) =>
36+
currentDatabaseProvider = aDatabaseProvider
3737
ack.success(Done)
3838

3939
case GetJournalCollectionNameFor(persistentId, promise) =>
@@ -51,10 +51,15 @@ class Collections(databaseProvider: DatabaseProvider, system: ExtendedActorSyste
5151
promisedDone success Done
5252

5353
case GetJournals(response) =>
54-
response completeWith (for {
55-
names <- database.collectionNames
56-
collections = names.filter(_.startsWith(journalPrefix)).map(database.collection[BSONCollection](_))
57-
} yield collections)
54+
val collections = database.collectionNames.map(_.filter(_.startsWith(journalPrefix))).flatMap { names =>
55+
Future.traverse(names) { name =>
56+
val promisedCollection = Promise[BSONCollection]
57+
promisedCollection completeWith verifiedJournalCollection(name)
58+
promisedCollection.future
59+
}
60+
}
61+
response completeWith collections
62+
5863
}
5964

6065
private def verifiedJournalCollection(name: String): Future[BSONCollection] = {

core/src/main/scala/org/nullvector/ReactiveMongoDriver.scala

+23-8
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import akka.util.Timeout
66
import com.typesafe.config.ConfigFactory
77
import org.nullvector.ReactiveMongoDriver.QueryType.QueryType
88
import org.nullvector.ReactiveMongoDriver.{DatabaseProvider, QueryType}
9+
import org.slf4j.{Logger, LoggerFactory}
910
import play.api.libs.json.{JsString, Json}
1011
import reactivemongo.api.bson.BSONDocument
1112
import reactivemongo.api.bson.collection.BSONCollection
@@ -33,16 +34,30 @@ object ReactiveMongoDriver extends ExtensionId[ReactiveMongoDriver] with Extensi
3334
}
3435

3536
class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
37+
protected val logger: Logger = LoggerFactory.getLogger(getClass)
38+
3639
private val dispatcherName = "akka-persistence-reactivemongo-dispatcher"
3740
protected implicit val dispatcher: ExecutionContext = system.dispatchers.lookup(dispatcherName)
3841
private implicit val timeout: Timeout = Timeout(5.seconds)
3942
private val defaultProvider: DatabaseProvider = new DatabaseProvider {
4043
private lazy val db: DB = {
4144
val mongoUri = system.settings.config.getString("akka-persistence-reactivemongo.mongo-uri")
45+
logger.info("Connecting to {}", mongoUri)
4246
Await.result(
4347
MongoConnection.fromString(mongoUri).flatMap { parsedUri =>
44-
val databaseName = parsedUri.db.getOrElse(throw new Exception("Missing database name"))
45-
AsyncDriver(system.settings.config).connect(parsedUri).flatMap(_.database(databaseName))
48+
parsedUri.db match {
49+
case Some(databaseName) =>
50+
AsyncDriver(system.settings.config).connect(parsedUri).flatMap(_.database(databaseName))
51+
.recover {
52+
case throwable: Throwable =>
53+
logger.error(throwable.getMessage, throwable)
54+
throw throwable
55+
}
56+
case None =>
57+
val exception = new IllegalStateException(s"Missing Database Name in $mongoUri")
58+
logger.error(exception.getMessage, exception)
59+
throw exception
60+
}
4661
},
4762
30.seconds
4863
)
@@ -96,11 +111,11 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
96111
}
97112

98113
(extractValue("mongodb.explain-all").map(_ => QueryType.All) ::
99-
extractValue("mongodb.explain-recovery").map(_ => QueryType.Recovery) ::
100-
extractValue("mongodb.explain-highest-seq").map(_ => QueryType.HighestSeq) ::
101-
extractValue("mongodb.explain-load-snapshot").map(_ => QueryType.LoadSnapshot) ::
102-
extractValue("mongodb.explain-events-by-tag").map(_ => QueryType.EventsByTag) ::
103-
Nil).flatten
114+
extractValue("mongodb.explain-recovery").map(_ => QueryType.Recovery) ::
115+
extractValue("mongodb.explain-highest-seq").map(_ => QueryType.HighestSeq) ::
116+
extractValue("mongodb.explain-load-snapshot").map(_ => QueryType.LoadSnapshot) ::
117+
extractValue("mongodb.explain-events-by-tag").map(_ => QueryType.EventsByTag) ::
118+
Nil).flatten
104119
}
105120

106121
def explain(collection: BSONCollection)(queryType: QueryType.QueryType, queryBuilder: collection.QueryBuilder) = {
@@ -114,7 +129,7 @@ class ReactiveMongoDriver(system: ExtendedActorSystem) extends Extension {
114129
(queryType: QueryType.QueryType, stages: List[collection.PipelineOperator], hint: Option[collection.Hint]) = {
115130
if (shoudExplain(queryType)) {
116131
collection
117-
.aggregatorContext[BSONDocument](stages,explain = true, hint = hint)
132+
.aggregatorContext[BSONDocument](stages, explain = true, hint = hint)
118133
.prepared
119134
.cursor
120135
.collect[List]()

0 commit comments

Comments
 (0)