diff --git a/aws-auth/src/test/scala/monix/connect/aws/auth/MonixAwsConfigSpec.scala b/aws-auth/src/test/scala/monix/connect/aws/auth/MonixAwsConfigSpec.scala index 8bd2a7620..a658524e9 100644 --- a/aws-auth/src/test/scala/monix/connect/aws/auth/MonixAwsConfigSpec.scala +++ b/aws-auth/src/test/scala/monix/connect/aws/auth/MonixAwsConfigSpec.scala @@ -28,6 +28,7 @@ import monix.connect.aws.auth.MonixAwsConf._ import monix.connect.aws.auth.configreader.KebabConfigReader import monix.eval.Task import monix.execution.Scheduler +import monix.execution.exceptions.DummyException import monix.testing.scalatest.MonixTaskTest import java.io.File @@ -128,8 +129,8 @@ class MonixAwsConfigSpec extends AsyncFlatSpec with MonixTaskTest with Matchers Task(configSource.loadOrThrow[AppConf]).map(_.monixAws).attempt.asserting { monixAwsConf => monixAwsConf.isLeft shouldBe true - monixAwsConf.left.get shouldBe a[ConfigReaderException[_]] - monixAwsConf.left.get.getMessage should include("Key not found: 'region'") + monixAwsConf.swap.getOrElse(DummyException("failed")) shouldBe a[ConfigReaderException[_]] + monixAwsConf.swap.getOrElse(DummyException("failed")).getMessage should include("Key not found: 'region'") } } diff --git a/build.sbt b/build.sbt index 5c5d98769..abba17cb7 100644 --- a/build.sbt +++ b/build.sbt @@ -1,5 +1,6 @@ import Dependencies.Versions -import sbt.Keys.version +import sbt.Keys.{target, version} +import sbt.Test val monixConnectSeries = "0.7.0" @@ -17,9 +18,9 @@ inThisBuild(List( ) )) -skip in publish := true //required by sbt-ci-release +publish / publish := true //required by sbt-ci-release -def sharedSettings(publishForScala3: Boolean= true) = { +def sharedSettings(publishForScala3: Boolean= true, fatalWarningsEnables: Boolean = true) = { Seq( scalaVersion := "2.13.8", crossScalaVersions := Seq("2.12.17", "2.13.8") ++ (if (publishForScala3) Seq("3.1.2") else Seq.empty) @@ -40,60 +41,62 @@ def sharedSettings(publishForScala3: Boolean= true) = { "-language:higherKinds", "-language:implicitConversions", "-language:experimental.macros" - ), - //warnUnusedImports - scalacOptions in(Compile, console) ++= Seq("-Ywarn-unused:imports") + ) , + // Linter - scalacOptions ++= Seq( - "-Ywarn-unused:imports", // Warn if an import selector is not referenced. - "-Ywarn-dead-code", // Warn when dead code is identified. + scalacOptions ++= ( + (CrossVersion.partialVersion(scalaVersion.value) match { + case Some((3, _)) => Seq.empty + case _ => Seq( + "-Ywarn-unused:imports", // Warn if an import selector is not referenced. + "-Ywarn-dead-code", // Warn when dead code is identified. + // Turns all warnings into errors ;-) + //temporary disabled for mongodb warn, -YWarn (2.13) and Silencer (2.12) should fix it... + // Enables linter options + "-Xlint:adapted-args", // warn if an argument list is modified to match the receiver + "-Xlint:infer-any", // warn when a type argument is inferred to be `Any` + "-Xlint:missing-interpolator", // a string literal appears to be missing an interpolator id + "-Xlint:doc-detached", // a ScalaDoc comment appears to be detached from its element + "-Xlint:private-shadow", // a private field (or class parameter) shadows a superclass field + "-Xlint:type-parameter-shadow", // a local type parameter shadows a type already in scope + "-Xlint:poly-implicit-overload", // parameterized overloaded implicit methods are not visible as view bounds + "-Xlint:option-implicit", // Option.apply used implicit view + "-Xlint:delayedinit-select", // Selecting member of DelayedInit + "-Ywarn-unused" + //"-Xlint:package-object-classes" // Class or object defined in package object + ) ++ (if(fatalWarningsEnables) Seq("-Xfatal-warnings") else Seq.empty[String]) + }) ++ Seq( // Turns all warnings into errors ;-) //temporary disabled for mongodb warn, -YWarn (2.13) and Silencer (2.12) should fix it... - //"-Xfatal-warnings", //Turning of fatal warnings for the moment // Enables linter options - "-Xlint:adapted-args", // warn if an argument list is modified to match the receiver - "-Xlint:infer-any", // warn when a type argument is inferred to be `Any` - "-Xlint:missing-interpolator", // a string literal appears to be missing an interpolator id - "-Xlint:doc-detached", // a ScalaDoc comment appears to be detached from its element - "-Xlint:private-shadow", // a private field (or class parameter) shadows a superclass field - "-Xlint:type-parameter-shadow", // a local type parameter shadows a type already in scope - "-Xlint:poly-implicit-overload", // parameterized overloaded implicit methods are not visible as view bounds - "-Xlint:option-implicit", // Option.apply used implicit view - "-Xlint:delayedinit-select", // Selecting member of DelayedInit - //"-Xlint:package-object-classes" // Class or object defined in package object - ) + // Note, this is used by the doc-source-url feature to determine the + // relative path of a given source file. If it's not a prefix of a the + // absolute path of the source file, the absolute path of that file + // will be put into the FILE_SOURCE variable, which is + // definitely not what we want. + "-sourcepath", + file(".").getAbsolutePath.replaceAll("[.]$", "") + //"-Xlint:package-object-classes" // Class or object defined in package objecz + ) ++ (if(fatalWarningsEnables) Seq("-Xfatal-warnings") else Seq.empty[String])) , // ScalaDoc settings - scalacOptions in(Compile, doc) ++= Seq("-no-link-warnings") + (Compile / doc / scalacOptions) ++= Seq("-no-link-warnings") , autoAPIMappings := true , - scalacOptions in ThisBuild ++= Seq( - // Note, this is used by the doc-source-url feature to determine the - // relative path of a given source file. If it's not a prefix of a the - // absolute path of the source file, the absolute path of that file - // will be put into the FILE_SOURCE variable, which is - // definitely not what we want. - "-sourcepath", - file(".").getAbsolutePath.replaceAll("[.]$", "") + scalacOptions ++= Seq( + ) , - parallelExecution in Test := true - , - parallelExecution in ThisBuild := true + Test / parallelExecution := true , - testForkedParallel in Test := true - , - testForkedParallel in ThisBuild := true + Test / testForkedParallel := true , concurrentRestrictions in Global += Tags.limit(Tags.Test, 3) , - logBuffered in Test := false - , - logBuffered in IntegrationTest := false - , + Test /logBuffered := false, //dependencyClasspath in IntegrationTest := (dependencyClasspath in IntegrationTest).value ++ (exportedProducts in Test).value, // https://github.com/sbt/sbt/issues/2654 incOptions := incOptions.value.withLogRecompileOnMacro(false) @@ -151,10 +154,8 @@ def mimaSettings(projectName: String) = Seq( mimaBinaryIssueFilters ++= MimaFilters.allMimaFilters ) -mimaFailOnNoPrevious in ThisBuild := false - //ignores scaladoc link warnings (which are -scalacOptions in (Compile, doc) ++= Seq("-no-link-warnings") + (Compile / doc / scalacOptions) ++= Seq("-no-link-warnings") val IT = config("it") extend Test @@ -179,13 +180,13 @@ lazy val hdfs = monixConnector("hdfs", Dependencies.Hdfs) })) -lazy val mongodb = monixConnector("mongodb", Dependencies.MongoDb, isMimaEnabled = false, isITParallelExecution = true, scala3Publish = false) +lazy val mongodb = monixConnector("mongodb", Dependencies.MongoDb, isITParallelExecution = true, scala3Publish = false, fatalWarningsEnabled = false) .settings(libraryDependencies ++= (CrossVersion.partialVersion(scalaVersion.value) match { case Some((3, _)) => Seq.empty case _ => Seq( "org.mongodb.scala" %% "mongo-scala-driver" % Versions.MongoScala, "org.mongodb.scala" %% "mongo-scala-bson" % Versions.MongoScala % Test, - "org.mockito" %% "mockito-scala" % Versions.Mockito % Test cross CrossVersion.for3Use2_13) + "org.mockito" %% "mockito-scala" % Versions.Mockito % Test) })) lazy val parquet = monixConnector("parquet", Dependencies.Parquet, scala3Publish = false) @@ -226,7 +227,8 @@ def monixConnector( projectDependencies: Seq[ModuleID], isMimaEnabled: Boolean = true, isITParallelExecution: Boolean = false, - scala3Publish: Boolean = true) = { + scala3Publish: Boolean = true, + fatalWarningsEnabled: Boolean = true) = { Project(id = connectorName, base = file(connectorName)) .enablePlugins(AutomateHeaderPlugin) .settings(name := s"monix-$connectorName", @@ -235,16 +237,19 @@ def monixConnector( IntegrationTest / parallelExecution := isITParallelExecution, IntegrationTest / testForkedParallel := isITParallelExecution ) - .settings(sharedSettings(scala3Publish)) + .settings(sharedSettings(scala3Publish, fatalWarningsEnabled)) .configs(IntegrationTest, IT) .enablePlugins(AutomateHeaderPlugin) .settings( if(isMimaEnabled) { mimaSettings(s"monix-$connectorName") } else { Seq.empty }, + // skips publishing docs in scala3 due to a bug running task Compile / doc / sources := { if (scalaVersion.value.startsWith("3.")) Seq.empty else (Compile / doc / sources).value }, - Test / doc / sources := { if (scalaVersion.value.startsWith("3.")) Seq.empty else (Compile / doc / sources).value } - ) + Test / doc / sources := { if (scalaVersion.value.startsWith("3.")) Seq.empty else (Test / doc / sources).value }, + doctestGenTests := { Seq.empty }, + doctestOnlyCodeBlocksMode := false, + Test / unidoc / sources := { Seq.empty }) } //=> non published modules @@ -267,34 +272,34 @@ lazy val docs = project .enablePlugins(DocusaurusPlugin, MdocPlugin, ScalaUnidocPlugin) lazy val skipOnPublishSettings = Seq( - skip in publish := true, + publish / skip := true, publishArtifact := false, ) lazy val mdocSettings = Seq( scalacOptions --= Seq("-Xfatal-warnings", "-Ywarn-unused"), crossScalaVersions := Seq(scalaVersion.value), - unidocProjectFilter in (ScalaUnidoc, unidoc) := inProjects(parquet, dynamodb, s3, sqs, elasticsearch, gcs, hdfs, mongodb, redis), - target in (ScalaUnidoc, unidoc) := (baseDirectory in LocalRootProject).value / "website" / "static" / "api", - cleanFiles += (target in (ScalaUnidoc, unidoc)).value, + (ScalaUnidoc / unidoc / unidocProjectFilter) := inProjects(parquet, dynamodb, s3, sqs, elasticsearch, gcs, hdfs, mongodb, redis), + (ScalaUnidoc / unidoc / target) := (baseDirectory in LocalRootProject).value / "website" / "static" / "api", + cleanFiles += (ScalaUnidoc / unidoc / target).value, docusaurusCreateSite := docusaurusCreateSite - .dependsOn(unidoc in Compile) - .dependsOn(updateSiteVariables in ThisBuild) + .dependsOn(Compile / unidoc) + .dependsOn(ThisBuild / updateSiteVariables) .value, docusaurusPublishGhpages := docusaurusPublishGhpages - .dependsOn(unidoc in Compile) - .dependsOn(updateSiteVariables in ThisBuild) + .dependsOn(Compile / unidoc) + .dependsOn(ThisBuild / updateSiteVariables) .value, - scalacOptions in (ScalaUnidoc, unidoc) ++= Seq( + (ScalaUnidoc / unidoc / scalacOptions) ++= Seq( "-doc-source-url", s"https://github.com/monix/monix-connect/tree/v${version.value}€{FILE_PATH}.scala", - "-sourcepath", baseDirectory.in(LocalRootProject).value.getAbsolutePath, + "-sourcepath", (LocalRootProject / baseDirectory).value.getAbsolutePath, "-doc-title", "Monix Connect", "-doc-version", s"v${version.value}", "-groups" ), // Exclude monix.*.internal from ScalaDoc - sources in (ScalaUnidoc, unidoc) ~= (_ filterNot { file => + ScalaUnidoc / unidoc / sources ~= (_ filterNot { file => // Exclude protobuf generated files file.getCanonicalPath.contains("/src_managed/main/monix/connect/") file.getCanonicalPath.contains("monix-connect/redis/target/scala-2.12/src_managed") @@ -310,17 +315,17 @@ def minorVersion(version: String): String = { val updateSiteVariables = taskKey[Unit]("Update site variables") -updateSiteVariables in ThisBuild := { +ThisBuild / updateSiteVariables := { val file = - (baseDirectory in LocalRootProject).value / "website" / "variables.js" + (LocalRootProject / baseDirectory).value / "website" / "variables.js" val variables = Map[String, String]( - "organization" -> (organization in LocalRootProject).value, - "coreModuleName" -> (moduleName in monixConnect).value, + "organization" -> (LocalRootProject / organization).value, + "coreModuleName" -> (monixConnect / moduleName).value, "latestVersion" -> version.value, "scalaPublishVersions" -> { - val minorVersions = (crossScalaVersions in monixConnect).value.map(minorVersion) + val minorVersions = (monixConnect / crossScalaVersions).value.map(minorVersion) if (minorVersions.size <= 2) minorVersions.mkString(" and ") else minorVersions.init.mkString(", ") ++ " and " ++ minorVersions.last } diff --git a/dynamodb/src/it/scala/monix/connect/dynamodb/DynamoDbFixture.scala b/dynamodb/src/it/scala/monix/connect/dynamodb/DynamoDbFixture.scala index fecc8b1ff..b8afff6ed 100644 --- a/dynamodb/src/it/scala/monix/connect/dynamodb/DynamoDbFixture.scala +++ b/dynamodb/src/it/scala/monix/connect/dynamodb/DynamoDbFixture.scala @@ -9,11 +9,13 @@ import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.dynamodb.model.{AttributeDefinition, AttributeValue, BillingMode, CreateTableRequest, CreateTableResponse, DeleteTableRequest, DeleteTableResponse, GetItemRequest, KeySchemaElement, KeyType, ProvisionedThroughput, PutItemRequest, ScalarAttributeType} -import scala.collection.JavaConverters._ import scala.concurrent.duration._ +@scala.annotation.nowarn trait DynamoDbFixture { + import scala.collection.JavaConverters._ + case class Citizen(citizenId: String, city: String, age: Int) val strAttr: String => AttributeValue = value => AttributeValue.builder.s(value).build val doubleAttr: Int => AttributeValue = value => AttributeValue.builder().n(value.toString).build @@ -84,8 +86,7 @@ trait DynamoDbFixture { def createTableRequest( tableName: String = Gen.identifier.sample.get, schema: List[KeySchemaElement], - attributeDefinition: List[AttributeDefinition], - provisionedThroughput: ProvisionedThroughput = baseProvisionedThroughput): CreateTableRequest = { + attributeDefinition: List[AttributeDefinition]): CreateTableRequest = { CreateTableRequest .builder .tableName(tableName) diff --git a/dynamodb/src/it/scala/monix/connect/dynamodb/DynamoDbSuite.scala b/dynamodb/src/it/scala/monix/connect/dynamodb/DynamoDbSuite.scala index c04e72cc7..314fe0c4a 100644 --- a/dynamodb/src/it/scala/monix/connect/dynamodb/DynamoDbSuite.scala +++ b/dynamodb/src/it/scala/monix/connect/dynamodb/DynamoDbSuite.scala @@ -14,7 +14,6 @@ import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.dynamodb.model.{GetItemRequest, ListTablesRequest, ListTablesResponse, PutItemRequest} import scala.concurrent.duration._ -import scala.collection.JavaConverters._ class DynamoDbSuite extends AsyncFlatSpec with Matchers with MonixTaskTest with DynamoDbFixture with BeforeAndAfterAll { @@ -24,9 +23,9 @@ class DynamoDbSuite extends AsyncFlatSpec with Matchers with MonixTaskTest with import monix.connect.dynamodb.DynamoDbOp.Implicits.listTablesOp val listRequest = ListTablesRequest.builder.build - DynamoDb.fromConfig.use(_.single(listRequest)).asserting { listedTables => + DynamoDb.fromConfig().use(_.single(listRequest)).asserting { listedTables => listedTables shouldBe a[ListTablesResponse] - listedTables.tableNames().asScala.contains(tableName) shouldBe true + listedTables.tableNames().contains(tableName) shouldBe true } } @@ -36,7 +35,7 @@ class DynamoDbSuite extends AsyncFlatSpec with Matchers with MonixTaskTest with MonixAwsConf.load().memoizeOnSuccess.flatMap(DynamoDb.fromConfig(_).use(_.single(listRequest))).asserting { listTables => listTables shouldBe a[ListTablesResponse] - listTables.tableNames().asScala.contains(tableName) shouldBe true + listTables.tableNames().contains(tableName) shouldBe true } } @@ -47,7 +46,7 @@ class DynamoDbSuite extends AsyncFlatSpec with Matchers with MonixTaskTest with val monixAwsConf = MonixAwsConf.load().memoizeOnSuccess DynamoDb.fromConfig(monixAwsConf).use(_.single(listRequest)).asserting { listTables => listTables shouldBe a[ListTablesResponse] - listTables.tableNames().asScala.contains(tableName) shouldBe true + listTables.tableNames().contains(tableName) shouldBe true } } @@ -61,7 +60,7 @@ class DynamoDbSuite extends AsyncFlatSpec with Matchers with MonixTaskTest with Observable.now(listRequest).transform(dynamodb.transformer()).headL }.asserting { listedTables => listedTables shouldBe a[ListTablesResponse] - listedTables.tableNames().asScala.contains(tableName) shouldBe true + listedTables.tableNames().contains(tableName) shouldBe true } } @@ -74,7 +73,7 @@ class DynamoDbSuite extends AsyncFlatSpec with Matchers with MonixTaskTest with _ <- Observable.now(request).consumeWith(DynamoDb.createUnsafe(client).sink()) getResponse <- Task.from(client.getItem(getItemRequest(tableName, citizen.citizenId, citizen.city))) } yield { - getResponse.item().values().asScala.head.n().toDouble shouldBe citizen.age + getResponse.item().get(0).n().toDouble shouldBe citizen.age } } @@ -84,7 +83,7 @@ class DynamoDbSuite extends AsyncFlatSpec with Matchers with MonixTaskTest with val putItemRequests: List[PutItemRequest] = citizens.map(putItemRequest(tableName, _)) for { - _ <- DynamoDb.fromConfig.use { dynamoDb => + _ <- DynamoDb.fromConfig().use { dynamoDb => Observable .fromIterable(putItemRequests) .consumeWith(dynamoDb.sink(RetryStrategy(retries = 3, backoffDelay = 1.second))) @@ -93,7 +92,7 @@ class DynamoDbSuite extends AsyncFlatSpec with Matchers with MonixTaskTest with Task.from(client.getItem(getItemRequest(tableName, citizen.citizenId, citizen.city))) } } yield { - val actualCitizens = getResponses.map(_.item().values().asScala.head.n().toDouble) + val actualCitizens = getResponses.map(_.item().get(0).n().toDouble) actualCitizens should contain theSameElementsAs citizens.map(_.age) } } @@ -105,13 +104,13 @@ class DynamoDbSuite extends AsyncFlatSpec with Matchers with MonixTaskTest with val getItemRequests: List[GetItemRequest] = List("citizen1", "citizen2").map(getItemRequest(tableName, _, city = "Rome")) Task.traverse(putItemRequests)(req => Task.from(client.putItem(req))) >> - DynamoDb.fromConfig.use { dynamoDb => + DynamoDb.fromConfig().use { dynamoDb => Observable .fromIterable(getItemRequests) .transform(dynamoDb.transformer(RetryStrategy(retries = 3, backoffDelay = 1.second))) .toListL }.map { result => - val actualCitizens = result.map(_.item().values().asScala.head.n().toInt) + val actualCitizens = result.map(_.item().get(0).n().toInt) actualCitizens should contain theSameElementsAs citizens.map(_.age) } } diff --git a/dynamodb/src/test/scala-2/monix/connect/dynamodb/DynamoDbDelayedOpSpec.scala b/dynamodb/src/test/scala-2/monix/connect/dynamodb/DynamoDbDelayedOpSpec.scala index 4630d09ad..a652dc4a2 100644 --- a/dynamodb/src/test/scala-2/monix/connect/dynamodb/DynamoDbDelayedOpSpec.scala +++ b/dynamodb/src/test/scala-2/monix/connect/dynamodb/DynamoDbDelayedOpSpec.scala @@ -19,7 +19,6 @@ package monix.connect.dynamodb import monix.connect.dynamodb.domain.RetryStrategy import monix.eval.Task -import monix.execution.Scheduler.Implicits.global import monix.execution.exceptions.DummyException import monix.execution.schedulers.TestScheduler import org.scalatest.matchers.should.Matchers diff --git a/dynamodb/src/test/scala/monix/connect/dynamodb/Fixture.scala b/dynamodb/src/test/scala/monix/connect/dynamodb/Fixture.scala index e66637347..2b2033021 100644 --- a/dynamodb/src/test/scala/monix/connect/dynamodb/Fixture.scala +++ b/dynamodb/src/test/scala/monix/connect/dynamodb/Fixture.scala @@ -48,7 +48,7 @@ trait Fixture { override def execute(dynamoDbRequest: GetItemRequest)( implicit client: DynamoDbAsyncClient): CompletableFuture[GetItemResponse] = { - incWithF.runToFuture.flatMap(r => Future.successful(r)).asJava.toCompletableFuture + incWithF().runToFuture.flatMap(r => Future.successful(r)).asJava.toCompletableFuture } } diff --git a/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSourceSuite.scala b/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSourceSuite.scala index f3bb3bd79..a7d63b17f 100644 --- a/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSourceSuite.scala +++ b/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSourceSuite.scala @@ -2,15 +2,12 @@ package monix.connect.elasticsearch import monix.eval.Task import monix.execution.Scheduler -import monix.execution.Scheduler.Implicits.global import monix.testing.scalatest.MonixTaskTest import org.scalacheck.Gen import org.scalatest.BeforeAndAfterEach -import org.scalatest.flatspec.{AnyFlatSpecLike, AsyncFlatSpec} +import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers -import scala.util.Try - class ElasticsearchSourceSuite extends AsyncFlatSpec with MonixTaskTest with Fixture with Matchers with BeforeAndAfterEach { import com.sksamuel.elastic4s.ElasticDsl._ diff --git a/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSuite.scala b/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSuite.scala index 2ecc4cb37..56f78ec93 100644 --- a/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSuite.scala +++ b/elasticsearch/src/it/scala/monix/connect/elasticsearch/ElasticsearchSuite.scala @@ -3,11 +3,11 @@ package monix.connect.elasticsearch import com.sksamuel.elastic4s.Indexes import monix.eval.Task import monix.execution.Scheduler -import monix.execution.Scheduler.Implicits.global +import monix.execution.exceptions.DummyException import monix.testing.scalatest.MonixTaskTest import org.scalacheck.Gen import org.scalatest.BeforeAndAfterEach -import org.scalatest.flatspec.{AnyFlatSpecLike, AsyncFlatSpec} +import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers class ElasticsearchSuite extends AsyncFlatSpec with MonixTaskTest with Fixture with Matchers with BeforeAndAfterEach { @@ -155,7 +155,7 @@ class ElasticsearchSuite extends AsyncFlatSpec with MonixTaskTest with Fixture w es.getIndex(getIndex(indexName)).map(_.result(indexName)).attempt }.asserting { getIndexAttempt => getIndexAttempt.isLeft shouldBe true - getIndexAttempt.left.get shouldBe a[NoSuchElementException] + getIndexAttempt.swap.getOrElse(DummyException("failed")) shouldBe a[NoSuchElementException] } } diff --git a/gcs/src/it/scala/monix/connect/gcp/storage/GcsUploaderSuite.scala b/gcs/src/it/scala/monix/connect/gcp/storage/GcsUploaderSuite.scala index 155bcb0d7..f6bdd0ba4 100644 --- a/gcs/src/it/scala/monix/connect/gcp/storage/GcsUploaderSuite.scala +++ b/gcs/src/it/scala/monix/connect/gcp/storage/GcsUploaderSuite.scala @@ -6,14 +6,13 @@ import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper import com.google.cloud.storage.{Blob, BlobId, BlobInfo, Option => _} import monix.connect.gcp.storage.components.GcsUploader import monix.execution.Scheduler -import monix.execution.Scheduler.Implicits.global import monix.reactive.Observable import monix.testing.scalatest.MonixTaskTest import org.apache.commons.io.FileUtils import org.scalacheck.Gen import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.{AnyWordSpecLike, AsyncWordSpec} +import org.scalatest.wordspec.AsyncWordSpec class GcsUploaderSuite extends AsyncWordSpec with MonixTaskTest with Matchers with BeforeAndAfterAll { diff --git a/gcs/src/it/scala/monix/connect/gcp/storage/examples/GcsExamples.scala b/gcs/src/it/scala/monix/connect/gcp/storage/examples/GcsExamples.scala index b967d7428..0099a3634 100644 --- a/gcs/src/it/scala/monix/connect/gcp/storage/examples/GcsExamples.scala +++ b/gcs/src/it/scala/monix/connect/gcp/storage/examples/GcsExamples.scala @@ -40,7 +40,7 @@ class GcsExamples extends AnyWordSpecLike with Matchers with BeforeAndAfterAll { "bucket donwload" in { val storage = GcsStorage(underlying) val bucket: Task[Option[GcsBucket]] = storage.getBucket("myBucket") - val ob: Observable[Array[Byte]] = { + val _: Observable[Array[Byte]] = { Observable.fromTask(bucket) .flatMap { case Some(blob) => blob.download("myBlob") @@ -56,7 +56,7 @@ class GcsExamples extends AnyWordSpecLike with Matchers with BeforeAndAfterAll { val storage = GcsStorage.create() val memoizedBlob = storage.getBlob("myBucket", "myBlob").memoize - val ob: Observable[Array[Byte]] = { + val _: Observable[Array[Byte]] = { for { blob <- Observable.fromTask(memoizedBlob): Observable[Option[GcsBlob]] bytes <- { @@ -79,7 +79,7 @@ class GcsExamples extends AnyWordSpecLike with Matchers with BeforeAndAfterAll { val storage = GcsStorage.create() val targetFile = new File("example/target/file.txt") - val t: Task[Unit] = { + val _: Task[Unit] = { for { maybeBucket <- storage.getBucket("myBucket"): Task[Option[GcsBucket]] _ <- maybeBucket match { @@ -98,7 +98,7 @@ class GcsExamples extends AnyWordSpecLike with Matchers with BeforeAndAfterAll { val storage = GcsStorage.create() val targetFile = new File("example/target/file.txt") - val t: Task[Unit] = { + val _: Task[Unit] = { for { maybeBucket <- storage.getBlob("myBucket", "myBlob"): Task[Option[GcsBlob]] _ <- maybeBucket match { @@ -134,9 +134,9 @@ class GcsExamples extends AnyWordSpecLike with Matchers with BeforeAndAfterAll { val storage = GcsStorage.create() val sourceFile = new File("path/to/your/path.txt") - val t: Task[Unit] = for { + val _: Task[Unit] = for { bucket <- storage.createBucket("myBucket", GcsBucketInfo.Locations.`US-WEST1`): Task[GcsBucket] - unit <- bucket.uploadFromFile("myBlob", sourceFile.toPath) + _ <- bucket.uploadFromFile("myBlob", sourceFile.toPath) } yield () } @@ -165,7 +165,7 @@ class GcsExamples extends AnyWordSpecLike with Matchers with BeforeAndAfterAll { val storage = GcsStorage.create() val sourceFile = new File("path/to/your/path.txt") - val t: Task[Unit] = for { + val _: Task[Unit] = for { blob <- storage.createBlob("myBucket", "myBlob"): Task[GcsBlob] _ <- blob.uploadFromFile(sourceFile.toPath) } yield () @@ -184,14 +184,14 @@ class GcsExamples extends AnyWordSpecLike with Matchers with BeforeAndAfterAll { ), storageClass = Some(StorageClass.REGIONAL) ) - val bucket: Task[GcsBucket] = storage.createBucket("mybucket", Locations.`EUROPE-WEST1`, Some(metadata)).memoizeOnSuccess + val _: Task[GcsBucket] = storage.createBucket("mybucket", Locations.`EUROPE-WEST1`, Some(metadata)).memoizeOnSuccess } "create blob" in { import monix.connect.gcp.storage.{GcsBlob, GcsStorage} val storage = GcsStorage.create() - val blob: Task[GcsBlob] = storage.createBlob("mybucket","myBlob").memoizeOnSuccess + val _: Task[GcsBlob] = storage.createBlob("mybucket","myBlob").memoizeOnSuccess } @@ -201,7 +201,7 @@ class GcsExamples extends AnyWordSpecLike with Matchers with BeforeAndAfterAll { val storage = GcsStorage.create() val sourceBlob: Task[GcsBlob] = storage.createBlob("myBucket", "sourceBlob") - val targetBlob: Task[GcsBlob] = sourceBlob.flatMap(_.copyTo("targetBlob")) + val _: Task[GcsBlob] = sourceBlob.flatMap(_.copyTo("targetBlob")) } } diff --git a/gcs/src/test/scala-2/monix/connect/gcp/storage/GcsBlobSpec.scala b/gcs/src/test/scala-2/monix/connect/gcp/storage/GcsBlobSpec.scala index 1a0b7ecd3..303024a8d 100644 --- a/gcs/src/test/scala-2/monix/connect/gcp/storage/GcsBlobSpec.scala +++ b/gcs/src/test/scala-2/monix/connect/gcp/storage/GcsBlobSpec.scala @@ -136,7 +136,6 @@ class GcsBlobSpec extends AnyWordSpecLike with IdiomaticMockito with Matchers wi val blobSourceOption: BlobSourceOption = mock[BlobSourceOption] val copywriter = mock[CopyWriter] val bucketName = genNonEmtyStr.sample.get - val blobName = genNonEmtyStr.sample.get when(underlying.copyTo(bucketName, blobSourceOption)).thenReturn(copywriter) //when diff --git a/hdfs/src/main/scala/monix/connect/hdfs/Hdfs.scala b/hdfs/src/main/scala/monix/connect/hdfs/Hdfs.scala index edc29e9b0..684a93fa5 100644 --- a/hdfs/src/main/scala/monix/connect/hdfs/Hdfs.scala +++ b/hdfs/src/main/scala/monix/connect/hdfs/Hdfs.scala @@ -18,7 +18,6 @@ package monix.connect.hdfs import monix.eval.Task -import monix.execution.Scheduler import monix.reactive.{Consumer, Observable} import org.apache.hadoop.fs.{FileSystem, Path} @@ -36,9 +35,7 @@ object Hdfs { * @param scheduler An implicit [[Scheduler]] instance to be in the scope of the call. * @return A [[Long]] that represents the number of bytes that has been written. */ - def append(fs: FileSystem, path: Path)( - implicit - scheduler: Scheduler): Consumer[Array[Byte], Long] = { + def append(fs: FileSystem, path: Path): Consumer[Array[Byte], Long] = { new HdfsSubscriber(fs, path, appendEnabled = true) } @@ -65,7 +62,7 @@ object Hdfs { overwrite: Boolean = true, replication: Short = 3, bufferSize: Int = 4096, - blockSize: Int = 134217728)(implicit scheduler: Scheduler): Consumer[Array[Byte], Long] = { + blockSize: Int = 134217728): Consumer[Array[Byte], Long] = { new HdfsSubscriber(fs, path, overwrite, bufferSize, replication, blockSize, appendEnabled = false) } @@ -78,9 +75,7 @@ object Hdfs { * @param scheduler An implicit [[Scheduler]] instance to be in the scope of the call. * @return An [[Observable]] of chunks of bytes with the size specified by [[chunkSize]]. */ - def read(fs: FileSystem, path: Path, chunkSize: Int = 8192)( - implicit - scheduler: Scheduler): Observable[Array[Byte]] = { + def read(fs: FileSystem, path: Path, chunkSize: Int = 8192): Observable[Array[Byte]] = { Observable.fromInputStream(Task(fs.open(path)), chunkSize) } diff --git a/hdfs/src/test/scala-2/monix/connect/hdfs/HdfsSpec.scala b/hdfs/src/test/scala-2/monix/connect/hdfs/HdfsSpec.scala index c9775b415..87241d88c 100644 --- a/hdfs/src/test/scala-2/monix/connect/hdfs/HdfsSpec.scala +++ b/hdfs/src/test/scala-2/monix/connect/hdfs/HdfsSpec.scala @@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster} import monix.reactive.{Consumer, Observable} import monix.execution.Scheduler +import monix.execution.exceptions.DummyException import monix.testing.scalatest.MonixTaskTest class HdfsSpec @@ -146,7 +147,8 @@ class HdfsSpec resultA.toList shouldBe chunksA.flatten offsetA shouldBe chunksA.flatten.size failedOverwriteAttempt.isLeft shouldBe true - failedOverwriteAttempt.left.get shouldBe a[org.apache.hadoop.fs.FileAlreadyExistsException] + failedOverwriteAttempt.swap.getOrElse(DummyException("failed")) shouldBe a[ + org.apache.hadoop.fs.FileAlreadyExistsException] fs.exists(path) shouldBe true resultAfterOverwriteAttempt.toList shouldBe chunksA.flatten resultAfterOverwriteAttempt.length shouldBe chunksA.flatten.size diff --git a/mongodb/src/it/scala-2/monix/connect/mongodb/MongoConnectionSuite.scala b/mongodb/src/it/scala-2/monix/connect/mongodb/MongoConnectionSuite.scala index 28e79ed4a..15ac78047 100644 --- a/mongodb/src/it/scala-2/monix/connect/mongodb/MongoConnectionSuite.scala +++ b/mongodb/src/it/scala-2/monix/connect/mongodb/MongoConnectionSuite.scala @@ -94,7 +94,7 @@ class MongoConnectionSuite extends AsyncFlatSpec with MonixTaskTest with Matcher it should "be created unsafely given the mongo client" in { def makeResource(col1: CollectionRef[Employee], col2: CollectionRef[Company]) = - Resource.liftF(MongoConnection.createUnsafe2(MongoClients.create(mongoEndpoint), (col1, col2))) + Resource.eval(MongoConnection.createUnsafe2(MongoClients.create(mongoEndpoint), (col1, col2))) createConnectionTest2(makeResource) } @@ -115,7 +115,7 @@ class MongoConnectionSuite extends AsyncFlatSpec with MonixTaskTest with Matcher it should "be created unsafely given a mongo client" in { def makeResource(col1: CollectionRef[Company], col2: CollectionRef[Employee], col3: CollectionRef[Investor]) = - Resource.liftF(MongoConnection.createUnsafe3(MongoClients.create(mongoEndpoint), (col1, col2, col3))) + Resource.eval(MongoConnection.createUnsafe3(MongoClients.create(mongoEndpoint), (col1, col2, col3))) abstractCreateConnectionTest3(makeResource) } @@ -134,7 +134,7 @@ class MongoConnectionSuite extends AsyncFlatSpec with MonixTaskTest with Matcher it should "be created unsafely given a mongo client" in { val makeResource = (collections: Tuple4F[CollectionRef, Employee, Employee, Employee, Company]) => - Resource.liftF( + Resource.eval( MongoConnection.createUnsafe4( MongoClients.create(mongoEndpoint), (collections._1, collections._2, collections._3, collections._4)) @@ -158,7 +158,7 @@ class MongoConnectionSuite extends AsyncFlatSpec with MonixTaskTest with Matcher it should "be created unsafely given a mongo client" in { val makeResource = (collections: Tuple5F[CollectionRef, Employee, Employee, Employee, Employee, Company]) => - Resource.liftF( + Resource.eval( MongoConnection.createUnsafe5( MongoClients.create(mongoEndpoint), (collections._1, collections._2, collections._3, collections._4, collections._5) @@ -191,7 +191,7 @@ class MongoConnectionSuite extends AsyncFlatSpec with MonixTaskTest with Matcher val makeResource = (collections: Tuple6F[CollectionRef, Employee, Employee, Employee, Employee, Employee, Company]) => { val (c1, c2, c3, c4, c5, c6) = collections - Resource.liftF( + Resource.eval( MongoConnection .createUnsafe6(MongoClients.create(mongoEndpoint), (c1, c2, c3, c4, c5, c6)) ) @@ -223,7 +223,7 @@ class MongoConnectionSuite extends AsyncFlatSpec with MonixTaskTest with Matcher val makeResource = (collections: Tuple7F[CollectionRef, Employee, Employee, Employee, Employee, Employee, Employee, Company]) => { val (c1, c2, c3, c4, c5, c6, c7) = collections - Resource.liftF( + Resource.eval( MongoConnection .createUnsafe7(MongoClients.create(mongoEndpoint), (c1, c2, c3, c4, c5, c6, c7)) ) @@ -255,7 +255,7 @@ class MongoConnectionSuite extends AsyncFlatSpec with MonixTaskTest with Matcher val makeResource = (collections: Tuple8F[CollectionRef, Employee, Employee, Employee, Employee, Employee, Employee, Employee, Company]) => { val (c1, c2, c3, c4, c5, c6, c7, c8) = collections - Resource.liftF( + Resource.eval( MongoConnection .createUnsafe8(MongoClients.create(mongoEndpoint), (c1, c2, c3, c4, c5, c6, c7, c8)) ) diff --git a/mongodb/src/main/scala-2/monix/connect/mongodb/client/MongoConnection.scala b/mongodb/src/main/scala-2/monix/connect/mongodb/client/MongoConnection.scala index 7f8386864..28d4d7f62 100644 --- a/mongodb/src/main/scala-2/monix/connect/mongodb/client/MongoConnection.scala +++ b/mongodb/src/main/scala-2/monix/connect/mongodb/client/MongoConnection.scala @@ -773,19 +773,19 @@ private[mongodb] trait MongoConnection[A <: Product, T2 <: Product] { self => def create(connectionStr: String, collectionRefs: A): Resource[Task, T2] = for { client <- Resource.fromAutoCloseable(Task.evalAsync(MongoClients.create(connectionStr))) - collectionOperator <- Resource.liftF(createCollectionOperator(client, collectionRefs)) + collectionOperator <- Resource.eval(createCollectionOperator(client, collectionRefs)) } yield collectionOperator def create(connectionStr: ConnectionString, collectionRefs: A): Resource[Task, T2] = for { client <- Resource.fromAutoCloseable(Task.evalAsync(MongoClients.create(connectionStr))) - collectionOperator <- Resource.liftF(createCollectionOperator(client, collectionRefs)) + collectionOperator <- Resource.eval(createCollectionOperator(client, collectionRefs)) } yield collectionOperator def create(clientSettings: MongoClientSettings, collectionRefs: A): Resource[Task, T2] = for { client <- Resource.fromAutoCloseable(Task.evalAsync(MongoClients.create(clientSettings))) - collectionOperator <- Resource.liftF(createCollectionOperator(client, collectionRefs)) + collectionOperator <- Resource.eval(createCollectionOperator(client, collectionRefs)) } yield collectionOperator @UnsafeBecauseImpure diff --git a/mongodb/src/test/scala-2/monix/connect/mongodb/DeprecatedObjectPackageSpec.scala b/mongodb/src/test/scala-2/monix/connect/mongodb/DeprecatedObjectPackageSpec.scala index 51f9055b2..8abc4c76f 100644 --- a/mongodb/src/test/scala-2/monix/connect/mongodb/DeprecatedObjectPackageSpec.scala +++ b/mongodb/src/test/scala-2/monix/connect/mongodb/DeprecatedObjectPackageSpec.scala @@ -41,7 +41,6 @@ class DeprecatedObjectPackageSpec with IdiomaticMockito { private[this] implicit val col: MongoCollection[Employee] = mock[MongoCollection[Employee]] - private[this] implicit val defaultConfig: PatienceConfig = PatienceConfig(5.seconds, 300.milliseconds) private[this] val objectId = BsonObjectId.apply() override def beforeEach() = { diff --git a/mongodb/src/test/scala-2/monix/connect/mongodb/MongoOpSpec.scala b/mongodb/src/test/scala-2/monix/connect/mongodb/MongoOpSpec.scala index dd8812eea..cbcdac8d6 100644 --- a/mongodb/src/test/scala-2/monix/connect/mongodb/MongoOpSpec.scala +++ b/mongodb/src/test/scala-2/monix/connect/mongodb/MongoOpSpec.scala @@ -402,7 +402,6 @@ class MongoOpSpec //then s.tick(1.second) verify(col).updateOne(filter, update) - val expectedUpdateResult = UpdateResult(matchedCount = 0, modifiedCount = 0, wasAcknowledged = false) f.value.get shouldBe util.Success(UpdateResult(0, 0, false)) } diff --git a/mongodb/src/test/scala-2/monix/connect/mongodb/MongoSingleSpec.scala b/mongodb/src/test/scala-2/monix/connect/mongodb/MongoSingleSpec.scala index 235354851..3ce763528 100644 --- a/mongodb/src/test/scala-2/monix/connect/mongodb/MongoSingleSpec.scala +++ b/mongodb/src/test/scala-2/monix/connect/mongodb/MongoSingleSpec.scala @@ -383,7 +383,6 @@ class MongoSingleSpec //then s.tick(1.second) verify(col).updateOne(filter, update) - val expectedUpdateResult = UpdateResult(matchedCount = 0, modifiedCount = 0, wasAcknowledged = false) f.value.get shouldBe util.Success(UpdateResult(0, 0, false)) } diff --git a/mongodb/src/test/scala-2/monix/connect/mongodb/MongoSinkSpec.scala b/mongodb/src/test/scala-2/monix/connect/mongodb/MongoSinkSpec.scala index 9bdd20a56..f333a733f 100644 --- a/mongodb/src/test/scala-2/monix/connect/mongodb/MongoSinkSpec.scala +++ b/mongodb/src/test/scala-2/monix/connect/mongodb/MongoSinkSpec.scala @@ -30,7 +30,6 @@ import org.scalatest.concurrent.ScalaFutures import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.matchers.should.Matchers import com.mongodb.client.result.{InsertOneResult => MongoInsertOneResult} -import org.mongodb.scala.bson.BsonObjectId import scala.concurrent.duration._ @@ -52,7 +51,6 @@ class MongoSinkSpec val s = TestScheduler() val e1 = genEmployee.sample.get val e2 = genEmployee.sample.get - val objectId = BsonObjectId.apply() val failedPub = Task.raiseError[MongoInsertOneResult](DummyException("Insert one failed")).toReactivePublisher(s) val successPub = Task(MongoInsertOneResult.unacknowledged()).toReactivePublisher(s) diff --git a/mongodb/src/test/scala-2/monix/connect/mongodb/ObjectPackageSpec.scala b/mongodb/src/test/scala-2/monix/connect/mongodb/ObjectPackageSpec.scala index 635ed074b..3f55a4516 100644 --- a/mongodb/src/test/scala-2/monix/connect/mongodb/ObjectPackageSpec.scala +++ b/mongodb/src/test/scala-2/monix/connect/mongodb/ObjectPackageSpec.scala @@ -40,7 +40,6 @@ class ObjectPackageSpec with IdiomaticMockito { private[this] implicit val col: MongoCollection[Employee] = mock[MongoCollection[Employee]] - private[this] implicit val defaultConfig: PatienceConfig = PatienceConfig(5.seconds, 300.milliseconds) private[this] val objectId = BsonObjectId.apply() override def beforeEach() = { diff --git a/parquet/src/test/scala-2/monix/connect/parquet/ParquetSinkErrorHandlingSpec.scala b/parquet/src/test/scala-2/monix/connect/parquet/ParquetSinkErrorHandlingSpec.scala index 0992416e9..f49c5e301 100644 --- a/parquet/src/test/scala-2/monix/connect/parquet/ParquetSinkErrorHandlingSpec.scala +++ b/parquet/src/test/scala-2/monix/connect/parquet/ParquetSinkErrorHandlingSpec.scala @@ -20,7 +20,6 @@ package monix.connect.parquet import monix.eval.Coeval import monix.execution.Scheduler import monix.execution.exceptions.DummyException -import monix.execution.schedulers.TestScheduler import monix.reactive.Observable import monix.testing.scalatest.MonixTaskTest import org.apache.avro.generic.GenericRecord @@ -65,7 +64,6 @@ class ParquetSinkErrorHandlingSpec } "signals error when the underlying parquet writer throws an error" in { - val testScheduler = TestScheduler() val filePath: String = genFilePath() val record: GenericRecord = personToRecord(genPerson.sample.get) val ex = DummyException("Boom!") diff --git a/parquet/src/test/scala/monix/connect/parquet/AvroParquetFixture.scala b/parquet/src/test/scala/monix/connect/parquet/AvroParquetFixture.scala index c86f01f68..3f8b7b6b5 100644 --- a/parquet/src/test/scala/monix/connect/parquet/AvroParquetFixture.scala +++ b/parquet/src/test/scala/monix/connect/parquet/AvroParquetFixture.scala @@ -27,6 +27,8 @@ import org.apache.parquet.hadoop.util.HadoopInputFile import org.scalacheck.Gen import org.scalatest.AsyncTestSuite +import scala.annotation.nowarn + trait AvroParquetFixture extends ParquetFixture { this: AsyncTestSuite => @@ -43,8 +45,10 @@ trait AvroParquetFixture extends ParquetFixture { val genAvroUsers: Int => Gen[List[Person]] = n => Gen.listOfN(n, genPerson) - def parquetWriter(file: String, conf: Configuration, schema: Schema): ParquetWriter[GenericRecord] = - AvroParquetWriter.builder[GenericRecord](new Path(file)).withConf(conf).withSchema(schema).build() + @nowarn("") + def parquetWriter(file: String, conf: Configuration, schema: Schema): ParquetWriter[GenericRecord] = { + AvroParquetWriter.builder(new Path(file)).withConf(conf).withSchema(schema).build() + } def avroParquetReader[T <: GenericRecord](file: String, conf: Configuration): ParquetReader[T] = AvroParquetReader.builder[T](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build() diff --git a/parquet/src/test/scala/monix/connect/parquet/AvroParquetSpec.scala b/parquet/src/test/scala/monix/connect/parquet/AvroParquetSpec.scala index 60cacd210..014a3bd11 100644 --- a/parquet/src/test/scala/monix/connect/parquet/AvroParquetSpec.scala +++ b/parquet/src/test/scala/monix/connect/parquet/AvroParquetSpec.scala @@ -50,7 +50,7 @@ class AvroParquetSpec .consumeWith(Parquet.writer(w)) .runSyncUnsafe() - val parquetContent: List[GenericRecord] = fromParquet[GenericRecord](file, conf, avroParquetReader(file, conf)) + val parquetContent: List[GenericRecord] = fromParquet[GenericRecord](avroParquetReader(file, conf)) parquetContent.length shouldEqual n parquetContent should contain theSameElementsAs records } diff --git a/parquet/src/test/scala/monix/connect/parquet/ParquetFixture.scala b/parquet/src/test/scala/monix/connect/parquet/ParquetFixture.scala index 8a8eaac10..f6ab83e7d 100644 --- a/parquet/src/test/scala/monix/connect/parquet/ParquetFixture.scala +++ b/parquet/src/test/scala/monix/connect/parquet/ParquetFixture.scala @@ -42,7 +42,7 @@ trait ParquetFixture { val conf = new Configuration() - def fromParquet[T](file: String, configuration: Configuration, reader: ParquetReader[T]): List[T] = { + def fromParquet[T](reader: ParquetReader[T]): List[T] = { var record: T = reader.read() var result: List[T] = List.empty[T] diff --git a/parquet/src/test/scala/monix/connect/parquet/ParquetSinkCoevalSpec.scala b/parquet/src/test/scala/monix/connect/parquet/ParquetSinkCoevalSpec.scala index 773c7df7e..441e7b291 100644 --- a/parquet/src/test/scala/monix/connect/parquet/ParquetSinkCoevalSpec.scala +++ b/parquet/src/test/scala/monix/connect/parquet/ParquetSinkCoevalSpec.scala @@ -50,7 +50,7 @@ class ParquetSinkCoevalSpec .consumeWith(ParquetSink.fromWriter(Coeval(w))) .asserting { _ => val parquetContent: List[GenericRecord] = - fromParquet[GenericRecord](filePath, conf, avroParquetReader(filePath, conf)) + fromParquet[GenericRecord](avroParquetReader(filePath, conf)) parquetContent.length shouldEqual n parquetContent should contain theSameElementsAs records } @@ -67,7 +67,7 @@ class ParquetSinkCoevalSpec writtenRecords shouldBe 0 val file = new File(filePath) val parquetContent: List[GenericRecord] = - fromParquet[GenericRecord](filePath, conf, avroParquetReader(filePath, conf)) + fromParquet[GenericRecord](avroParquetReader(filePath, conf)) file.exists() shouldBe true parquetContent.length shouldEqual 0 } diff --git a/parquet/src/test/scala/monix/connect/parquet/ParquetSinkUnsafeSpec.scala b/parquet/src/test/scala/monix/connect/parquet/ParquetSinkUnsafeSpec.scala index 2380ae413..1845a5960 100644 --- a/parquet/src/test/scala/monix/connect/parquet/ParquetSinkUnsafeSpec.scala +++ b/parquet/src/test/scala/monix/connect/parquet/ParquetSinkUnsafeSpec.scala @@ -48,10 +48,9 @@ class ParquetSinkUnsafeSpec Observable .fromIterable(records) .consumeWith(ParquetSink.fromWriterUnsafe(w)) >> - Task.eval(fromParquet[GenericRecord](filePath, conf, avroParquetReader(filePath, conf))).asserting { - parquetContent => - parquetContent.length shouldEqual n - parquetContent should contain theSameElementsAs records + Task.eval(fromParquet[GenericRecord](avroParquetReader(filePath, conf))).asserting { parquetContent => + parquetContent.length shouldEqual n + parquetContent should contain theSameElementsAs records } } @@ -64,7 +63,7 @@ class ParquetSinkUnsafeSpec .empty[GenericRecord] .consumeWith(ParquetSink.fromWriterUnsafe(writer)) file = new File(filePath) - parquetContent = fromParquet[GenericRecord](filePath, conf, avroParquetReader(filePath, conf)) + parquetContent = fromParquet[GenericRecord](avroParquetReader(filePath, conf)) } yield { writtenRecords shouldBe 0 file.exists() shouldBe true diff --git a/parquet/src/test/scala/monix/connect/parquet/ParquetSourceSpec.scala b/parquet/src/test/scala/monix/connect/parquet/ParquetSourceSpec.scala index 2b5a69748..f0d4df60e 100644 --- a/parquet/src/test/scala/monix/connect/parquet/ParquetSourceSpec.scala +++ b/parquet/src/test/scala/monix/connect/parquet/ParquetSourceSpec.scala @@ -20,6 +20,7 @@ package monix.connect.parquet import java.io.{File, FileNotFoundException} import monix.eval.Task import monix.execution.Scheduler +import monix.execution.exceptions.DummyException import monix.reactive.Observable import monix.testing.scalatest.MonixTaskTest import org.apache.avro.generic.GenericRecord @@ -85,8 +86,9 @@ class ParquetSourceSpec cancelable2 <- ParquetSource.fromReaderUnsafe(null).toListL.attempt } yield { cancelable1.isLeft shouldBe true + cancelable1.swap.getOrElse(DummyException("failed")) shouldBe a[NullPointerException] cancelable2.isLeft shouldBe true - cancelable1.left.get shouldBe a[NullPointerException] + cancelable2.swap.getOrElse(DummyException("failed")) shouldBe a[NullPointerException] } } diff --git a/redis/src/it/scala/monix/connect/redis/HashCommandsSuite.scala b/redis/src/it/scala/monix/connect/redis/HashCommandsSuite.scala index 763544be4..921cdb2f0 100644 --- a/redis/src/it/scala/monix/connect/redis/HashCommandsSuite.scala +++ b/redis/src/it/scala/monix/connect/redis/HashCommandsSuite.scala @@ -282,7 +282,6 @@ class HashCommandsSuite for { k1 <- Task.from(genRedisKey) k2 <- Task.from(genRedisKey) - k3 <- Task.from(genRedisKey) f1 <- Task.from(genRedisKey) f2 <- Task.from(genRedisKey) f3 <- Task.from(genRedisKey) diff --git a/redis/src/it/scala/monix/connect/redis/ServerCommandsSuite.scala b/redis/src/it/scala/monix/connect/redis/ServerCommandsSuite.scala index 4091c3f96..433af27c6 100644 --- a/redis/src/it/scala/monix/connect/redis/ServerCommandsSuite.scala +++ b/redis/src/it/scala/monix/connect/redis/ServerCommandsSuite.scala @@ -57,12 +57,10 @@ class ServerCommandsSuite "config parameters" can "be added and read" in { utfConnection.use[Task, Assertion](cmd => for { - initialLoglevel <- cmd.server.configGet("loglevel") _ <- cmd.server.configSet("loglevel", "debug") updatedLoglevel <- cmd.server.configGet("loglevel") emptyParameter <- cmd.server.configGet("non-existing-param") } yield { - //initialLoglevel shouldBe Some("notice") updatedLoglevel shouldBe Some("debug") emptyParameter shouldBe None }) diff --git a/redis/src/it/scala/monix/connect/redis/SortedSetCommandsSuite.scala b/redis/src/it/scala/monix/connect/redis/SortedSetCommandsSuite.scala index 2e775f2ff..f0b874047 100644 --- a/redis/src/it/scala/monix/connect/redis/SortedSetCommandsSuite.scala +++ b/redis/src/it/scala/monix/connect/redis/SortedSetCommandsSuite.scala @@ -70,11 +70,11 @@ class SortedSetCommandsSuite min3 <- sortedSet.bZPopMin(1.seconds, k1, k2) empty <- sortedSet.bZPopMin(1.seconds, k1, k2) } yield { - min1 shouldBe Some(k1, vScore1) - min2 shouldBe Some(k1, vScore2) // returns 5 because is the min score in k1 - min4 shouldBe Some(k1, vScore4) - min5 shouldBe Some(k1, vScore5) - min3 shouldBe Some(k2, vScore3) + min1 shouldBe Some((k1, vScore1)) + min2 shouldBe Some((k1, vScore2)) // returns 5 because is the min score in k1 + min4 shouldBe Some((k1, vScore4)) + min5 shouldBe Some((k1, vScore5)) + min3 shouldBe Some((k2, vScore3)) empty shouldBe None } } @@ -98,9 +98,9 @@ class SortedSetCommandsSuite thirdMax <- sortedSet.bZPopMax(1.seconds, k1, k2) empty <- sortedSet.bZPopMax(1.seconds, k1, k2) } yield { - firstMax shouldBe Some(k1, vScore5) - secondMax shouldBe Some(k1, vScore1) // returns 5 because is the min score in k1 - thirdMax shouldBe Some(k2, vScore3) + firstMax shouldBe Some((k1, vScore5)) + secondMax shouldBe Some((k1, vScore1)) // returns 5 because is the min score in k1 + thirdMax shouldBe Some((k2, vScore3)) empty shouldBe None } } diff --git a/redis/src/it/scala/monix/connect/redis/StringCommandsSuite.scala b/redis/src/it/scala/monix/connect/redis/StringCommandsSuite.scala index c8b7630b7..f75080adf 100644 --- a/redis/src/it/scala/monix/connect/redis/StringCommandsSuite.scala +++ b/redis/src/it/scala/monix/connect/redis/StringCommandsSuite.scala @@ -243,7 +243,7 @@ class StringCommandsSuite } "mGet" should "get the values of all the given keys" in { - val kV: List[(K, Option[V])] = List.fill(10)(genRedisKey, genRedisValue).map{ case (k, v) => (k.sample.get, Some(v.sample.get)) } + val kV: List[(K, Option[V])] = List.fill(10)((genRedisKey, genRedisValue)).map{ case (k, v) => (k.sample.get, Some(v.sample.get)) } val nonExistingKey = genRedisKey.sample.get utfConnection.use[Task, Assertion] { cmd => @@ -259,12 +259,12 @@ class StringCommandsSuite } "mSet" should "set multiple keys to multiple values" in { - val kV: Map[K, V] = List.fill(10)(genRedisKey, genRedisValue).map{ case (k, v) => (k.sample.get, v.sample.get) }.toMap + val kV: Map[K, V] = List.fill(10)((genRedisKey, genRedisValue)).map{ case (k, v) => (k.sample.get, v.sample.get) }.toMap utfConnection.use { cmd => cmd.string.mSet(kV) *> cmd.string.mGet(kV.keys.toList).toListL - }.asserting(_ should contain theSameElementsAs kV.mapValues(Some(_)).toList) + }.asserting(_ should contain theSameElementsAs kV.map{case (k, v) => (k, Some(v))}.toList) } "mSetNx" should "set multiple keys to multiple values, only if none of the keys exist" in { diff --git a/redis/src/main/scala/monix/connect/redis/Redis.scala b/redis/src/main/scala/monix/connect/redis/Redis.scala deleted file mode 100644 index 17204ef41..000000000 --- a/redis/src/main/scala/monix/connect/redis/Redis.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -/** - * An object that provides an aggregation of all the different Redis Apis. - * They can be equally accessed independently or from this object. - */ -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -object Redis - extends RedisKey with RedisHash with RedisList with RedisPubSub with RedisSet with RedisSortedSet with RedisStream - with RedisString with RedisServer diff --git a/redis/src/main/scala/monix/connect/redis/RedisHash.scala b/redis/src/main/scala/monix/connect/redis/RedisHash.scala deleted file mode 100644 index 846c59e15..000000000 --- a/redis/src/main/scala/monix/connect/redis/RedisHash.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -import io.lettuce.core.api.StatefulRedisConnection -import io.lettuce.core.{KeyValue, MapScanCursor, ScanCursor} -import monix.eval.Task -import monix.reactive.Observable - -import scala.jdk.CollectionConverters._ - -@deprecated("use the `monix.connect.redis.client.RedisConnection`", "0.6.0") -private[redis] trait RedisHash { - - /** - * Delete one or more hash fields. - * @return Number of fields that were removed from the hash, not including specified but non existing - * fields. - */ - def hdel[K, V](key: K, fields: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().hdel(key, fields: _*)).map(_.longValue) - - /** - * Determine if a hash field exists. - * @return True if the hash contains the field. - * False if the hash does not contain the field, or key does not exist. . - */ - def hexists[K, V](key: K, field: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().hexists(key, field)).map(_.booleanValue) - - /** - * Get the value of a hash field. - * @return The value associated with field, or null when field is not present in the hash or key does not exist. - * A failed task with [[NoSuchElementException]] will be returned when the underlying api - * returns an empty publisher. i.e: when the key did not exist. - */ - def hget[K, V](key: K, field: K)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = { - Task.from(connection.reactive().hget(key, field)) - } - - /** - * Increment the integer value of a hash field by the given number. - * @return The value at field after the increment operation. - */ - def hincrby[K, V](key: K, field: K, amount: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().hincrby(key, field, amount)).map(_.longValue) - - /** - * Increment the float value of a hash field by the given amount. - * @return The value of field after the increment. - */ - def hincrbyfloat[K, V](key: K, field: K, amount: Double)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Double] = - Task.from(connection.reactive().hincrbyfloat(key, field, amount)).map(_.doubleValue) - - /** - * Get all the fields and values in a hash. - * - * @param key the key - * @return Map of the fields and their values stored in the hash, or an empty list when key does not exist. - */ - def hgetall[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Observable[KeyValue[K, V]] = - Observable.fromReactivePublisher(connection.reactive().hgetall(key)) - - /** - * Get all the fields in a hash. - * @return The fields in the hash. - */ - def hkeys[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Observable[K] = - Observable.fromReactivePublisher(connection.reactive().hkeys(key)) - - /** - * Get the number of fields in a hash. - * @return Number of fields in the hash, or 0 when key does not exist. - */ - def hlen[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().hlen(key)).map(_.longValue) - - /** - * Get the values of all the given hash fields. - * @return Values associated with the given fields. - */ - def hmget[K, V](key: K, fields: K*)(implicit connection: StatefulRedisConnection[K, V]): Observable[KeyValue[K, V]] = - Observable.fromReactivePublisher(connection.reactive().hmget(key, fields: _*)) - - /** - * Set multiple hash fields to multiple values. - * @return Simple string reply. - */ - def hmset[K, V](key: K, map: Map[K, V])(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().hmset(key, map.asJava)) - - /** - * Incrementally iterate hash fields and associated values. - * @return Map scan cursor. - */ - def hscan[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[MapScanCursor[K, V]] = - Task.from(connection.reactive().hscan(key)) - - def hscan[K, V](key: K, scanCursor: ScanCursor)( - implicit - connection: StatefulRedisConnection[K, V]): Task[MapScanCursor[K, V]] = - Task.from(connection.reactive().hscan(key, scanCursor)) - - /** - * Set the string value of a hash field. - * @return True if field is a new field in the hash and value was set. - * False if field already exists in the hash and the value was updated. - */ - def hset[K, V](key: K, field: K, value: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().hset(key, field, value)).map(_.booleanValue) - - /** - * Set the value of a hash field, only if the field does not exist. - * - * @return True if field is a new field in the hash and value was set. - * False if field already exists in the hash and the value was updated. - */ - def hsetnx[K, V](key: K, field: K, value: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().hsetnx(key, field, value)).map(_.booleanValue) - - /** - * Get the string length of the field value in a hash. - * @return Length of the field value, or 0 when field is not present in the hash - * or key does not exist at all. - */ - def hstrlen[K, V](key: K, field: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().hstrlen(key, field)).map(_.longValue) - - /** - * Get all the values in a hash. - * @return Values in the hash, or an empty list when key does not exist. - */ - def hvals[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().hvals(key)) -} - -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -object RedisHash extends RedisHash diff --git a/redis/src/main/scala/monix/connect/redis/RedisKey.scala b/redis/src/main/scala/monix/connect/redis/RedisKey.scala deleted file mode 100644 index d84622efa..000000000 --- a/redis/src/main/scala/monix/connect/redis/RedisKey.scala +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -import java.util.Date - -import io.lettuce.core.api.StatefulRedisConnection -import io.lettuce.core.{KeyScanCursor, ScanCursor} -import monix.eval.Task -import monix.reactive.Observable - -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -private[redis] trait RedisKey { - - /** - * Delete one or more keys. - * @return The number of keys that were removed. - */ - def del[K, V](keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().del(keys: _*)).map(_.longValue) - - /** - * Unlink one or more keys (non blocking DEL). - * @return The number of keys that were removed. - */ - def unlink[K, V](keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().unlink(keys: _*)).map(_.longValue) - - /** - * Return a serialized version of the value stored at the specified key. - * @return The serialized value. - */ - def dump[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Array[Byte]] = - Task.from(connection.reactive().dump(key)) - - /** - * Determine how many keys exist. - * @return Number of existing keys - */ - def exists[K, V](keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().exists(keys: _*)).map(_.toLong) - - /** - * Set a key's time to live in seconds. - * @return True if the timeout was set. false if key does not exist or the timeout could not be set. - * - */ - def expire[K, V](key: K, seconds: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().expire(key, seconds)).map(_.booleanValue) - - /** - * Set the expiration for a key as a UNIX timestamp. - * @return True if the timeout was set. False if key does not exist or the timeout could not be set. - */ - def expireat[K, V](key: K, timestamp: Date)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().expireat(key, timestamp)).map(_.booleanValue) - - def expireat[K, V](key: K, timestamp: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().expireat(key, timestamp)).map(_.booleanValue) - - /** - * Find all keys matching the given pattern. - * @return Keys matching the pattern. - */ - def keys[K, V](pattern: K)(implicit connection: StatefulRedisConnection[K, V]): Observable[K] = - Observable.fromReactivePublisher(connection.reactive().keys(pattern)) - - /** - * Atomically transfer a key from a Redis instance to another one. - * @return The command returns OK on success. - */ - def migrate[K, V](host: String, port: Int, key: K, db: Int, timeout: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().migrate(host, port, key, db, timeout)) - - /** - * Move a key to another database. - * @return True if the move operation succeeded, false if not. - */ - def move[K, V](key: K, db: Int)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().move(key, db)).map(_.booleanValue) - - /** - * returns the kind of internal representation used in order to store the value associated with a key. - * @return String - */ - def objectEncoding[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().objectEncoding(key)) - - /** - * Returns the number of seconds since the object stored at the specified key is idle (not requested by read or write - * operations). - * @return Number of seconds since the object stored at the specified key is idle. - */ - def objectIdletime[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().objectIdletime(key)).map(_.longValue) - - /** - * Returns the number of references of the value associated with the specified key. - * @return Long - */ - def objectRefcount[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().objectRefcount(key)).map(_.longValue) - - /** - * Remove the expiration from a key. - * @return True if the timeout was removed. false if key does not exist or does not have an associated timeout. - */ - def persist[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().persist(key)).map(_.booleanValue) - - /** - * Set a key's time to live in milliseconds. - * @return True if the timeout was set. False if key does not exist or the timeout could not be set. - * - */ - def pexpire[K, V](key: K, milliseconds: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().pexpire(key, milliseconds)).map(_.booleanValue()) - - /** - * Set the expiration for a key as a UNIX timestamp specified in milliseconds. - * @return Boolean integer-reply specifically: - * { @literal true} if the timeout was set. { @literal false} if { @code key} does not exist or the timeout could not - * be set (see: { @code EXPIRE}). - */ - def pexpireat[K, V](key: K, timestamp: Date)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().pexpireat(key, timestamp)).map(_.booleanValue()) - - def pexpireat[K, V](key: K, timestamp: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().pexpireat(key, timestamp)).map(_.booleanValue()) - - /** - * Get the time to live for a key in milliseconds. - * @return Long integer-reply TTL in milliseconds, or a negative value in order to signal an error (see the description - * above). - */ - def pttl[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().pttl(key)).map(_.longValue) - - /** - * Return a random key from the keyspace. - * @return The random key, or null when the database is empty. - */ - def randomkey[K, V]()(implicit connection: StatefulRedisConnection[K, V]): Task[K] = - Task.from(connection.reactive().randomkey()) - - /** - * Rename a key. - * @return String simple-string-reply - */ - def rename[K, V](key: K, newKey: K)(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().rename(key, newKey)) - - /** - * Rename a key, only if the new key does not exist. - * @return True if key was renamed to newkey. - * False if newkey already exists. - */ - def renamenx[K, V](key: K, newKey: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().renamenx(key, newKey)).map(_.booleanValue) - - /** - * Create a key using the provided serialized value, previously obtained using DUMP. - * @return The command returns OK on success. - */ - def restore[K, V](key: K, ttl: Long, value: Array[Byte])( - implicit - connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().restore(key, ttl, value)) - - /** - * Sort the elements in a list, set or sorted set. - * @return Sorted elements. - */ - def sort[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().sort(key)) - - /** - * Touch one or more keys. Touch sets the last accessed time for a key. Non-exsitent keys wont get created. - * @return The number of found keys. - */ - def touch[K, V](keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().touch(keys: _*)).map(_.longValue) - - /** - * Get the time to live for a key. - * @return TTL in seconds, or a negative value in order to signal an error (see the description above). - */ - def ttl[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().ttl(key)).map(_.longValue) - - /** - * Determine the type stored at key. - * @return Type of key, or none when key does not exist. - */ - def `type`[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().`type`(key)) - - /** - * Incrementally iterate the keys space. - * @return Scan cursor. - */ - def scan[K, V]()(implicit connection: StatefulRedisConnection[K, V]): Task[KeyScanCursor[K]] = - Task.from(connection.reactive().scan()) - - def scan[K, V](scanCursor: ScanCursor)(implicit connection: StatefulRedisConnection[K, V]): Task[KeyScanCursor[K]] = - Task.from(connection.reactive().scan(scanCursor)) - -} - -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -object RedisKey extends RedisKey diff --git a/redis/src/main/scala/monix/connect/redis/RedisList.scala b/redis/src/main/scala/monix/connect/redis/RedisList.scala deleted file mode 100644 index a4d5be8be..000000000 --- a/redis/src/main/scala/monix/connect/redis/RedisList.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -import io.lettuce.core.api.StatefulRedisConnection -import io.lettuce.core.KeyValue -import monix.eval.Task -import monix.reactive.Observable - -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -private[redis] trait RedisList { - - /** - * Remove and get the first element in a list, or block until one is available. - * @return A null multi-bulk when no element could be popped and the timeout expired. - * A two-element multi-bulk with the first element being the name of the key - * where an element was popped and the second element being the value of the popped element. - */ - def blpop[K, V](timeout: Long, keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[KeyValue[K, V]] = - Task.from(connection.reactive().blpop(timeout, keys: _*)) - - /** - * Remove and get the last element in a list, or block until one is available. - * @return A null multi-bulk when no element could be popped and the timeout expired. - * A two-element multi-bulk with the first element being the name of the key - * where an element was popped and the second element being the value of the popped element. - */ - def brpop[K, V](timeout: Long, keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[KeyValue[K, V]] = - Task.from(connection.reactive().brpop(timeout, keys: _*)) - - /** - * Pop a value from a list, push it to another list and return it; or block until one is available. - * @return The element being popped from source and pushed to destination. - */ - def brpoplpush[K, V](timeout: Long, source: K, destination: K)( - implicit - connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().brpoplpush(timeout, source, destination)) - - /** - * Get an element from a list by its index. - * @return The requested element, or null when index is out of range. - */ - def lindex[K, V](key: K, index: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().lindex(key, index)) - - /** - * Insert an element before or after another element in a list. - * @return The length of the list after the insert operation, or -1 when the value pivot was not found. - */ - def linsert[K, V](key: K, before: Boolean, pivot: V, value: V)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().linsert(key, before, pivot, value)).map(_.longValue) - - /** - * Get the length of a list. - * @return Long integer-reply the length of the list at { @code key}. - */ - def llen[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().llen(key)).map(_.longValue) - - /** - * Remove and get the first element in a list. - * @return The value of the first element, or null when key does not exist. - */ - def lpop[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().lpop(key)) - - /** - * Prepend one or multiple values to a list. - * @return The length of the list after the push operations. - */ - def lpush[K, V](key: K, values: V*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().lpush(key, values: _*)).map(_.longValue) - - /** - * Prepend values to a list, only if the list exists. - * @return The length of the list after the push operation. - */ - def lpushx[K, V](key: K, values: V*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().lpushx(key, values: _*)).map(_.longValue) - - /** - * Get a range of elements from a list. - * @return List of elements in the specified range. - */ - def lrange[K, V](key: K, start: Long, stop: Long)(implicit connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().lrange(key, start, stop)) - - /** - * Remove elements from a list. - * @return The number of removed elements. - */ - def lrem[K, V](key: K, count: Long, value: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().lrem(key, count, value)).map(_.longValue) - - /** - * Set the value of an element in a list by its index. - * @return The same inserted value - */ - def lset[K, V](key: K, index: Long, value: V)(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().lset(key, index, value)) - - /** - * Trim a list to the specified range. - * @return Simple string reply - */ - def ltrim[K, V](key: K, start: Long, stop: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().ltrim(key, start, stop)) - - /** - * Remove and get the last element in a list. - * @return The value of the last element, or null when key does not exist. - */ - def rpop[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().rpop(key)) - - /** - * Remove the last element in a list, append it to another list and return it. - * @return The element being popped and pushed. - */ - def rpoplpush[K, V](source: K, destination: K)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().rpoplpush(source, destination)) - - /** - * Append one or multiple values to a list. - * @return The length of the list after the push operation. - */ - def rpush[K, V](key: K, values: V*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().rpush(key, values: _*)).map(_.longValue) - - /** - * Append values to a list, only if the list exists. - * @return The length of the list after the push operation. - */ - def rpushx[K, V](key: K, values: V*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().rpushx(key, values: _*)).map(_.longValue) -} - -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -object RedisList extends RedisList diff --git a/redis/src/main/scala/monix/connect/redis/RedisPubSub.scala b/redis/src/main/scala/monix/connect/redis/RedisPubSub.scala deleted file mode 100644 index bfc4b6f0a..000000000 --- a/redis/src/main/scala/monix/connect/redis/RedisPubSub.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -import io.lettuce.core.api.StatefulRedisConnection -import monix.eval.Task -import monix.reactive.Observable - -import scala.jdk.CollectionConverters._ - -/** - * @see The reference Lettuce Api at: [[io.lettuce.core.api.reactive.BaseRedisReactiveCommands]] - */ -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -trait RedisPubSub { - - /** - * Post a message to a channel. - * @return The number of clients that received the message. - */ - def publish[K, V](channel: K, message: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().publish(channel, message)).map(_.longValue) - - /** - * Lists the currently *active channels*. - * @return List of active channels, optionally matching the specified pattern. - */ - def pubsubChannels[K, V](implicit connection: StatefulRedisConnection[K, V]): Observable[K] = - Observable.fromReactivePublisher(connection.reactive().pubsubChannels()) - - /** - * Lists the currently *active channels*. - * @return The list of active channels, optionally matching the specified pattern. - */ - def pubsubChannels[K, V](channel: K)(implicit connection: StatefulRedisConnection[K, V]): Observable[K] = - Observable.fromReactivePublisher(connection.reactive().pubsubChannels(channel)) - - /** - * Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels. - * @return The list of channels and number of subscribers for every channel. - * In this case long remains as [[java.lang.Long]] and not as as [[scala.Long]], - * since traversing the map to convert values would imply performance implications - */ - def pubsubNumsub[K, V](channels: K*)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Map[K, java.lang.Long]] = - Task.from(connection.reactive().pubsubNumsub(channels: _*)).map(_.asScala.toMap) - - /** - * Returns the number of subscriptions to patterns. - * @return The number of patterns all the clients are subscribed to. - */ - def pubsubNumpat[K, V]()(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().pubsubNumpat()).map(_.longValue) - - /** - * Echo the given string. - * @return Bulk string reply. - */ - def echo[K, V](msg: V)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().echo(msg)) - - /** - * Return the role of the instance in the context of replication. - * @return Object array-reply where the first element is one of master, slave, sentinel and the additional - * elements are role-specific. - */ - def role[K, V]()(implicit connection: StatefulRedisConnection[K, V]): Observable[Any] = - Observable.fromReactivePublisher(connection.reactive().role()) - - /** - * Ping the server. - * @return Simple string reply. - */ - def ping[K, V]()(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().ping()) - - /** - * Switch connection to Read-Only mode when connecting to a cluster. - * @return Simple string reply. - */ - def readOnly[K, V]()(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().readOnly()) - - /** - * Switch connection to Read-Write mode (default) when connecting to a cluster. - * @return Simple string reply. - */ - def readWrite[K, V]()(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().readWrite()) - - /** - * Instructs Redis to disconnect the connection. Note that if auto-reconnect is enabled then Lettuce will auto-reconnect if - * the connection was disconnected. Use {@link io.lettuce.core.api.StatefulConnection#close} to close connections and - * release resources. - * @return String simple string reply always OK. - */ - def quit[K, V]()(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().quit()) - - /** - * Wait for replication. - * @return Number of replicas - */ - def waitForReplication[K, V](replicas: Int, timeout: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().waitForReplication(replicas, timeout)).map(_.longValue) - -} - -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -object RedisPubSub extends RedisPubSub diff --git a/redis/src/main/scala/monix/connect/redis/RedisServer.scala b/redis/src/main/scala/monix/connect/redis/RedisServer.scala deleted file mode 100644 index b507bc5aa..000000000 --- a/redis/src/main/scala/monix/connect/redis/RedisServer.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -import io.lettuce.core.api.StatefulRedisConnection -import monix.eval.Task - -import scala.collection.mutable -import scala.jdk.CollectionConverters._ - -/** - * @see The reference Lettuce Api at: - * [[io.lettuce.core.api.reactive.RedisServerReactiveCommands]] - */ -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -private[redis] trait RedisServer { - - /** - * Asynchronously rewrite the append-only file. - * @return Always OK. - */ - def bgrewriteaof[K, V](implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().bgrewriteaof()) - /** - * Asynchronously save the dataset to disk. - * @return Simple string reply - */ - def bgsave[K, V](implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().bgsave()) - - /** - * Get the current connection name. - * @return The connection name, or a null bulk reply if no name is set. - */ - def clientGetname[K, V](implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().bgsave()) - - /** - * Set the current connection name. - * - * @return OK if the connection name was successfully set. - */ - def clientSetname[K, V](name: K)(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().bgsave()) - - /** - * Kill the connection of a client identified by ip:port. - * @return OK if the connection exists and has been closed. - */ - def clientKill[K, V](addr: String)(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().clientKill(addr)) - - /** - * Get the list of client connections. - * - * @return A unique string, formatted as follows: One client connection per line (separated by LF), - * each line is composed of a succession of property=value fields separated by a space character. - */ - def clientList[K, V](implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().clientList()) - - /** - * Get total number of Redis commands. - * @return Number of total commands in this Redis server. - */ - def commandCount[K, V](implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().commandCount()).map(_.longValue) - - /** - * Get the value of a configuration parameter. - * @return Bulk string reply - */ - def configGet[K, V](parameter: String)( - implicit - connection: StatefulRedisConnection[K, V]): Task[mutable.Map[String, String]] = - Task.from(connection.reactive().configGet(parameter)).map(_.asScala) - - /** - * Reset the stats returned by INFO. - * @return Always OK. - */ - def configResetstat[K, V](implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().configResetstat()) - - /** - * Remove all keys from all databases. - * @return Simple string reply - */ - def flushallAsync[K, V]()(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().flushallAsync()) - - /** - * Remove all keys reactivehronously from the current database. - * @return Single string reply - */ - def flushdbAsync[K, V]()(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().flushdbAsync()) - -} - -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -object RedisServer extends RedisServer diff --git a/redis/src/main/scala/monix/connect/redis/RedisSet.scala b/redis/src/main/scala/monix/connect/redis/RedisSet.scala deleted file mode 100644 index 4b2c63bf9..000000000 --- a/redis/src/main/scala/monix/connect/redis/RedisSet.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -import io.lettuce.core.api.StatefulRedisConnection -import io.lettuce.core.ValueScanCursor -import monix.eval.Task -import monix.reactive.Observable - -/** - * @see The reference to lettuce api [[io.lettuce.core.api.reactive.RedisSetReactiveCommands]] - */ -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -private[redis] trait RedisSet { - - /** - * Add one or more members to a set. - * @return The number of elements that were added to the set, not including all the elements already - * present into the set. - */ - def sadd[K, V](key: K, members: V*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().sadd(key, members: _*)).map(_.longValue) - - /** - * Get the number of members in a set. - * @return The cardinality (number of elements) of the set, or { @literal false} if { @code key} does not - * exist. - */ - def scard[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().scard(key)).map(_.longValue) - - /** - * Subtract multiple sets. - * @return A list with members of the resulting set. - */ - def sdiff[K, V](keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().sdiff(keys: _*)) - - /** - * Subtract multiple sets and store the resulting set in a key. - * @return The number of elements in the resulting set. - */ - def sdiffstore[K, V](destination: K, keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().sdiffstore(destination, keys: _*)).map(_.longValue) - - /** - * Intersect multiple sets. - * @return A list with members of the resulting set. - */ - def sinter[K, V](keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().sinter(keys: _*)) - - /** - * Intersect multiple sets and store the resulting set in a key. - * @return The number of elements in the resulting set. - */ - def sinterstore[K, V](destination: K, keys: K*)( - implicit - connection: StatefulRedisConnection[K, V]): Task[java.lang.Long] = - Task.from(connection.reactive().sinterstore(destination, keys: _*)) - - /** - * Determine if a given value is a member of a set. - * @return True if the element is a member of the set. - * False if the element is not a member of the set, or if key does not exist. - */ - def sismember[K, V](key: K, member: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().sismember(key, member)).map(_.booleanValue) - - /** - * Move a member from one set to another. - * @return True if the element is moved. - * False if the element is not a member of source and no operation was performed. - */ - def smove[K, V](source: K, destination: K, member: V)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().smove(source, destination, member)).map(_.booleanValue) - - /** - * Get all the members in a set. - * @return All elements of the set. - */ - def smembers[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().smembers(key)) - - /** - * Remove and return a random member from a set. - * @return The removed element, or null when key does not exist. - */ - def spop[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().spop(key)) - - /** - * Remove and return one or multiple random members from a set. - * @return The removed element, or null when key does not exist. - */ - def spop[K, V](key: K, count: Long)(implicit connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().spop(key, count)) - - /** - * Get one random member from a set. - * @return Without the additional count argument the command returns a Bulk Reply with the - * randomly selected element, or null when key does not exist. - */ - def srandmember[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().srandmember(key)) - - /** - * Get one or multiple random members from a set. - * @return The elements without the additional count argument the command returns a Bulk Reply - * with the randomly selected element, or null when key does not exist. - */ - def srandmember[K, V](key: K, count: Long)(implicit connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().srandmember(key, count)) - - /** - * Remove one or more members from a set. - * @return Long hat represents the number of members that were removed from the set, not including non existing members. - */ - def srem[K, V](key: K, members: V*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().srem(key, members: _*)).map(_.longValue) - - /** - * Add multiple sets. - * @return The members of the resulting set. - */ - def sunion[K, V](keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().sunion(keys: _*)) - - /** - * Add multiple sets and store the resulting set in a key. - * @return Long that represents the number of elements in the resulting set. - */ - def sunionstore[K, V](destination: K, keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().sunionstore(destination, keys: _*)).map(_.longValue) - - /** - * Incrementally iterate Set elements. - * @return Scan cursor. - */ - def sscan[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[ValueScanCursor[V]] = - Task.from(connection.reactive().sscan(key)) - -} - -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -object RedisSet extends RedisSet diff --git a/redis/src/main/scala/monix/connect/redis/RedisSortedSet.scala b/redis/src/main/scala/monix/connect/redis/RedisSortedSet.scala deleted file mode 100644 index 2dc941d46..000000000 --- a/redis/src/main/scala/monix/connect/redis/RedisSortedSet.scala +++ /dev/null @@ -1,366 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -import io.lettuce.core.api.StatefulRedisConnection -import io.lettuce.core.{KeyValue, Limit, Range, ScoredValue, ScoredValueScanCursor} -import monix.eval.Task -import monix.reactive.Observable - -/** - * @see The reference Lettuce Api at: [[io.lettuce.core.api.reactive.RedisSortedSetReactiveCommands]] - */ -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -private[redis] trait RedisSortedSet { - - /** - * Removes and returns a member with the lowest scores in the sorted set stored at one of the keys. - * @return Multi-bulk containing the name of the key, the score and the popped member. - */ - def bzpopmin[K, V](timeout: Long, keys: K*)( - implicit - connection: StatefulRedisConnection[K, V]): Task[KeyValue[K, ScoredValue[V]]] = - Task.from(connection.reactive().bzpopmin(timeout, keys: _*)) - - /** - * Removes and returns a member with the highest scores in the sorted set stored at one of the keys. - * @return Multi-bulk containing the name of the key, the score and the popped member. - */ - def bzpopmax[K, V](timeout: Long, keys: K*)( - implicit - connection: StatefulRedisConnection[K, V]): Task[KeyValue[K, ScoredValue[V]]] = - Task.from(connection.reactive().bzpopmax(timeout, keys: _*)) - - /** - * Add one or more members to a sorted set, or update its score if it already exists. - * @return Long integer-reply specifically: - * The number of elements added to the sorted sets, not including elements already existing for which the score was - * updated. - */ - def zadd[K, V](key: K, score: Double, member: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zadd(key, score, member)).map(_.longValue) - - /** - * Add one or more members to a sorted set, or update its score if it already exists. - * @return Long integer-reply specifically: - * The number of elements added to the sorted sets, not including elements already existing for which the score was - * updated. - */ - def zadd[K, V](key: K, scoredValues: ScoredValue[V]*)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zadd(key, scoredValues: _*)).map(_.longValue) - - /** - * Add one or more members to a sorted set, or update its score if it already exists applying the INCR option. ZADD - * acts like ZINCRBY. - * @return The total number of elements changed - */ - def zaddincr[K, V](key: K, score: Double, member: V)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Double] = - Task.from(connection.reactive().zaddincr(key, score, member)).map(_.doubleValue) - - /** - * Get the number of members in a sorted set. - * - * @return Long integer-reply specifically: - * The number of elements added to the sorted sets, not including elements already existing for which the score was - * updated. - */ - def zcard[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zcard(key)).map(_.longValue) - - /** - * Count the members in a sorted set with scores within the given [[Range]]. - * @return The number of elements of the sorted set, or false if key does not exist. - */ - def zcount[K, V](key: K, range: Range[_ <: Number])(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zcount(key, range)).map(_.longValue) - - /** - * Increment the score of a member in a sorted set. - * @return The new score of member, represented as string. - */ - def zincrby[K, V](key: K, amount: Double, member: V)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Double] = - Task.from(connection.reactive().zincrby(key, amount, member)).map(_.doubleValue) - - /** - * Intersect multiple sorted sets and store the resulting sorted set in a new key. - * @return The number of elements in the resulting sorted set at destination. - */ - def zinterstore[K, V](destination: K, keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zinterstore(destination, keys: _*)).map(_.longValue) - - /** - * Count the number of members in a sorted set between a given lexicographical range. - * @return The number of elements in the specified score range. - */ - def zlexcount[K, V](key: K, range: Range[_ <: V])(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zlexcount(key, range)).map(_.longValue) - - /** - * Removes and returns up to count members with the lowest scores in the sorted set stored at key. - * @return Scored value the removed element. - */ - def zpopmin[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[ScoredValue[V]] = - Task.from(connection.reactive().zpopmin(key)) - - /** - * Removes and returns up to count members with the lowest scores in the sorted set stored at key. - * @return Scored values of the popped scores and elements. - */ - def zpopmin[K, V](key: K, count: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[ScoredValue[V]] = - Observable.fromReactivePublisher(connection.reactive().zpopmin(key, count)) - - /** - * Removes and returns up to count members with the highest scores in the sorted set stored at key. - * @return Scored value of the removed element. - */ - def zpopmax[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[ScoredValue[V]] = - Task.from(connection.reactive().zpopmax(key)) - - /** - * Removes and returns up to count members with the highest scores in the sorted set stored at key. - * @return Scored values of popped scores and elements. - */ - def zpopmax[K, V](key: K, count: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[ScoredValue[V]] = - Observable.fromReactivePublisher(connection.reactive().zpopmax(key, count)) - - /** - * Return a range of members in a sorted set, by index. - * @return Elements in the specified range. - */ - def zrange[K, V](key: K, start: Long, stop: Long)(implicit connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().zrange(key, start, stop)) - - /** - * Return a range of members with scores in a sorted set, by index. - * @return Elements in the specified range. - */ - def zrangeWithScores[K, V](key: K, start: Long, stop: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[ScoredValue[V]] = - Observable.fromReactivePublisher(connection.reactive().zrangeWithScores(key, start, stop)) - - /** - * Return a range of members in a sorted set, by lexicographical range. - * @return Elements in the specified range. - */ - def zrangebylex[K, V](key: K, range: Range[_ <: V])( - implicit - connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().zrangebylex(key, range)) - - /** - * Return a range of members in a sorted set, by lexicographical range. - * @return Elements in the specified range. - */ - def zrangebylex[K, V](key: K, range: Range[_ <: V], limit: Limit)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().zrangebylex(key, range, limit)) - - /** - * Return a range of members in a sorted set, by score. - * @return Elements in the specified score range. - */ - def zrangebyscore[K, V](key: K, range: Range[_ <: Number])( - implicit - connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().zrangebyscore(key, range)) - - /** - * Return a range of members in a sorted set, by score. - * @return Elements in the specified score range. - */ - def zrangebyscore[K, V](key: K, range: Range[_ <: Number], limit: Limit)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().zrangebyscore(key, range, limit)) - - /** - * Return a range of members with score in a sorted set, by score. - * @return Scored values in the specified score range. - */ - def zrangebyscoreWithScores[K, V](key: K, range: Range[_ <: Number])( - implicit - connection: StatefulRedisConnection[K, V]): Observable[ScoredValue[V]] = - Observable.fromReactivePublisher(connection.reactive().zrangebyscoreWithScores(key, range)) - - /** - * Return a range of members with score in a sorted set, by score. - * @return Elements in the specified score range. - */ - def zrangebyscoreWithScores[K, V](key: K, range: Range[_ <: Number], limit: Limit)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[ScoredValue[V]] = - Observable.fromReactivePublisher(connection.reactive().zrangebyscoreWithScores(key, range, limit)) - - /** - * Determine the index of a member in a sorted set. - * @return The rank of member. If member does not exist in the sorted set or key does not exist. - */ - def zrank[K, V](key: K, member: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zrank(key, member)).map(_.longValue) - - /** - * Remove one or more members from a sorted set. - * @return The number of members removed from the sorted set, not including non existing members. - */ - def zrem[K, V](key: K, members: V*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zrem(key, members: _*)).map(_.longValue) - - /** - * Remove all members in a sorted set between the given lexicographical range. - * @return The number of elements removed. - */ - def zremrangebylex[K, V](key: K, range: Range[_ <: V])( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zremrangebylex(key, range)).map(_.longValue) - - /** - * Remove all members in a sorted set within the given indexes. - * @return The number of elements removed. - */ - def zremrangebyrank[K, V](key: K, start: Long, stop: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zremrangebyrank(key, start, stop)).map(_.longValue) - - /** - * Remove all members in a sorted set within the given scores. - * @return The number of elements removed. - */ - def zremrangebyscore[K, V](key: K, range: Range[_ <: Number])( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zremrangebyscore(key, range)).map(_.longValue) - - /** - * Return a range of members in a sorted set, by index, with scores ordered from high to low. - * @return Elements in the specified range. - */ - def zrevrange[K, V](key: K, start: Long, stop: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().zrevrange(key, start, stop)) - - /** - * Return a range of members with scores in a sorted set, by index, with scores ordered from high to low. - * @return Elements in the specified range. - */ - def zrevrangeWithScores[K, V](key: K, start: Long, stop: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[ScoredValue[V]] = - Observable.fromReactivePublisher(connection.reactive().zrevrangeWithScores(key, start, stop)) - - /** - * Return a range of members in a sorted set, by lexicographical range ordered from high to low. - * @return Elements in the specified score range. - */ - def zrevrangebylex[K, V](key: K, range: Range[_ <: V])( - implicit - connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().zrevrangebylex(key, range)) - - /** - * Return a range of members in a sorted set, by lexicographical range ordered from high to low. - * @return Elements in the specified score range. - */ - def zrevrangebylex[K, V](key: K, range: Range[_ <: V], limit: Limit)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().zrevrangebylex(key, range, limit)) - - /** - * Return a range of members in a sorted set, by score, with scores ordered from high to low. - * @return Elements in the specified score range. - */ - def zrevrangebyscore[K, V](key: K, range: Range[_ <: Number])( - implicit - connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().zrevrangebyscore(key, range)) - - /** - * Return a range of members in a sorted set, by score, with scores ordered from high to low. - * @return Elements in the specified score range. - */ - def zrevrangebyscore[K, V](key: K, range: Range[_ <: Number], limit: Limit)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[V] = - Observable.fromReactivePublisher(connection.reactive().zrevrangebyscore(key, range, limit)) - - /** - * Return a range of members with scores in a sorted set, by score, with scores ordered from high to low. - * @return Elements in the specified score range. - */ - def zrevrangebyscoreWithScores[K, V](key: K, range: Range[_ <: Number])( - implicit - connection: StatefulRedisConnection[K, V]): Observable[ScoredValue[V]] = - Observable.fromReactivePublisher(connection.reactive().zrevrangebyscoreWithScores(key, range)) - - /** - * Return a range of members with scores in a sorted set, by score, with scores ordered from high to low. - * @return Elements in the specified score range. - */ - def zrevrangebyscoreWithScores[K, V](key: K, range: Range[_ <: Number], limit: Limit)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[ScoredValue[V]] = - Observable.fromReactivePublisher(connection.reactive().zrevrangebyscoreWithScores(key, range, limit)) - - /** - * Determine the index of a member in a sorted set, with scores ordered from high to low. - * @return The rank of member. If member does not exist in the sorted set or key - * does not exist. - */ - def zrevrank[K, V](key: K, member: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zrevrank(key, member)).map(_.longValue) - - /** - * Incrementally iterate sorted sets elements and associated scores. - * @return Scan cursor. - */ - def zscan[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[ScoredValueScanCursor[V]] = - Task.from(connection.reactive().zscan(key)) - - /** - * Get the score associated with the given member in a sorted set. - * @return The score of member represented as string. - */ - def zscore[K, V](key: K, member: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Double] = - Task.from(connection.reactive().zscore(key, member)).map(_.doubleValue) - - /** - * Add multiple sorted sets and store the resulting sorted set in a new key. - * @return The number of elements in the resulting sorted set at destination. - */ - def zunionstore[K, V](destination: K, keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().zunionstore(destination, keys: _*)).map(_.longValue) - -} - -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -object RedisSortedSet extends RedisSortedSet diff --git a/redis/src/main/scala/monix/connect/redis/RedisStream.scala b/redis/src/main/scala/monix/connect/redis/RedisStream.scala deleted file mode 100644 index b3647d202..000000000 --- a/redis/src/main/scala/monix/connect/redis/RedisStream.scala +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -import io.lettuce.core.api.StatefulRedisConnection -import io.lettuce.core.{Limit, Range, StreamMessage, Consumer => LConsumer} -import monix.eval.Task -import monix.reactive.Observable -import io.lettuce.core.XReadArgs.StreamOffset - -/** - * The Stream is a new data type introduced recently, wwhich models a log data structure - * in a more abstract way, like a log file often implemented as a file open in apend only mode, - * Redis streams are primarily an append only data structure. At least conceptually, because being Redis streams - * an abstract data type represented in memory, they implement more powerful opperations, - * to overcome the limits of the log file itself. - * Check the official documentation to see the available operations at: https://redis.io/commands#stream - * @see The reference to lettuce api: [[io.lettuce.core.api.reactive.RedisStreamReactiveCommands]] - * - */ -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -trait RedisStream { - - /** - * Acknowledge one or more messages as processed. - * @return simple-reply the lenght of acknowledged messages. - */ - def xack[K, V](key: K, group: K, messageIds: String*)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = { - Task.from(connection.reactive().xack(key, group, messageIds: _*)).map(_.longValue()) - } - - /** - * Append a message to the stream key. - * @return simple-reply the message Id. - */ - def xadd[K, V](key: K, body: Map[K, V])(implicit connection: StatefulRedisConnection[K, V]): Task[String] = { - Task.from(connection.reactive().xadd(key, body)) - } //1/4 - - /** - * Gets ownership of one or multiple messages in the Pending Entries List of a given stream consumer group. - * - * @return simple-reply the { @link StreamMessage} - */ - def xclaim[K, V](key: K, consumer: LConsumer[K], minIdleTime: Long, messageIds: String*)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[StreamMessage[K, V]] = - Observable.fromReactivePublisher(connection.reactive().xclaim(key, consumer, minIdleTime, messageIds: _*)) - - /** - * Removes the specified entries from the stream. Returns the number of items deleted, that may be different from the number - * of IDs passed in case certain IDs do not exist. - * @return simple-reply number of removed entries. - */ - def xdel[K, V](key: K, messageIds: String*)(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().xadd(key, messageIds: _*)) - - /** - * Create a consumer group. - * @return simple-reply { @literal true} if successful. - */ - def xgroupCreate[K, V](streamOffset: StreamOffset[K], group: K)( - implicit - connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().xgroupCreate(streamOffset, group)) - - /** - * Delete a consumer from a consumer group. - * @return simple-reply { @literal true} if successful. - */ - def xgroupDelconsumer[K, V](key: K, consumer: LConsumer[K])( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().xgroupDelconsumer(key, consumer)).map(_.longValue()) - - /** - * Destroy a consumer group. - * @return simple-reply { @literal true} if successful. - */ - def xgroupDestroy[K, V](key: K, group: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().xgroupDestroy(key, group)).map(_.booleanValue()) - - /** - * Set the current group id. - * @return simple-reply OK - */ - def xgroupSetid[K, V](streamOffset: StreamOffset[K], group: K)( - implicit - connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().xgroupSetid(streamOffset, group)) - /** - * Get the length of a steam. - * @return simple-reply the lenght of the stream. - */ - def xlen[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().xlen(key)).map(_.longValue()) - - /** - * Read pending messages from a stream for a group. - * @return List<Object> array-reply list pending entries. - */ - def xpending[K, V](key: K, group: K)(implicit connection: StatefulRedisConnection[K, V]): Observable[Any] = - Observable.fromReactivePublisher(connection.reactive().xpending(key, group)) - - /** - * Read pending messages from a stream within a specific [[Range]]. - * @return List<Object> array-reply list with members of the resulting stream. - */ - def xpending[K, V](key: K, group: K, range: Range[String], limit: Limit)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[Any] = - Observable.fromReactivePublisher(connection.reactive().xpending(key, group)) - - /** - * Read pending messages from a stream within a specific [[Range]]. - * @return List<Object> array-reply list with members of the resulting stream. - */ - def xpending[K, V](key: K, consumer: LConsumer[K], range: Range[String], limit: Limit)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[Any] = - Observable.fromReactivePublisher(connection.reactive().xpending(key, consumer, range, limit)) - - /** - * Read messages from a stream within a specific [[Range]]. - * @return Members of the resulting stream. - */ - def xrange[K, V](key: K, range: Range[String])( - implicit - connection: StatefulRedisConnection[K, V]): Observable[StreamMessage[K, V]] = - Observable.fromReactivePublisher(connection.reactive().xrange(key, range)) - - /** - * Read messages from a stream within a specific [[Range]] applying a [[Limit]]. - * @return Members of the resulting stream. - */ - def xrange[K, V](key: K, range: Range[String], limit: Limit)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[StreamMessage[K, V]] = - Observable.fromReactivePublisher(connection.reactive().xrange(key, range)) - - /** - * Read messages from one or more [[StreamOffset]]s. - * @return Members of the resulting stream. - */ - def xread[K, V](streams: StreamOffset[K]*)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[StreamMessage[K, V]] = - Observable.fromReactivePublisher(connection.reactive().xread(streams: _*)) - - /* /** - * Read messages from one or more [[StreamOffset]]s. - */ - def xread[K, V](args: XReadArgs, streams: StreamOffset[K]*)(implicit connection: StatefulRedisConnection[K, V]): Observable[StreamMessage[K, V]] = - Observable.fromReactivePublisher(connection.reactive().xread(args, streams: _*))*/ - - /** - * Read messages from one or more [[StreamOffset]]s using a consumer group. - * @return List<StreamMessage> array-reply list with members of the resulting stream. - */ - def xreadgroup[K, V](consumer: LConsumer[K], streams: StreamOffset[K]*)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[StreamMessage[K, V]] = - Observable.fromReactivePublisher(connection.reactive().xreadgroup(consumer, streams: _*)) - - /** - * Read messages from a stream within a specific [[Range]] in reverse order. - * @return Members of the resulting stream. - */ - def xrevrange[K, V](key: K, range: Range[String])( - implicit - connection: StatefulRedisConnection[K, V]): Observable[StreamMessage[K, V]] = - Observable.fromReactivePublisher(connection.reactive().xrevrange(key, range)) - - /** - * Read messages from a stream within a specific [[Range]] applying a [[Limit]] in reverse order. - * @return Meembers of the resulting stream. - */ - def xrevrange[K, V](key: K, range: Range[String], limit: Limit)( - implicit - connection: StatefulRedisConnection[K, V]): Observable[StreamMessage[K, V]] = - Observable.fromReactivePublisher(connection.reactive().xrevrange(key, range)) - - /** - * Trims the stream to count elements. - * @return Number of removed entries. - */ - def xtrim[K, V](key: K, count: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().xtrim(key, count)).map(_.longValue()) - - /** - * Trims the stream to count elements. - * @return Number of removed entries. - */ - def xtrim[K, V](key: K, approximateTrimming: Boolean, count: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().xtrim(key, approximateTrimming, count)).map(_.longValue()) - -} - -/** - * Exposes only methods from the RedisStream api - */ -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -object RedisStream extends RedisStream diff --git a/redis/src/main/scala/monix/connect/redis/RedisString.scala b/redis/src/main/scala/monix/connect/redis/RedisString.scala deleted file mode 100644 index 5e4d2ed43..000000000 --- a/redis/src/main/scala/monix/connect/redis/RedisString.scala +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -import io.lettuce.core.api.StatefulRedisConnection -import io.lettuce.core._ -import monix.eval.Task -import monix.reactive.Observable - -import scala.jdk.CollectionConverters._ - -/** - * @see The reference to lettuce api [[io.lettuce.core.api.reactive.RedisStringReactiveCommands]] - */ -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -private[redis] trait RedisString { - - /** - * Append a value to a key. - * @return The length of the string after the append operation. - */ - def append[K, V](key: K, value: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().append(key, value)).map(_.longValue) - - /** - * Count set bits in a string. - * @return The number of bits set to 1. - */ - def bitcount[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().bitcount(key)).map(_.longValue) - - /** - * Count set bits in a string. - * @return The number of bits set to 1. - */ - def bitcount[K, V](key: K, start: Long, end: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().bitcount(key, start, end)).map(_.longValue) - - /** - * Find first bit set or clear in a string. - * @return The command returns the position of the first bit set to 1 or 0 according to the request. - */ - def bitpos[K, V](key: K, state: Boolean)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().bitpos(key, state)).map(_.longValue) - - /** - * Find first bit set or clear in a string. - * @return The command returns the position of the first bit set to 1 or 0 according to the request. - */ - def bitpos[K, V](key: K, state: Boolean, start: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().bitpos(key, state, start)).map(_.longValue) - - /** - * Find first bit set or clear in a string. - * @return The command returns the position of the first bit set to 1 or 0 according to the request. - */ - def bitpos[K, V](key: K, state: Boolean, start: Long, end: Long)( - implicit - connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().bitpos(key, state, start, end)).map(_.longValue) - - /** - * Perform bitwise AND between strings. - * @return The size of the string stored in the destination key, that is equal to the size of the longest - * input string. - */ - def bitopAnd[K, V](destination: K, keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().bitopAnd(destination, keys: _*)).map(_.longValue) - - /** - * Perform bitwise NOT between strings. - * @return The size of the string stored in the destination key, that is equal to the size of the longest - * input string. - */ - def bitopNot[K, V](destination: K, source: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().bitopNot(destination, source)).map(_.longValue) - - /** - * Perform bitwise OR between strings. - * @return The size of the string stored in the destination key, that is equal to the size of the longest - * input string. - */ - def bitopOr[K, V](destination: K, keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().bitopOr(destination, keys: _*)).map(_.longValue) - - /** - * Perform bitwise XOR between strings. - * @return The size of the string stored in the destination key, that is equal to the size of the longest - * input string. - */ - def bitopXor[K, V](destination: K, keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().bitopXor(destination, keys: _*)).map(_.longValue) - - /** - * Decrement the integer value of a key by one. - * @return The value of key after the decrement - */ - def decr[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().decr(key)).map(_.longValue) - - /** - * Decrement the integer value of a key by the given number. - * @return The value of key after the decrement. - */ - def decrby[K, V](key: K, amount: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().decrby(key, amount)).map(_.longValue) - - /** - * Get the value of a key. - * @return The value of key, or null when key does not exist. - */ - def get[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().get(key)) - - /** - * Returns the bit value at offset in the string value stored at key. - * @return The bit value stored at offset. - */ - def getbit[K, V](key: K, offset: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().getbit(key, offset)).map(_.longValue) - - /** - * Get a substring of the string stored at a key. - * @return Bulk string reply. - */ - def getrange[K, V](key: K, start: Long, end: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().getrange(key, start, end)) - - /** - * Set the string value of a key and return its old value. - * @return The old value stored at key, or null when key did not exist. - */ - def getset[K, V](key: K, value: V)(implicit connection: StatefulRedisConnection[K, V]): Task[V] = - Task.from(connection.reactive().getset(key, value)) - - /** - * Increment the integer value of a key by one. - * @return The value of key after the increment. - */ - def incr[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().incr(key)).map(_.longValue) - - /** - * Increment the integer value of a key by the given amount. - * @return The value of key after the increment. - */ - def incrby[K, V](key: K, amount: Long)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().incrby(key, amount)).map(_.longValue) - - /** - * Increment the float value of a key by the given amount. - * @return Double bulk string reply the value of key after the increment. - */ - def incrbyfloat[K, V](key: K, amount: Double)(implicit connection: StatefulRedisConnection[K, V]): Task[Double] = - Task.from(connection.reactive().incrbyfloat(key, amount)).map(_.doubleValue) - - /** - * Get the values of all the given keys. - * @return Values at the specified keys. - */ - def mget[K, V](keys: K*)(implicit connection: StatefulRedisConnection[K, V]): Observable[KeyValue[K, V]] = - Observable.fromReactivePublisher(connection.reactive().mget(keys: _*)) - - /** - * Set multiple keys to multiple values. - * @return Always OK since MSET can't fail. - */ - def mset[K, V](map: Map[K, V])(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().mset(map.asJava)) - - /** - * Set multiple keys to multiple values, only if none of the keys exist. - * @return True if the all the keys were set. - * False if no key was set (at least one key already existed). - */ - def msetnx[K, V](map: Map[K, V])(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().msetnx(map.asJava)).map(_.booleanValue) - - /** - * Set the string value of a key. - * @return OK if SET was executed correctly. - */ - def set[K, V](key: K, value: V)(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().set(key, value)) - - /** - * Sets or clears the bit at offset in the string value stored at key. - * @return The original bit value stored at offset. - */ - def setbit[K, V](key: K, offset: Long, value: Int)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().setbit(key, offset, value)).map(_.longValue) - - /** - * Set the value and expiration of a key. - * @return Simple string reply. - */ - def setex[K, V](key: K, seconds: Long, value: V)(implicit connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().setex(key, seconds, value)) - - /** - * Set the value and expiration in milliseconds of a key. - * @return String simple-string-reply - */ - def psetex[K, V](key: K, milliseconds: Long, value: V)( - implicit - connection: StatefulRedisConnection[K, V]): Task[String] = - Task.from(connection.reactive().psetex(key, milliseconds, value)) - - /** - * Set the value of a key, only if the key does not exist. - * @return True if the key was set. - * False if the key was not set - */ - def setnx[K, V](key: K, value: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Boolean] = - Task.from(connection.reactive().setnx(key, value)).map(_.booleanValue) - - /** - * Overwrite part of a string at key starting at the specified offset. - * @return The length of the string after it was modified by the command. - */ - def setrange[K, V](key: K, offset: Long, value: V)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().setrange(key, offset, value)).map(_.longValue) - - /** - * Get the length of the value stored in a key. - * @return The length of the string at key, or 0 when key does not exist. - */ - def strlen[K, V](key: K)(implicit connection: StatefulRedisConnection[K, V]): Task[Long] = - Task.from(connection.reactive().strlen(key)).map(_.longValue) - -} - -@deprecated("use the pure `monix.connect.redis.client.RedisConnection`", "0.6.0") -object RedisString extends RedisString diff --git a/redis/src/main/scala/monix/connect/redis/client/RedisUri.scala b/redis/src/main/scala/monix/connect/redis/client/RedisUri.scala index c6664d156..8cc385cf1 100644 --- a/redis/src/main/scala/monix/connect/redis/client/RedisUri.scala +++ b/redis/src/main/scala/monix/connect/redis/client/RedisUri.scala @@ -74,7 +74,7 @@ class RedisUri( .map(uri => RedisURI.create(uri)) .merge database.foreach(_ => redisUri.setDatabase(_)) - password.foreach(pass => redisUri.setPassword(pass)) + password.foreach(pass => redisUri.setPassword(pass.toCharArray)) ssl.foreach(_ => redisUri.setSsl(_)) verifyPeer.foreach(redisUri.setVerifyPeer) startTls.foreach(_ => redisUri.setStartTls(_)) @@ -127,7 +127,7 @@ object RedisUri { * .withClientName("companyX") * }}} */ - def apply(host: String, port: Int): RedisUri = new RedisUri(Right(host, port)) + def apply(host: String, port: Int): RedisUri = new RedisUri(Right((host, port))) /** * Creates a [[RedisUri]] from a the plain string uri. diff --git a/redis/src/test/scala/monix/connect/redis/RedisSpec.scala b/redis/src/test/scala/monix/connect/redis/RedisSpec.scala deleted file mode 100644 index a09ea1a25..000000000 --- a/redis/src/test/scala/monix/connect/redis/RedisSpec.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2020-2021 by The Monix Connect Project Developers. - * See the project homepage at: https://connect.monix.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.connect.redis - -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpecLike -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} - -class RedisSpec extends AnyWordSpecLike with Matchers with BeforeAndAfterEach with BeforeAndAfterAll { - - s"${Redis} " should { - " implement RedisKey" in { - Redis shouldBe a[RedisKey] - } - "implement RedisHash" in { - Redis shouldBe a[RedisHash] - } - "implement RedisList" in { - Redis shouldBe a[RedisList] - } - "implement RedisPubSub" in { - Redis shouldBe a[RedisPubSub] - } - "implement RedisSet" in { - Redis shouldBe a[RedisSet] - } - "implement RedisSortedSet" in { - Redis shouldBe a[RedisSortedSet] - } - "implement RedisStream" in { - Redis shouldBe a[RedisStream] - } - "implement RedisString" in { - Redis shouldBe a[RedisString] - } - } - -} diff --git a/s3/src/it/scala/monix/connect/s3/S3Suite.scala b/s3/src/it/scala/monix/connect/s3/S3Suite.scala index 8dcf17816..0ee166039 100644 --- a/s3/src/it/scala/monix/connect/s3/S3Suite.scala +++ b/s3/src/it/scala/monix/connect/s3/S3Suite.scala @@ -5,10 +5,11 @@ import monix.connect.aws.auth.MonixAwsConf import java.io.FileInputStream import monix.eval.Task import monix.execution.Scheduler +import monix.execution.exceptions.DummyException import monix.reactive.Observable import monix.testing.scalatest.MonixTaskTest import org.scalacheck.Gen -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.BeforeAndAfterAll import software.amazon.awssdk.services.s3.model.{CopyObjectResponse, NoSuchBucketException, NoSuchKeyException, PutObjectResponse} import org.scalatest.matchers.should.Matchers import org.scalatest.concurrent.Eventually @@ -29,7 +30,7 @@ class S3Suite } "S3" can "be created from config file" in { - val s3FromConf = S3.fromConfig + val s3FromConf = S3.fromConfig() val (k1, k2) = (Gen.identifier.sample.get, Gen.identifier.sample.get) for { _ <- s3FromConf.use(_.upload(bucketName, k1, k1.getBytes())) @@ -248,7 +249,7 @@ class S3Suite } yield { existedBefore shouldBe false deleteAttempt.isLeft shouldBe true - deleteAttempt.left.get shouldBe a[NoSuchBucketException] + deleteAttempt.swap.getOrElse(DummyException("failed")) shouldBe a[NoSuchBucketException] existsAfterDeletion shouldBe false } } @@ -329,7 +330,7 @@ class S3Suite val keys: List[String] = Gen.listOfN(n, Gen.alphaLowerStr.map(str => prefix + str)).sample.get - S3.fromConfig.use { s3 => + S3.fromConfig().use { s3 => for { _ <- Task.traverse(keys) { key => s3.upload(bucketName, key, "dummyContent".getBytes()) } latest <- s3.listLatestObject(bucketName, prefix = Some(prefix)) @@ -350,7 +351,7 @@ class S3Suite val keys: List[String] = Gen.listOfN(n, Gen.alphaLowerStr.map(str => prefix + str)).sample.get - S3.fromConfig.use { s3 => + S3.fromConfig().use { s3 => for { _ <- Task .traverse(keys) { key => s3.upload(bucketName, key, "dummyContent".getBytes()) } @@ -373,7 +374,7 @@ class S3Suite Gen.listOfN(10, Gen.alphaLowerStr.map(str => prefix + str)).sample.get //when - S3.fromConfig.use { s3 => + S3.fromConfig().use { s3 => for { _ <- Task .traverse(keys) { key => s3.upload(bucketName, key, "dummyContent".getBytes()) } @@ -402,7 +403,7 @@ class S3Suite } yield download } - S3.fromConfig.use(runS3App).asserting(_ shouldBe content.getBytes) + S3.fromConfig().use(runS3App).asserting(_ shouldBe content.getBytes) } diff --git a/s3/src/main/scala/monix/connect/s3/ListObjectsObservable.scala b/s3/src/main/scala/monix/connect/s3/ListObjectsObservable.scala index 72c496c5c..32ba21ad1 100644 --- a/s3/src/main/scala/monix/connect/s3/ListObjectsObservable.scala +++ b/s3/src/main/scala/monix/connect/s3/ListObjectsObservable.scala @@ -119,8 +119,8 @@ private[s3] object ListObjectsObservable { requestPayer: Option[RequestPayer] = None, s3AsyncClient: S3AsyncClient): Observable[S3Object] = { for { - listResponse <- ListObjectsObservable(bucket, prefix, None, None, s3AsyncClient).foldLeft(List.empty[S3Object])( - (prev, curr) => { + listResponse <- ListObjectsObservable(bucket, prefix, Some(n), requestPayer, s3AsyncClient) + .foldLeft(List.empty[S3Object])((prev, curr) => { (prev ++ curr.contents.asScala).sortWith(sort).take(n) }) s3Object <- Observable.fromIterable(listResponse) diff --git a/s3/src/main/scala/monix/connect/s3/MultipartUploadSubscriber.scala b/s3/src/main/scala/monix/connect/s3/MultipartUploadSubscriber.scala index 0ba35cfa8..c40f4e467 100644 --- a/s3/src/main/scala/monix/connect/s3/MultipartUploadSubscriber.scala +++ b/s3/src/main/scala/monix/connect/s3/MultipartUploadSubscriber.scala @@ -150,9 +150,12 @@ private[s3] class MultipartUploadSubscriber( * It should be called either when the received chunk is bigger than minimum size, * or later when the last chunk arrives representing the completion of the stream. */ - def uploadPart(bucket: String, key: String, partNumber: Int, uploadId: String, chunk: Array[Byte])( - implicit - scheduler: Scheduler): Task[CompletedPart] = { + def uploadPart( + bucket: String, + key: String, + partNumber: Int, + uploadId: String, + chunk: Array[Byte]): Task[CompletedPart] = { for { request <- Task { S3RequestBuilder.uploadPartRequest( diff --git a/s3/src/main/scala/monix/connect/s3/S3RequestBuilder.scala b/s3/src/main/scala/monix/connect/s3/S3RequestBuilder.scala index 0bc347def..c399d5abf 100644 --- a/s3/src/main/scala/monix/connect/s3/S3RequestBuilder.scala +++ b/s3/src/main/scala/monix/connect/s3/S3RequestBuilder.scala @@ -138,7 +138,7 @@ private[s3] object S3RequestBuilder { destinationKey: String, copyObjectSettings: CopyObjectSettings): CopyObjectRequest = { val request = CopyObjectRequest.builder - .copySource(sourceBucket + "/" + sourceKey) + .sourceKey(sourceBucket + "/" + sourceKey) .destinationBucket(destinationBucket) .destinationKey(destinationKey) copyObjectSettings.copySourceIfMatches.map(request.copySourceIfMatch) diff --git a/s3/src/test/scala/monix.connect.s3/S3RequestBuilderSpec.scala b/s3/src/test/scala/monix.connect.s3/S3RequestBuilderSpec.scala index be505d6c3..a78c01b5b 100644 --- a/s3/src/test/scala/monix.connect.s3/S3RequestBuilderSpec.scala +++ b/s3/src/test/scala/monix.connect.s3/S3RequestBuilderSpec.scala @@ -109,7 +109,7 @@ class S3RequestBuilderSpec .copyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey, copyObjectSettings) //then - request.copySource shouldBe sourceBucket + "/" + sourceKey + request.sourceKey shouldBe sourceBucket + "/" + sourceKey request.destinationBucket shouldBe destinationBucket request.destinationKey shouldBe destinationKey request.copySourceIfNoneMatch shouldBe copyObjectSettings.copySourceIfNoneMatch.orNull diff --git a/s3/src/test/scala/monix.connect.s3/S3RequestGenerators.scala b/s3/src/test/scala/monix.connect.s3/S3RequestGenerators.scala index d0b278bfe..66a8533c1 100644 --- a/s3/src/test/scala/monix.connect.s3/S3RequestGenerators.scala +++ b/s3/src/test/scala/monix.connect.s3/S3RequestGenerators.scala @@ -200,7 +200,6 @@ trait S3RequestGenerators { ifModifiedSince <- Gen.option(Gen.oneOf(Seq(Instant.now()))) ifNoneMatch <- genOptionStr ifUnmodifiedSince <- Gen.option(Gen.oneOf(Seq(Instant.now()))) - partNumber <- Gen.option(Gen.chooseNum[Int](1, 200)) //maybe to be added in the future requestPayer <- Gen.option(RequestPayer.fromValue("unknown")) sseCustomerAlgorithm <- genOptionStr sseCustomerKey <- genOptionStr diff --git a/s3/src/test/scala/monix.connect.s3/S3Spec.scala b/s3/src/test/scala/monix.connect.s3/S3Spec.scala index 19e7800ea..304b7a00a 100644 --- a/s3/src/test/scala/monix.connect.s3/S3Spec.scala +++ b/s3/src/test/scala/monix.connect.s3/S3Spec.scala @@ -35,7 +35,7 @@ class S3Spec extends AnyFlatSpec with BeforeAndAfterEach with Matchers with Befo override def beforeEach(): Unit = {} s"${S3}" should "use default configurable params" in { - S3.fromConfig.use { s3 => Task(s3 shouldBe a[S3]) }.runSyncUnsafe() + S3.fromConfig().use { s3 => Task(s3 shouldBe a[S3]) }.runSyncUnsafe() } it should "unsafely be created" in { diff --git a/sqs/src/it/scala/monix.connect.sqs/ConsumerSuite.scala b/sqs/src/it/scala/monix.connect.sqs/ConsumerSuite.scala index b6c7a2a15..b04cd42b6 100644 --- a/sqs/src/it/scala/monix.connect.sqs/ConsumerSuite.scala +++ b/sqs/src/it/scala/monix.connect.sqs/ConsumerSuite.scala @@ -5,7 +5,7 @@ import monix.execution.Scheduler import monix.reactive.Observable import monix.testing.scalatest.MonixTaskTest import org.scalacheck.Gen -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.ScalaFutures import org.scalatest.flatspec.AsyncFlatSpecLike import org.scalatest.matchers.should.Matchers diff --git a/sqs/src/it/scala/monix.connect.sqs/FifoQueueSuite.scala b/sqs/src/it/scala/monix.connect.sqs/FifoQueueSuite.scala index eb939689c..3377b4cc5 100644 --- a/sqs/src/it/scala/monix.connect.sqs/FifoQueueSuite.scala +++ b/sqs/src/it/scala/monix.connect.sqs/FifoQueueSuite.scala @@ -163,7 +163,7 @@ class FifoQueueSuite extends AsyncFlatSpec with MonixTaskTest with Matchers with val deduplicationId = Gen.some(genId).sample.get val delayDuration = Some(5.seconds) val delayedMessage: Message = new Message(body, groupId = groupId, deduplicationId = deduplicationId, delayDuration = delayDuration) - Sqs.fromConfig.use { case Sqs(operator, producer, _) => + Sqs.fromConfig().use { case Sqs(operator, producer, _) => for { fifoQueueName <- Task.from(genFifoQueueName) queueUrl <- operator.createQueue(fifoQueueName, attributes = fifoDeduplicationQueueAttr) diff --git a/sqs/src/it/scala/monix.connect.sqs/OperatorSuite.scala b/sqs/src/it/scala/monix.connect.sqs/OperatorSuite.scala index 1e45880e7..bcdffa43a 100644 --- a/sqs/src/it/scala/monix.connect.sqs/OperatorSuite.scala +++ b/sqs/src/it/scala/monix.connect.sqs/OperatorSuite.scala @@ -1,8 +1,9 @@ package monix.connect.sqs -import monix.connect.sqs.domain.{QueueName, QueueUrl} +import monix.connect.sqs.domain.QueueUrl import monix.eval.Task import monix.execution.Scheduler +import monix.execution.exceptions.DummyException import monix.testing.scalatest.MonixTaskTest import org.scalacheck.Gen import org.scalatest.BeforeAndAfterAll @@ -65,7 +66,7 @@ class OperatorSuite extends AsyncFlatSpec with MonixTaskTest with Matchers with } yield { appliedTags shouldBe initialTags ++ tagsByUrl nonExistingQueue.isLeft shouldBe true - nonExistingQueue.left.get.getMessage should include(invalidRequestErrorMsg) + nonExistingQueue.swap.getOrElse(DummyException("failed")).getMessage should include(invalidRequestErrorMsg) } } @@ -88,7 +89,7 @@ class OperatorSuite extends AsyncFlatSpec with MonixTaskTest with Matchers with untagByUrl shouldBe initialTags.filterNot(kv => kv._1 == queueType) untagByName shouldBe Map(dummyTagKey -> "123") nonExistingQueue.isLeft shouldBe true - nonExistingQueue.left.get.getMessage should include(invalidRequestErrorMsg) + nonExistingQueue.swap.getOrElse(DummyException("failed")).getMessage should include(invalidRequestErrorMsg) } } @@ -105,7 +106,7 @@ class OperatorSuite extends AsyncFlatSpec with MonixTaskTest with Matchers with val expectedAttributes = initialAttributes ++ attributesByUrl attributes.filter(kv => expectedAttributes.keys.toList.contains(kv._1)) should contain theSameElementsAs expectedAttributes nonExistingQueue.isLeft shouldBe true - nonExistingQueue.left.get.getMessage should include(invalidRequestErrorMsg) + nonExistingQueue.swap.getOrElse(DummyException("failed")).getMessage should include(invalidRequestErrorMsg) } } @@ -120,7 +121,7 @@ class OperatorSuite extends AsyncFlatSpec with MonixTaskTest with Matchers with createdQueueUrl shouldBe QueueUrl(queueUrlPrefix(queueName.name)) queueUrl shouldBe QueueUrl(queueUrlPrefix(queueName.name)) nonExistingQueue.isLeft shouldBe true - nonExistingQueue.left.get.isInstanceOf[QueueDoesNotExistException] shouldBe true + nonExistingQueue.swap.getOrElse(DummyException("failed")).isInstanceOf[QueueDoesNotExistException] shouldBe true } } diff --git a/sqs/src/it/scala/monix.connect.sqs/SqsSuite.scala b/sqs/src/it/scala/monix.connect.sqs/SqsSuite.scala index 1ca7fe7a8..046799ae6 100644 --- a/sqs/src/it/scala/monix.connect.sqs/SqsSuite.scala +++ b/sqs/src/it/scala/monix.connect.sqs/SqsSuite.scala @@ -14,7 +14,7 @@ class SqsSuite extends AsyncFlatSpec with MonixTaskTest with Matchers with Befor override implicit val scheduler = Scheduler.io("sqs-suite") s"$Sqs" can "be created from config file" in { - Sqs.fromConfig.use { sqs => + Sqs.fromConfig().use { sqs => for { fifoQueueName <- Task.from(genFifoQueueName) createdQueueUrl <- sqs.operator.createQueue(fifoQueueName) diff --git a/sqs/src/main/scala/monix/connect/sqs/consumer/SqsConsumer.scala b/sqs/src/main/scala/monix/connect/sqs/consumer/SqsConsumer.scala index 5fcd9ffc0..762734c8d 100644 --- a/sqs/src/main/scala/monix/connect/sqs/consumer/SqsConsumer.scala +++ b/sqs/src/main/scala/monix/connect/sqs/consumer/SqsConsumer.scala @@ -265,7 +265,7 @@ class SqsConsumer private[sqs] (private[sqs] implicit val asyncClient: SqsAsyncC queueUrl: QueueUrl, maxMessages: Int, visibilityTimeout: FiniteDuration, - waitTimeSeconds: FiniteDuration = Duration.Zero): ReceiveMessageRequest = { + waitTimeSeconds: FiniteDuration): ReceiveMessageRequest = { val builder = ReceiveMessageRequest.builder .queueUrl(queueUrl.url) .maxNumberOfMessages(maxMessages)