Skip to content

Commit

Permalink
Update scalafmt-core to 3.7.15 (#423)
Browse files Browse the repository at this point in the history
* Update scalafmt-core to 3.7.15

* Reformat with scalafmt 3.7.15

Executed command: scalafmt --non-interactive

* Add 'Reformat with scalafmt 3.7.15' to .git-blame-ignore-revs
  • Loading branch information
scala-steward authored Nov 3, 2023
1 parent 9af0cf9 commit 9e81411
Show file tree
Hide file tree
Showing 21 changed files with 224 additions and 49 deletions.
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Scala Steward: Reformat with scalafmt 3.7.15
0770922ed768fdc30a3ea6b07a936d1ec6e82c53
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "3.6.1"
version = "3.7.15"
runner.dialect = scala213
maxColumn = 140
align.preset = some
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ object syntax {
final private[syntax] class AsJsonSenderPartiallyApplied[F[_], P, A](private val sender: Sender[F, Message[P]]) extends AnyVal {

@scala.annotation.nowarn("cat=unused-params")
def apply[R >: P](to: Destination[R])(implicit encoder: Encoder[A], noGroupId: GroupIdMeta.Absent[R]): Sender[F, A] =
def apply[R >: P](
to: Destination[R]
)(
implicit encoder: Encoder[A],
noGroupId: GroupIdMeta.Absent[R]
): Sender[F, A] =
sender.contramap(JsonMessage(_, to).widen)

}
Expand Down Expand Up @@ -101,7 +106,10 @@ object syntax {

implicit final class ConsumerCirceExtensions[F[_], A](private val consumer: Consumer[F, A]) extends AnyVal {

def asJsonConsumer[B: Decoder](implicit M: MonadError[F, _ >: io.circe.Error], ev: A <:< Payload): Consumer[F, B] =
def asJsonConsumer[B: Decoder](
implicit M: MonadError[F, _ >: io.circe.Error],
ev: A <:< Payload
): Consumer[F, B] =
consumer.mapM(msg => decode[B](msg.text).liftTo[F])

def asJsonConsumerWithMessage[B: Decoder](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ object syntax {

implicit final class LoggingSenderSyntax[F[_], P](private val underlying: Sender[F, Message[P]]) extends AnyVal {

def logged(implicit logger: Logger[F], F: MonadCancelThrow[F]): Sender[F, Message[P]] = {
def logged(
implicit logger: Logger[F],
F: MonadCancelThrow[F]
): Sender[F, Message[P]] = {
val logBefore = Sender.fromFunction[F, Message[P]] { msg =>
logger.trace(s"Sending message to destination [${msg.destination}]: [${msg.payload}]")
}
Expand All @@ -54,7 +57,12 @@ object syntax {

implicit final class LoggingConsumerSyntax[F[_], A](private val underlying: Consumer[F, Payload]) extends AnyVal {

def logged[P](source: Source[P])(implicit logger: Logger[F], F: MonadCancelThrow[F]): Consumer[F, Payload] =
def logged[P](
source: Source[P]
)(
implicit logger: Logger[F],
F: MonadCancelThrow[F]
): Consumer[F, Payload] =
underlying
.surroundAll { run =>
logger.info(s"Starting consumer from [$source]") *> run.guaranteeCase {
Expand All @@ -76,7 +84,10 @@ object syntax {

implicit final class LoggingBrokerSyntax[F[_], P](private val underlying: Broker[F, P]) extends AnyVal {

def logged(implicit logger: Logger[F], F: MonadCancelThrow[F]): Broker[F, P] =
def logged(
implicit logger: Logger[F],
F: MonadCancelThrow[F]
): Broker[F, P] =
new Broker[F, P] {
override def consumer[R >: P](source: Source[R]): Consumer[F, Payload] =
underlying.consumer(source).logged(source)
Expand All @@ -89,7 +100,10 @@ object syntax {

implicit final class ConnectorLoggingSyntax[F[_], P](val self: Connector[F, P]) extends AnyVal {

def logged(implicit logger: Logger[F], F: Monad[F]): Connector.Aux[F, P, self.Raw] =
def logged(
implicit logger: Logger[F],
F: Monad[F]
): Connector.Aux[F, P, self.Raw] =
new Connector[F, P] {
type Raw = self.Raw
val underlying: self.Raw = self.underlying
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,12 @@ object syntax {
}

implicit final class ConsumeXmlMessageSyntax[F[_]](private val consumer: Consumer[F, String]) {
def asXmlConsumer[A: XmlDecoder](implicit M: MonadError[F, _ >: DecodingError]): Consumer[F, A] =

def asXmlConsumer[A: XmlDecoder](
implicit M: MonadError[F, _ >: DecodingError]
): Consumer[F, A] =
consumer.mapM(XmlDecoder[A].decode(_).liftTo[F])

}

implicit final class ConsumeXmlGenericMessageSyntax[F[_], A](private val consumer: Consumer[F, A]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ trait S3Client[F[_]] {

object S3Client {

def apply[F[_]](implicit ev: S3Client[F]): S3Client[F] = ev
def apply[F[_]](
implicit ev: S3Client[F]
): S3Client[F] = ev

def usingBuilder[F[_]: Async](
s3Builder: S3AsyncClientBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ object syntax {
message = s3.replacePointerWithPayload(config, msg, payload)
} yield message

private def removeDataFromS3(pointer: PayloadS3Pointer)(implicit s3Client: S3Client[F]): F[Unit] =
private def removeDataFromS3(
pointer: PayloadS3Pointer
)(
implicit s3Client: S3Client[F]
): F[Unit] =
s3Client.deleteObject(pointer.s3BucketName, pointer.s3Key)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ private[activemq] object taps {

implicit class AkkaSourceDsl[A, M](source: Graph[SourceShape[A], M]) {

def toStream[F[_]: Async](onMaterialization: M => Unit = _ => ())(implicit materializer: Materializer): Stream[F, A] =
def toStream[F[_]: Async](
onMaterialization: M => Unit = _ => ()
)(
implicit materializer: Materializer
): Stream[F, A] =
akkaSourceToFs2Stream(source)(onMaterialization)

}

implicit class AkkaFlowDsl[A, B, M](flow: Graph[FlowShape[A, B], M]) {
Expand Down Expand Up @@ -141,7 +146,11 @@ private[activemq] object taps {
}
}

private def subscriberStream[F[_], A](subscriber: SinkQueueWithCancel[A])(implicit F: Async[F]): Stream[F, A] = {
private def subscriberStream[F[_], A](
subscriber: SinkQueueWithCancel[A]
)(
implicit F: Async[F]
): Stream[F, A] = {
val pull = Async[F].fromFuture(F.delay(subscriber.pull()))
val cancel = F.delay(subscriber.cancel())
Stream.repeatEval(pull).unNoneTerminate.onFinalize(cancel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ private[activemq] object taps {

implicit class AkkaSourceDsl[A, M](source: Graph[SourceShape[A], M]) {

def toStream[F[_]: Async](onMaterialization: M => Unit = _ => ())(implicit materializer: Materializer): Stream[F, A] =
def toStream[F[_]: Async](
onMaterialization: M => Unit = _ => ()
)(
implicit materializer: Materializer
): Stream[F, A] =
akkaSourceToFs2Stream(source)(onMaterialization)

}

implicit class AkkaFlowDsl[A, B, M](flow: Graph[FlowShape[A, B], M]) {
Expand Down Expand Up @@ -141,7 +146,11 @@ private[activemq] object taps {
}
}

private def subscriberStream[F[_], A](subscriber: SinkQueueWithCancel[A])(implicit F: Async[F]): Stream[F, A] = {
private def subscriberStream[F[_], A](
subscriber: SinkQueueWithCancel[A]
)(
implicit F: Async[F]
): Stream[F, A] = {
val cancel = F.delay(subscriber.cancel())
val pull = Async[F].fromFutureCancelable(F.delay((subscriber.pull(), cancel)))
Stream.repeatEval(pull).unNoneTerminate.onFinalize(cancel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ trait KinesisAttributesProvider[F[_]] {
}

object KinesisAttributesProvider {
def apply[F[_]](implicit ev: KinesisAttributesProvider[F]): KinesisAttributesProvider[F] = ev

def apply[F[_]](
implicit ev: KinesisAttributesProvider[F]
): KinesisAttributesProvider[F] = ev

def default[F[_]: Sync]: KinesisAttributesProvider[F] =
(payload: Payload, _: KinesisDestination) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ trait SnsAttributesProvider[F[_]] {
}

object SnsAttributesProvider {
def apply[F[_]](implicit ev: SnsAttributesProvider[F]): SnsAttributesProvider[F] = ev

def apply[F[_]](
implicit ev: SnsAttributesProvider[F]
): SnsAttributesProvider[F] = ev

def default[F[_]: MonadThrow]: SnsAttributesProvider[F] =
new SnsAttributesProvider[F] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ trait SqsAttributesProvider[F[_]] {
}

object SqsAttributesProvider {
def apply[F[_]](implicit ev: SqsAttributesProvider[F]): SqsAttributesProvider[F] = ev

def apply[F[_]](
implicit ev: SqsAttributesProvider[F]
): SqsAttributesProvider[F] = ev

def default[F[_]: MonadThrow]: SqsAttributesProvider[F] =
new SqsAttributesProvider[F] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ trait MyService[F[_]] {
}

object MyService {
def apply[F[_]](implicit F: MyService[F]): F.type = F
def instance[F[_]](implicit sender: Sender[F, Int]): MyService[F] = () => sender.sendOne(42)

def apply[F[_]](
implicit F: MyService[F]
): F.type = F

def instance[F[_]](
implicit sender: Sender[F, Int]
): MyService[F] = () => sender.sendOne(42)

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ trait Broker[F[_], +P] {

object Broker {

def apply[F[_], P](implicit ev: Broker[F, P]): ev.type = ev
def apply[F[_], P](
implicit ev: Broker[F, P]
): ev.type = ev

def fromConnector[F[_]: Async, P](connector: Connector[F, P]): Broker[F, P] =
new Broker[F, P] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ trait Consumer[F[_], +A] extends ((A => F[Unit]) => F[Unit]) with Serializable {
}

object Consumer extends ConsumerInstances {
def apply[F[_], A](implicit C: Consumer[F, A]): Consumer[F, A] = C

def apply[F[_], A](
implicit C: Consumer[F, A]
): Consumer[F, A] = C

/** A consumer that waits for processing of one message (and finalization of its resources), then takes another message from the stream of
* resources. Failures that happen in the message handler are caught and used to rollback the message being processed.
Expand Down Expand Up @@ -254,49 +257,81 @@ object Consumer extends ConsumerInstances {
* are reported normally and have the same impact on the consumer's process as usual processing errors would, meaning they also usually
* trigger a rollback.
*/
def mapM[B](f: A => F[B])(implicit F: FlatMap[F]): Consumer[F, B] =
def mapM[B](
f: A => F[B]
)(
implicit F: FlatMap[F]
): Consumer[F, B] =
handler => self.consume(f >=> handler)

/** Allows to filter certain messages and execute an effect while doing it.
*
* For filtering without an effect use [[functorFilter]] instance.
*/
def evalMapFilter[B](f: A => F[Option[B]])(implicit F: Monad[F]): Consumer[F, B] =
def evalMapFilter[B](
f: A => F[Option[B]]
)(
implicit F: Monad[F]
): Consumer[F, B] =
Consumer.fromFunction[F, B](handler => self.consume(f(_).flatMap(_.fold(Applicative[F].unit)(handler))))

/** Similar to [[mapM()]], but discards the result of the tapped effect.
*/
def contraTapM(f: A => F[Unit])(implicit F: FlatMap[F]): Consumer[F, A] =
def contraTapM(
f: A => F[Unit]
)(
implicit F: FlatMap[F]
): Consumer[F, A] =
handler => self.consume(a => f(a).flatTap(_ => handler(a)))

/** For every message, executes provided finalization action that takes the original message as an input. Useful for performing cleanup
* after the transaction has been completed successfuly. Note that this behaves the same way finalizers would do. The earliest added
* action is executed the last due to the nature of function composition.
*/
def afterEach(f: A => F[Unit])(implicit F: FlatMap[F]): Consumer[F, A] =
def afterEach(
f: A => F[Unit]
)(
implicit F: FlatMap[F]
): Consumer[F, A] =
use => self.consume(msg => use(msg) >> f(msg))

/** For every message, creates an artificial consumer that only handles that one message, and runs it through the given function. This
* follows [[Consumer#flatMap]] semantics, so while the consumer of `B` is busy processing, no further messages will be received by
* `self`.
*/
def selfProduct[B](f: Consumer[F, A] => Consumer[F, B])(implicit F: Defer[F]): Consumer[F, (A, B)] =
def selfProduct[B](
f: Consumer[F, A] => Consumer[F, B]
)(
implicit F: Defer[F]
): Consumer[F, (A, B)] =
self.mproduct(f.compose(Consumer.one[F, A]))

/** Uses this consumer as a source until completion, then it switches to the second consumer.
*/
def zip(another: Consumer[F, A])(implicit F: Apply[F]): Consumer[F, A] =
def zip(
another: Consumer[F, A]
)(
implicit F: Apply[F]
): Consumer[F, A] =
handleA => self.consume(handleA) *> another.consume(handleA)

/** Merges the two consumers by returning one which will run them concurrently, with the same handler. No synchronization between
* underlying consumers is involved - they will run completely independently.
*/
def parZip(another: Consumer[F, A])(implicit F: NonEmptyParallel[F]): Consumer[F, A] =
def parZip(
another: Consumer[F, A]
)(
implicit F: NonEmptyParallel[F]
): Consumer[F, A] =
handleA => F.parProductR(self.consume(handleA))(another.consume(handleA))

}

// For laws. Mortals probably won't have a usecase for this.
implicit def eq[F[_], A](implicit equalFunction: Eq[(A => F[Unit]) => F[Unit]]): Eq[Consumer[F, A]] = equalFunction.narrow
implicit def eq[F[_], A](
implicit equalFunction: Eq[(A => F[Unit]) => F[Unit]]
): Eq[Consumer[F, A]] = equalFunction.narrow

}

// low-priority instances. See Sender companion traits to understand the order they need to be defined in.
Expand Down
Loading

0 comments on commit 9e81411

Please sign in to comment.