Skip to content

Commit

Permalink
Provide fake enqueuer to support testing jobqueue
Browse files Browse the repository at this point in the history
integration

GitOrigin-RevId: 9ca8871b2f53229a279310829851bd5c51bbcbe2
  • Loading branch information
mateuszmrozewski authored and svc-squareup-copybara committed Mar 5, 2025
1 parent 8fa2711 commit f3a947b
Show file tree
Hide file tree
Showing 12 changed files with 1,062 additions and 28 deletions.
4 changes: 2 additions & 2 deletions misk-aws2-sqs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ The module will not be considered beta/GA state until the below items are comple
Outstanding work that needs to be done:
* detailed test
* tracing
* test fixtures
* external queues
* installing retry queue only on request
* detailed documentation
Things that are supported in the old documentation but are questionable:
* aws queue attribute importer
* delayed backoff - old implementation does not take into account the original visibility timeout,
only count of retries
Outstanding things to document:
* how batch size plays out with channel size and visibility timeout
2 changes: 1 addition & 1 deletion misk-aws2-sqs/api/misk-aws2-sqs.api
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final class misk/aws2/sqs/jobqueue/SqsJobConsumer$Companion {
}

public final class misk/aws2/sqs/jobqueue/SqsJobEnqueuer : misk/jobqueue/v2/JobEnqueuer {
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;)V
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lwisp/token/TokenGenerator;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;)V
public fun enqueue (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun enqueueAsync (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;)Ljava/util/concurrent/CompletableFuture;
public fun enqueueBlocking (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;)V
Expand Down
20 changes: 11 additions & 9 deletions misk-aws2-sqs/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,28 @@ plugins {

dependencies {
api(libs.aws2Auth)
implementation(libs.aws2Regions)
api(libs.aws2Sqs)
api(libs.guava)
api(libs.guice)
api(libs.kotlinxCoroutinesCore)
api(libs.jakartaInject)
api(libs.moshiCore)
api(libs.prometheusClient)
api(project(":wisp:wisp-config"))
implementation(libs.aws2Core)
implementation(project(":misk"))
implementation(project(":misk-api"))
api(project(":misk-aws"))
api(project(":misk-inject"))
api(project(":misk-jobqueue"))
api(project(":wisp:wisp-config"))
api(project(":wisp:wisp-token"))
implementation(libs.aws2Core)
implementation(libs.aws2Regions)
implementation(libs.loggingApi)
runtimeOnly(libs.openTracingDatadog)
implementation(project(":wisp:wisp-logging"))
implementation(project(":misk"))
implementation(project(":misk-api"))
implementation(project(":misk-core"))
implementation(project(":misk-metrics"))
implementation(project(":misk-service"))
implementation(project(":wisp:wisp-logging"))
runtimeOnly(libs.openTracingDatadog)
testImplementation(libs.assertj)
testImplementation(libs.awaitility)
testImplementation(libs.dockerApi)
Expand All @@ -38,12 +40,12 @@ dependencies {
testImplementation(libs.kotlinxCoroutinesTest)
testImplementation(libs.mockitoCore)
testImplementation(libs.mockitoKotlin)
testImplementation(project(":misk-clustering"))
testImplementation(project(":misk-testing"))
testImplementation(project(":wisp:wisp-containers-testing"))
testImplementation(project(":wisp:wisp-feature-testing"))
testImplementation(project(":wisp:wisp-time-testing"))
testImplementation(project(":wisp:wisp-logging-testing"))
testImplementation(project(":misk-clustering"))
testImplementation(project(":misk-testing"))
}

mavenPublishing {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,28 @@ import misk.jobqueue.QueueName
import misk.jobqueue.sqs.parentQueue
import misk.jobqueue.v2.JobEnqueuer
import misk.moshi.adapter
import misk.tokens.TokenGenerator
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
import software.amazon.awssdk.services.sqs.model.SendMessageResponse
import java.time.Duration
import java.util.concurrent.CompletableFuture

@Singleton
class SqsJobEnqueuer @Inject constructor(
private val client: SqsAsyncClient,
private val queueResolver: QueueResolver,
private val tokenGenerator: TokenGenerator,
private val sqsMetrics: SqsMetrics,
private val moshi: Moshi,
) : JobEnqueuer<SendMessageResponse> {
) : JobEnqueuer {
/**
* Enqueue the job and suspend waiting for the confirmation
*/
override suspend fun enqueue(
queueName: QueueName,
body: String,
idempotencyKey: String,
idempotencyKey: String?,
deliveryDelay: Duration?,
attributes: Map<String, String>,
) {
Expand All @@ -47,7 +48,7 @@ class SqsJobEnqueuer @Inject constructor(
override fun enqueueBlocking(
queueName: QueueName,
body: String,
idempotencyKey: String,
idempotencyKey: String?,
deliveryDelay: Duration?,
attributes: Map<String, String>,
) {
Expand All @@ -68,16 +69,17 @@ class SqsJobEnqueuer @Inject constructor(
override fun enqueueAsync(
queueName: QueueName,
body: String,
idempotencyKey: String,
idempotencyKey: String?,
deliveryDelay: Duration?,
attributes: Map<String, String>,
): CompletableFuture<SendMessageResponse> {
): CompletableFuture<Boolean> {
val queueUrl = queueResolver.getQueueUrl(queueName)
val resolvedIdempotencyKey = idempotencyKey ?: tokenGenerator.generate()

val attrs = attributes.map {
it.key to MessageAttributeValue.builder().dataType("String").stringValue(it.value).build()
}.toMap().toMutableMap()
attrs[SqsJob.JOBQUEUE_METADATA_ATTR] = createMetadataMessageAttributeValue(queueName, idempotencyKey)
attrs[SqsJob.JOBQUEUE_METADATA_ATTR] = createMetadataMessageAttributeValue(queueName, resolvedIdempotencyKey)

val request = SendMessageRequest.builder()
.queueUrl(queueUrl)
Expand All @@ -92,7 +94,7 @@ class SqsJobEnqueuer @Inject constructor(
sqsMetrics.jobsEnqueued.labels(queueName.value).inc()
response.whenComplete { _, _ ->
timer.observeDuration()
}
}.thenCompose { CompletableFuture.supplyAsync { true } }
} catch (e: Exception) {
sqsMetrics.jobEnqueueFailures.labels(queueName.value).inc()
throw e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import misk.annotation.ExperimentalMiskApi
import misk.cloud.aws.AwsRegion
import misk.inject.KAbstractModule
import misk.jobqueue.v2.JobConsumer
import misk.jobqueue.v2.JobEnqueuer
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder
import software.amazon.awssdk.services.sqs.model.SendMessageResponse
import java.net.URI

@ExperimentalMiskApi
Expand All @@ -23,6 +25,7 @@ open class SqsJobQueueModule @JvmOverloads constructor(
requireBinding<AwsRegion>()
install(ServiceModule<SqsJobConsumer>().dependsOn<ReadyService>())
bind<JobConsumer>().to<SqsJobConsumer>()
bind<JobEnqueuer>().to<SqsJobEnqueuer>()
}

@Provides
Expand Down
67 changes: 67 additions & 0 deletions misk-jobqueue/api/misk-jobqueue.api
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,73 @@ public abstract interface class misk/jobqueue/v2/BlockingJobHandler : misk/jobqu
public abstract fun handleJob (Lmisk/jobqueue/v2/Job;)Lmisk/jobqueue/v2/JobStatus;
}

public final class misk/jobqueue/v2/FakeJob : java/lang/Comparable, misk/jobqueue/v2/Job {
public fun <init> (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;Ljava/time/Instant;Ljava/time/Duration;)V
public synthetic fun <init> (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;Ljava/time/Instant;Ljava/time/Duration;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public synthetic fun compareTo (Ljava/lang/Object;)I
public fun compareTo (Lmisk/jobqueue/v2/FakeJob;)I
public final fun component1 ()Lmisk/jobqueue/QueueName;
public final fun component2 ()Ljava/lang/String;
public final fun component3 ()Ljava/lang/String;
public final fun component4 ()Ljava/lang/String;
public final fun component5 ()Ljava/util/Map;
public final fun component6 ()Ljava/time/Instant;
public final fun component7 ()Ljava/time/Duration;
public final fun copy (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;Ljava/time/Instant;Ljava/time/Duration;)Lmisk/jobqueue/v2/FakeJob;
public static synthetic fun copy$default (Lmisk/jobqueue/v2/FakeJob;Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;Ljava/time/Instant;Ljava/time/Duration;ILjava/lang/Object;)Lmisk/jobqueue/v2/FakeJob;
public final fun delayWithBackoff ()V
public fun equals (Ljava/lang/Object;)Z
public final fun getAcknowledged ()Z
public fun getAttributes ()Ljava/util/Map;
public fun getBody ()Ljava/lang/String;
public final fun getDeadLettered ()Z
public final fun getDelayDuration ()Ljava/lang/Long;
public final fun getDelayedForBackoff ()Z
public final fun getDeliverAt ()Ljava/time/Instant;
public final fun getDeliveryDelay ()Ljava/time/Duration;
public final fun getEnqueuedAt ()Ljava/time/Instant;
public fun getId ()Ljava/lang/String;
public fun getIdempotenceKey ()Ljava/lang/String;
public fun getQueueName ()Lmisk/jobqueue/QueueName;
public fun hashCode ()I
public final fun setDeliveryDelay (Ljava/time/Duration;)V
public fun toString ()Ljava/lang/String;
}

public final class misk/jobqueue/v2/FakeJobEnqueuer : misk/testing/FakeFixture, misk/jobqueue/v2/JobEnqueuer {
public fun <init> (Ljava/time/Clock;Lcom/google/inject/Provider;Lwisp/token/TokenGenerator;)V
public fun enqueue (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun enqueueAsync (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;)Ljava/util/concurrent/CompletableFuture;
public fun enqueueBlocking (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;)V
public final fun handleJob (Lmisk/jobqueue/v2/Job;ZI)Z
public static synthetic fun handleJob$default (Lmisk/jobqueue/v2/FakeJobEnqueuer;Lmisk/jobqueue/v2/Job;ZIILjava/lang/Object;)Z
public final fun handleJobs (Lmisk/jobqueue/QueueName;ZIZ)Ljava/util/List;
public final fun handleJobs (ZZ)Ljava/util/List;
public static synthetic fun handleJobs$default (Lmisk/jobqueue/v2/FakeJobEnqueuer;Lmisk/jobqueue/QueueName;ZIZILjava/lang/Object;)Ljava/util/List;
public static synthetic fun handleJobs$default (Lmisk/jobqueue/v2/FakeJobEnqueuer;ZZILjava/lang/Object;)Ljava/util/List;
public final fun peekDeadlettered (Lmisk/jobqueue/QueueName;)Ljava/util/List;
public final fun peekJobs (Lmisk/jobqueue/QueueName;)Ljava/util/List;
public final fun pushFailure (Ljava/lang/Exception;Lmisk/jobqueue/QueueName;)V
public static synthetic fun pushFailure$default (Lmisk/jobqueue/v2/FakeJobEnqueuer;Ljava/lang/Exception;Lmisk/jobqueue/QueueName;ILjava/lang/Object;)V
public final fun reprocessDeadlettered (Lmisk/jobqueue/QueueName;ZI)Ljava/util/List;
public final fun reprocessDeadlettered (Lmisk/jobqueue/v2/Job;ZI)Z
public static synthetic fun reprocessDeadlettered$default (Lmisk/jobqueue/v2/FakeJobEnqueuer;Lmisk/jobqueue/QueueName;ZIILjava/lang/Object;)Ljava/util/List;
public static synthetic fun reprocessDeadlettered$default (Lmisk/jobqueue/v2/FakeJobEnqueuer;Lmisk/jobqueue/v2/Job;ZIILjava/lang/Object;)Z
}

public final class misk/jobqueue/v2/FakeJobEnqueuerModule : misk/inject/KAbstractModule {
public fun <init> ()V
}

public final class misk/jobqueue/v2/FakeJobHandlerModule : misk/inject/KAbstractModule {
public static final field Companion Lmisk/jobqueue/v2/FakeJobHandlerModule$Companion;
public synthetic fun <init> (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class misk/jobqueue/v2/FakeJobHandlerModule$Companion {
public final fun create (Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;)Lmisk/jobqueue/v2/FakeJobHandlerModule;
}

public abstract interface class misk/jobqueue/v2/Job {
public abstract fun getAttributes ()Ljava/util/Map;
public abstract fun getBody ()Ljava/lang/String;
Expand Down
1 change: 1 addition & 0 deletions misk-jobqueue/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
testImplementation(libs.junitApi)
testImplementation(libs.loggingApi)
testImplementation(libs.kotlinTest)
testImplementation(libs.kotlinxCoroutinesTest)
testImplementation(libs.logbackClassic)
testImplementation(libs.moshiCore)
testImplementation(project(":wisp:wisp-logging"))
Expand Down
13 changes: 5 additions & 8 deletions misk-jobqueue/src/main/kotlin/misk/jobqueue/v2/JobEnqueuer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package misk.jobqueue.v2

import misk.jobqueue.QueueName
import java.time.Duration
import java.util.UUID
import java.util.concurrent.CompletableFuture

interface JobEnqueuer<T> {
interface JobEnqueuer {
/**
* Enqueue the job and suspend waiting for the confirmation
*/
suspend fun enqueue(
queueName: QueueName,
body: String,
idempotencyKey: String = UUID.randomUUID().toString(),
idempotencyKey: String? = null,
deliveryDelay: Duration? = Duration.ZERO,
attributes: Map<String, String> = emptyMap(),
)
Expand All @@ -23,21 +22,19 @@ interface JobEnqueuer<T> {
fun enqueueBlocking(
queueName: QueueName,
body: String,
idempotencyKey: String = UUID.randomUUID().toString(),
idempotencyKey: String? = null,
deliveryDelay: Duration? = Duration.ZERO,
attributes: Map<String, String> = emptyMap(),
)

/**
* Enqueue the job and return a CompletableFuture.
*
* This call does not record sending metrics as it's asynchronous.
*/
fun enqueueAsync(
queueName: QueueName,
body: String,
idempotencyKey: String = UUID.randomUUID().toString(),
idempotencyKey: String? = null,
deliveryDelay: Duration? = Duration.ZERO,
attributes: Map<String, String> = emptyMap(),
): CompletableFuture<T>
): CompletableFuture<Boolean>
}
Loading

0 comments on commit f3a947b

Please sign in to comment.