Skip to content

Commit

Permalink
Add tracing to the new sqs module
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 34e453645bf0c7d8e076237421449b2767301cc4
  • Loading branch information
mateuszmrozewski authored and svc-squareup-copybara committed Mar 5, 2025
1 parent f3a947b commit 0013008
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 50 deletions.
8 changes: 5 additions & 3 deletions misk-aws2-sqs/api/misk-aws2-sqs.api
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public final class misk/aws2/sqs/jobqueue/SqsJob : misk/jobqueue/v2/Job {
public static final field Companion Lmisk/aws2/sqs/jobqueue/SqsJob$Companion;
public static final field JOBQUEUE_METADATA_ATTR Ljava/lang/String;
public static final field JOBQUEUE_METADATA_IDEMPOTENCE_KEY Ljava/lang/String;
public static final field JOBQUEUE_METADATA_ORIGINAL_TRACE_ID Ljava/lang/String;
public static final field JOBQUEUE_METADATA_ORIGIN_QUEUE Ljava/lang/String;
public fun <init> (Lmisk/jobqueue/QueueName;Lcom/squareup/moshi/Moshi;Lsoftware/amazon/awssdk/services/sqs/model/Message;Ljava/lang/String;J)V
public fun getAttributes ()Ljava/util/Map;
Expand All @@ -39,7 +40,7 @@ public final class misk/aws2/sqs/jobqueue/SqsJob$Companion {

public final class misk/aws2/sqs/jobqueue/SqsJobConsumer : com/google/common/util/concurrent/AbstractService, misk/jobqueue/v2/JobConsumer {
public static final field Companion Lmisk/aws2/sqs/jobqueue/SqsJobConsumer$Companion;
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lcom/squareup/moshi/Moshi;Lmisk/aws2/sqs/jobqueue/DeadLetterQueueProvider;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Ljava/time/Clock;)V
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lcom/squareup/moshi/Moshi;Lmisk/aws2/sqs/jobqueue/DeadLetterQueueProvider;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Ljava/time/Clock;Lio/opentracing/Tracer;)V
public fun subscribe (Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;)V
public final fun subscribe (Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;)V
public fun unsubscribe (Lmisk/jobqueue/QueueName;)V
Expand All @@ -50,7 +51,7 @@ public final class misk/aws2/sqs/jobqueue/SqsJobConsumer$Companion {
}

public final class misk/aws2/sqs/jobqueue/SqsJobEnqueuer : misk/jobqueue/v2/JobEnqueuer {
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lwisp/token/TokenGenerator;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;)V
public fun <init> (Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lwisp/token/TokenGenerator;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;Lio/opentracing/Tracer;)V
public fun enqueue (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun enqueueAsync (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;)Ljava/util/concurrent/CompletableFuture;
public fun enqueueBlocking (Lmisk/jobqueue/QueueName;Ljava/lang/String;Ljava/lang/String;Ljava/time/Duration;Ljava/util/Map;)V
Expand Down Expand Up @@ -98,7 +99,7 @@ public final class misk/aws2/sqs/jobqueue/StaticDeadLetterQueueProvider : misk/a

public final class misk/aws2/sqs/jobqueue/Subscriber {
public static final field Companion Lmisk/aws2/sqs/jobqueue/Subscriber$Companion;
public fun <init> (Lmisk/jobqueue/QueueName;Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;Lkotlinx/coroutines/channels/Channel;Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;Ljava/time/Clock;)V
public fun <init> (Lmisk/jobqueue/QueueName;Lmisk/aws2/sqs/jobqueue/config/SqsQueueConfig;Lmisk/jobqueue/QueueName;Lmisk/jobqueue/v2/JobHandler;Lkotlinx/coroutines/channels/Channel;Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;Lmisk/aws2/sqs/jobqueue/QueueResolver;Lmisk/aws2/sqs/jobqueue/SqsMetrics;Lcom/squareup/moshi/Moshi;Ljava/time/Clock;Lio/opentracing/Tracer;)V
public final fun getChannel ()Lkotlinx/coroutines/channels/Channel;
public final fun getClient ()Lsoftware/amazon/awssdk/services/sqs/SqsAsyncClient;
public final fun getClock ()Ljava/time/Clock;
Expand All @@ -109,6 +110,7 @@ public final class misk/aws2/sqs/jobqueue/Subscriber {
public final fun getQueueName ()Lmisk/jobqueue/QueueName;
public final fun getQueueResolver ()Lmisk/aws2/sqs/jobqueue/QueueResolver;
public final fun getSqsMetrics ()Lmisk/aws2/sqs/jobqueue/SqsMetrics;
public final fun getTracer ()Lio/opentracing/Tracer;
public final fun poll (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun run (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
Expand Down
3 changes: 3 additions & 0 deletions misk-aws2-sqs/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
api(libs.kotlinxCoroutinesCore)
api(libs.jakartaInject)
api(libs.moshiCore)
api(libs.openTracing)
api(libs.prometheusClient)
api(project(":misk-aws"))
api(project(":misk-inject"))
Expand All @@ -24,6 +25,8 @@ dependencies {
implementation(libs.aws2Core)
implementation(libs.aws2Regions)
implementation(libs.loggingApi)
implementation(libs.openTracingDatadog)
implementation(libs.tracingDatadog)
implementation(project(":misk"))
implementation(project(":misk-api"))
implementation(project(":misk-core"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class SqsJob(
}

companion object {
/** Message attribute used to store metadata specific to jobqueue functionality.
/**
* Message attribute used to store metadata specific to jobqueue functionality.
* JSON-encoded.
*/
const val JOBQUEUE_METADATA_ATTR = "_jobqueue-metadata"
Expand All @@ -45,5 +46,11 @@ class SqsJob(
* Client-assigned identifier, useful to detect duplicate messages.
*/
const val JOBQUEUE_METADATA_IDEMPOTENCE_KEY = "idempotence_key"

/**
* Name attribute used to pass the original trace id, so that we
* can track from enqueueing to handling.
*/
const val JOBQUEUE_METADATA_ORIGINAL_TRACE_ID = "original_trace_id"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package misk.aws2.sqs.jobqueue
import com.google.common.util.concurrent.AbstractService
import com.google.inject.Singleton
import com.squareup.moshi.Moshi
import io.opentracing.Tracer
import jakarta.inject.Inject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -49,6 +50,7 @@ class SqsJobConsumer @Inject constructor(
private val dlqProvider: DeadLetterQueueProvider,
private val sqsMetrics: SqsMetrics,
private val clock: Clock,
private val tracer: Tracer,
) : JobConsumer, AbstractService() {
private val scope = CoroutineScope(Dispatchers.IO.limitedParallelism(1) + SupervisorJob())

Expand All @@ -74,6 +76,7 @@ class SqsJobConsumer @Inject constructor(
sqsMetrics = sqsMetrics,
moshi = moshi,
clock = clock,
tracer = tracer,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package misk.aws2.sqs.jobqueue
import com.google.inject.Inject
import com.google.inject.Singleton
import com.squareup.moshi.Moshi
import ddtrot.dd.trace.core.DDSpan
import io.opentracing.Scope
import io.opentracing.Span
import io.opentracing.Tracer
import io.opentracing.tag.Tags
import kotlinx.coroutines.future.await
import misk.jobqueue.QueueName
import misk.jobqueue.sqs.parentQueue
Expand All @@ -22,6 +27,7 @@ class SqsJobEnqueuer @Inject constructor(
private val tokenGenerator: TokenGenerator,
private val sqsMetrics: SqsMetrics,
private val moshi: Moshi,
private val tracer: Tracer,
) : JobEnqueuer {
/**
* Enqueue the job and suspend waiting for the confirmation
Expand Down Expand Up @@ -73,44 +79,66 @@ class SqsJobEnqueuer @Inject constructor(
deliveryDelay: Duration?,
attributes: Map<String, String>,
): CompletableFuture<Boolean> {
val queueUrl = queueResolver.getQueueUrl(queueName)
val resolvedIdempotencyKey = idempotencyKey ?: tokenGenerator.generate()
return tracer.withSpanAsync("enqueue-job-${queueName.value}") { span, scope ->
val queueUrl = queueResolver.getQueueUrl(queueName)
val resolvedIdempotencyKey = idempotencyKey ?: tokenGenerator.generate()

val attrs = attributes.map {
it.key to MessageAttributeValue.builder().dataType("String").stringValue(it.value).build()
}.toMap().toMutableMap()
attrs[SqsJob.JOBQUEUE_METADATA_ATTR] = createMetadataMessageAttributeValue(queueName, resolvedIdempotencyKey)
val attrs = attributes.map {
it.key to MessageAttributeValue.builder().dataType("String").stringValue(it.value).build()
}.toMap().toMutableMap()
attrs[SqsJob.JOBQUEUE_METADATA_ATTR] =
createMetadataMessageAttributeValue(queueName, resolvedIdempotencyKey, span)

val request = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(body)
.delaySeconds(deliveryDelay?.toSeconds()?.toInt())
.messageAttributes(attrs)
.build()
val request = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(body)
.delaySeconds(deliveryDelay?.toSeconds()?.toInt())
.messageAttributes(attrs)
.build()

val timer = sqsMetrics.sqsSendTime.labels(queueName.value).startTimer()
try {
val response = client.sendMessage(request)
sqsMetrics.jobsEnqueued.labels(queueName.value).inc()
response.whenComplete { _, _ ->
timer.observeDuration()
span.finish()
scope.close()
}.thenCompose { CompletableFuture.supplyAsync { true } }
} catch (e: Exception) {
sqsMetrics.jobEnqueueFailures.labels(queueName.value).inc()
throw e
}
}
}

val timer = sqsMetrics.sqsSendTime.labels(queueName.value).startTimer()
return try {
val response = client.sendMessage(request)
sqsMetrics.jobsEnqueued.labels(queueName.value).inc()
response.whenComplete { _, _ ->
timer.observeDuration()
}.thenCompose { CompletableFuture.supplyAsync { true } }
} catch (e: Exception) {
sqsMetrics.jobEnqueueFailures.labels(queueName.value).inc()
throw e
private fun <T> Tracer.withSpanAsync(spanName: String, block: (span: Span, scope: Scope) -> T): T {
val span = tracer.buildSpan(spanName).start()
val scope = scopeManager().activate(span)
try {
return block(span, scope)
} catch (t: Throwable) {
Tags.ERROR.set(span, true)
throw t
}
}

private fun createMetadataMessageAttributeValue(
queueName: QueueName,
idempotencyKey: String,
span: Span,
): MessageAttributeValue {
// TODO old implementation passes the span id as well
val metadata = mutableMapOf(
SqsJob.JOBQUEUE_METADATA_ORIGIN_QUEUE to queueName.parentQueue.value,
SqsJob.JOBQUEUE_METADATA_IDEMPOTENCE_KEY to idempotencyKey,
)

// Preserve original trace id, if available.
(span as? DDSpan)?.let {
val traceId = it.context().traceId.toString()
metadata[SqsJob.JOBQUEUE_METADATA_ORIGINAL_TRACE_ID] = traceId
}

return MessageAttributeValue.builder()
.dataType("String")
.stringValue(moshi.adapter<Map<String, String>>().toJson(metadata))
Expand Down
69 changes: 47 additions & 22 deletions misk-aws2-sqs/src/main/kotlin/misk/aws2/sqs/jobqueue/Subscriber.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package misk.aws2.sqs.jobqueue

import com.squareup.moshi.Moshi
import io.opentracing.Tracer
import io.opentracing.tag.Tags
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
Expand Down Expand Up @@ -37,38 +39,61 @@ class Subscriber(
val sqsMetrics: SqsMetrics,
val moshi: Moshi,
val clock: Clock,
val tracer: Tracer,
) {
suspend fun run() {
while (true) {
val job = channel.receive()
val receiveFromChannelTimestamp = clock.instant().nano
sqsMetrics.channelReceiveLag.labels(queueName.value)
.observe((receiveFromChannelTimestamp - job.publishToChannelTimestamp).toDouble())
val result = try {
val timer = sqsMetrics.handlerDispatchTime.labels(queueName.value).startTimer()
val result = when (handler) {
is SuspendingJobHandler -> handler.handleJob(job)
is BlockingJobHandler -> runInterruptible {
handler.handleJob(job)
val job = tracer.withSpan("channel-receive-queue-${queueName.value}") {
channel.receive()
}
tracer.withSpan("process-queue-${queueName.value}") {
val receiveFromChannelTimestamp = clock.instant().nano
sqsMetrics.channelReceiveLag.labels(queueName.value)
.observe((receiveFromChannelTimestamp - job.publishToChannelTimestamp).toDouble())
val result = try {
val timer = sqsMetrics.handlerDispatchTime.labels(queueName.value).startTimer()
val result = tracer.withSpan("handle-queue-${queueName.value}") {
when (handler) {
is SuspendingJobHandler -> handler.handleJob(job)
is BlockingJobHandler -> runInterruptible {
handler.handleJob(job)
}
}
}
timer.observeDuration()
result
} catch (e: Exception) {
sqsMetrics.handlerFailures.labels(queueName.value).inc()
return@withSpan
}
timer.observeDuration()
result
} catch (e: Exception) {
sqsMetrics.handlerFailures.labels(queueName.value).inc()
continue
}
when (result) {
JobStatus.OK -> deleteMessage(job)
JobStatus.DEAD_LETTER -> {
deadLetterMessage(job)
deleteMessage(job)
when (result) {
JobStatus.OK -> deleteMessage(job)
JobStatus.DEAD_LETTER -> {
deadLetterMessage(job)
deleteMessage(job)
}

JobStatus.RETRY_LATER -> { /* no-op, will be retried after visibility timeout passes */
}
}
JobStatus.RETRY_LATER -> { /* no-op, will be retried after visibility timeout passes */ }
}
}
}

private suspend fun <T> Tracer.withSpan(spanName: String, block: suspend () -> T): T {
val span = tracer.buildSpan(spanName).start()
val scope = scopeManager().activate(span)
try {
return block()
} catch (t: Throwable) {
Tags.ERROR.set(span, true)
throw t
} finally {
scope.close()
span.finish()
}
}

private suspend fun deleteMessage(job: SqsJob) {
val timer = sqsMetrics.sqsDeleteTime.labels(queueName.value).startTimer()
client.deleteMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package misk.aws2.sqs.jobqueue
import com.github.dockerjava.api.model.ExposedPort
import com.github.dockerjava.api.model.HostConfig
import com.github.dockerjava.api.model.Ports
import misk.aws2.sqs.jobqueue.DockerSqs.clientPort
import misk.testing.ExternalDependency
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package misk.aws2.sqs.jobqueue

import jakarta.inject.Inject
import misk.jobqueue.QueueName
import misk.testing.MiskExternalDependency
import misk.testing.MiskTest
import misk.testing.MiskTestModule
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import kotlin.test.assertEquals

@MiskTest(startService = true)
class SqsQueueResolverTest {
@MiskExternalDependency private val dockerSqs = DockerSqs
@MiskTestModule private val module = SqsJobQueueTestModule(dockerSqs)

@Inject
private lateinit var queueResolver: QueueResolver

@Test
@Disabled("Disabled test until docker authorization issues are resolvedc")
fun `caches queue URL`() {
val queueName = QueueName("test-queue")

DockerSqs.client.createQueue(
CreateQueueRequest.builder().queueName(queueName.value)
.attributes(
mapOf(
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS to "20",
))
.build()
).join()

val result = queueResolver.getQueueUrl(queueName)
queueResolver.getQueueUrl(queueName)

assertEquals("test", result)
}
}

0 comments on commit 0013008

Please sign in to comment.