Skip to content

Conversation

@fernandobalieiro
Copy link
Contributor

Description

The goal of this PR is to ensure that when Content-Type header is present, it's properly propagated into the Content-Type field of the raw AMQP Message.

Motivation

SmallRye Reactive Messaging relies on the raw AQMP Content-Type to determine whether it should or not convert the payload into a io.vertx.core.json.JsonObject:

        if (body instanceof Data) {
            Binary bin = ((Data) body).getValue();
            byte[] bytes = new byte[bin.getLength()];
            System.arraycopy(bin.getArray(), bin.getArrayOffset(), bytes, 0, bin.getLength());

            if (APPLICATION_JSON.equalsIgnoreCase(msg.contentType())) {
                return Buffer.buffer(bytes).toJson();
            }
            return bytes;
        }

If the Content-Type application/json is not present, the conversion does not happen, and methods annotated with @Incoming with a JsonObject as parameter (such as the one below) break.

In a scenario where Azure Service Bus Producers are migrated to use Apache Camel, all the Consumers that are relying on SmallRye Reactive Messaging will break.

Example method:

    @Incoming("my-subscription")
    public Uni<Void> handleMessage(final JsonObject messageJson) {
        return Uni.createFrom().item(messageJson)
            .call(() -> handle(messageJson))
            .replaceWithVoid();
    }

Stack Trace when consuming the message:

2025-10-22 12:24:34.665 ERROR io.sm.re.me.pr.AbstractMediator:166 [vert.x-eventloop-thread-5-215] SRMSG00200: The method com.acme.MyEventHandler#handleMessage has thrown an exception: java.lang.ClassCastException: class [B cannot be cast to class io.vertx.core.json.JsonObject ([B is in module java.base of loader 'bootstrap'; io.vertx.core.json.JsonObject is in unnamed module of loader io.quarkus.bootstrap.classloading.QuarkusClassLoader @4f83df68)
        at com.acme.MyEventHandler_SmallRyeMessagingInvoker_handleMessage_23d540434dcfaa61d3f6031df6da3d9e581a58aa.invoke(Unknown Source)
        at io.smallrye.reactive.messaging.providers.AbstractMediator.invoke(AbstractMediator.java:164)
        at io.smallrye.reactive.messaging.providers.AbstractMediator.lambda$invokeOnMessageContext$13(AbstractMediator.java:172)
        at io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata.lambda$invokeOnMessageContext$0(LocalContextMetadata.java:34)
        at io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata.lambda$invokeOnMessageContext$1(LocalContextMetadata.java:53)
        at io.smallrye.reactive.messaging.providers.helpers.VertxContext.runOnContext(VertxContext.java:33)
        at io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata.lambda$invokeOnMessageContext$2(LocalContextMetadata.java:51)
        at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateWithEmitter.subscribe(UniCreateWithEmitter.java:22)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:35)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:35)
        at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap.subscribe(UniOnItemOrFailureFlatMap.java:27)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:35)
        at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:74)
        at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$MainSubscriber.innerOnSubscribe(MultiConcatMapOp.java:109)
        at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$MainSubscriber$InnerSubscriber.onSubscribe(MultiConcatMapOp.java:297)
        at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:25)
        at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:165)
        at io.smallrye.mutiny.operators.multi.MultiConcatMapOp$MainSubscriber.onItem(MultiConcatMapOp.java:119)
        at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
        at io.smallrye.mutiny.operators.multi.MultiOnItemInvoke$MultiOnItemInvokeProcessor.onItem(MultiOnItemInvoke.java:41)
        at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onItem(MultiOperatorProcessor.java:100)
        at io.smallrye.reactive.messaging.providers.locals.ContextOperator$ContextMulti$ContextProcessor.lambda$onItem$1(ContextOperator.java:71)
        at io.smallrye.reactive.messaging.providers.helpers.VertxContext.lambda$runOnContext$0(VertxContext.java:35)
        at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:270)
        at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:252)
        at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:50)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:1583)

Target

  • I checked that the commit is targeting the correct branch (Camel 4 uses the main branch)

Tracking

  • If this is a large change, bug fix, or code improvement, I checked there is a JIRA issue filed for the change (usually before you start working on it).

Apache Camel coding standards and style

  • I checked that each commit in the pull request has a meaningful subject line and body.
  • I have run mvn clean install -DskipTests locally from root folder and I have committed all auto-generated changes.

@github-actions
Copy link
Contributor

🌟 Thank you for your contribution to the Apache Camel project! 🌟

🤖 CI automation will test this PR automatically.

🐫 Apache Camel Committers, please review the following items:

  • First-time contributors require MANUAL approval for the GitHub Actions to run

  • You can use the command /component-test (camel-)component-name1 (camel-)component-name2.. to request a test from the test bot.

  • You can label PRs using build-all, build-dependents, skip-tests and test-dependents to fine-tune the checks executed by this PR.

  • Build and test logs are available in the Summary page. Only Apache Camel committers have access to the summary.

  • ⚠️ Be careful when sharing logs. Review their contents before sharing them publicly.

@davsclaus davsclaus force-pushed the feature/azure-servicebus-content-type-propagation branch from b7d8c05 to 0166d26 Compare October 22, 2025 14:54
@apupier
Copy link
Contributor

apupier commented Oct 23, 2025

/component-test azure

Result ✅ The tests passed successfully

@github-actions
Copy link
Contributor

🤖 The Apache Camel test robot will run the tests for you 👍

@davsclaus davsclaus merged commit 45d02cd into apache:main Oct 23, 2025
4 checks passed
@davsclaus
Copy link
Contributor

Thanks for the PR.

@fernandobalieiro
Copy link
Contributor Author

Thanks for the PR.

Thank you for the quick review, @davsclaus and @oscerd.
Do you have an ETA on when the new version with this would be released?

Regards.

@fernandobalieiro fernandobalieiro deleted the feature/azure-servicebus-content-type-propagation branch October 23, 2025 13:11
@davsclaus
Copy link
Contributor

Sometime next month

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants