Skip to content

Commit

Permalink
feat(webflux): enhance command message parsing with customizable head…
Browse files Browse the repository at this point in the history
…er appender (#1110)

- Introduce CommandRequestHeaderAppender interface for flexible header handling
- Implement CommandRequestExtendHeaderAppender for extended headers
- Update DefaultCommandMessageParser to use CommandRequestHeaderAppender
- Add commandRequestExtendHeaderAppender bean in WebFluxAutoConfiguration
- Modify commandMessageParser bean to accept CommandRequestHeaderAppender
  • Loading branch information
Ahoo-Wang authored Jan 9, 2025
1 parent 6d0e919 commit 70d380a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import me.ahoo.wow.webflux.route.bi.GenerateBIScriptHandlerFunctionFactory
import me.ahoo.wow.webflux.route.command.CommandFacadeHandlerFunctionFactory
import me.ahoo.wow.webflux.route.command.CommandHandlerFunctionFactory
import me.ahoo.wow.webflux.route.command.CommandMessageParser
import me.ahoo.wow.webflux.route.command.CommandRequestExtendHeaderAppender
import me.ahoo.wow.webflux.route.command.CommandRequestHeaderAppender
import me.ahoo.wow.webflux.route.command.DEFAULT_TIME_OUT
import me.ahoo.wow.webflux.route.command.DefaultCommandMessageParser
import me.ahoo.wow.webflux.route.event.CountEventStreamHandlerFunctionFactory
Expand Down Expand Up @@ -141,10 +143,21 @@ class WebFluxAutoConfiguration {
return GlobalExceptionHandler
}

@Bean
fun commandRequestExtendHeaderAppender(): CommandRequestExtendHeaderAppender {
return CommandRequestExtendHeaderAppender
}

@Bean
@ConditionalOnMissingBean
fun commandMessageParser(commandMessageFactory: CommandMessageFactory): CommandMessageParser {
return DefaultCommandMessageParser(commandMessageFactory)
fun commandMessageParser(
commandMessageFactory: CommandMessageFactory,
commandRequestHeaderAppenderObjectProvider: ObjectProvider<CommandRequestHeaderAppender>
): CommandMessageParser {
return DefaultCommandMessageParser(
commandMessageFactory = commandMessageFactory,
commandRequestHeaderAppends = commandRequestHeaderAppenderObjectProvider.toList<CommandRequestHeaderAppender>()
)
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ package me.ahoo.wow.webflux.route.command

import me.ahoo.wow.api.command.CommandMessage
import me.ahoo.wow.command.CommandOperator.withOperator
import me.ahoo.wow.command.factory.CommandBuilder
import me.ahoo.wow.command.factory.CommandBuilder.Companion.commandBuilder
import me.ahoo.wow.command.factory.CommandMessageFactory
import me.ahoo.wow.infra.ifNotBlank
import me.ahoo.wow.messaging.withLocalFirst
import me.ahoo.wow.modeling.matedata.AggregateMetadata
import me.ahoo.wow.openapi.command.CommandRequestHeaders
import me.ahoo.wow.openapi.command.CommandRequestHeaders.AGGREGATE_VERSION
import me.ahoo.wow.openapi.command.CommandRequestHeaders.REQUEST_ID
import org.springframework.web.reactive.function.server.ServerRequest
import reactor.core.publisher.Mono

Expand All @@ -33,23 +33,28 @@ interface CommandMessageParser {
): Mono<CommandMessage<Any>>
}

class DefaultCommandMessageParser(private val commandMessageFactory: CommandMessageFactory) : CommandMessageParser {
class DefaultCommandMessageParser(
private val commandMessageFactory: CommandMessageFactory,
private val commandRequestHeaderAppends: List<CommandRequestHeaderAppender> = listOf()
) : CommandMessageParser {
override fun parse(
aggregateMetadata: AggregateMetadata<*, *>,
commandBody: Any,
request: ServerRequest
): Mono<CommandMessage<Any>> {
val aggregateId = request.getAggregateId()
val tenantId = request.getTenantId(aggregateMetadata)
val aggregateVersion = request.headers().firstHeader(CommandRequestHeaders.AGGREGATE_VERSION)?.toIntOrNull()
val requestId = request.headers().firstHeader(CommandRequestHeaders.REQUEST_ID).ifNotBlank { it }
val aggregateVersion = request.headers().firstHeader(AGGREGATE_VERSION)?.toIntOrNull()
val requestId = request.headers().firstHeader(REQUEST_ID).ifNotBlank { it }
val commandBuilder = commandBody.commandBuilder()
.aggregateId(aggregateId)
.tenantId(tenantId)
.aggregateVersion(aggregateVersion)
.requestId(requestId)
.namedAggregate(aggregateMetadata.namedAggregate)
injectExtensionHeaders(commandBuilder, request)
commandRequestHeaderAppends.forEach {
it.append(request, commandBuilder)
}
request.getLocalFirst()?.let {
commandBuilder.header { header ->
header.withLocalFirst(it)
Expand All @@ -63,18 +68,4 @@ class DefaultCommandMessageParser(private val commandMessageFactory: CommandMess
commandMessageFactory.create<Any>(commandBuilder)
)
}

private fun injectExtensionHeaders(commandBuilder: CommandBuilder, request: ServerRequest) {
val extendedHeaders = request.headers().asHttpHeaders()
.filter { (key, _) -> key.startsWith(CommandRequestHeaders.COMMAND_HEADER_X_PREFIX) }
.map { (key, value) ->
key.substring(CommandRequestHeaders.COMMAND_HEADER_X_PREFIX.length) to value.firstOrNull<String>().orEmpty()
}.toMap<String, String>()
if (extendedHeaders.isEmpty()) {
return
}
commandBuilder.header { header ->
header.with(extendedHeaders)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.webflux.route.command

import me.ahoo.wow.command.factory.CommandBuilder
import me.ahoo.wow.openapi.command.CommandRequestHeaders.COMMAND_HEADER_X_PREFIX
import org.springframework.web.reactive.function.server.ServerRequest

interface CommandRequestHeaderAppender {
fun append(request: ServerRequest, commandBuilder: CommandBuilder)
}

object CommandRequestExtendHeaderAppender : CommandRequestHeaderAppender {
override fun append(request: ServerRequest, commandBuilder: CommandBuilder) {
val extendedHeaders = request.headers().asHttpHeaders()
.filter { (key, _) -> key.startsWith(COMMAND_HEADER_X_PREFIX) }
.map { (key, value) ->
key.substring(COMMAND_HEADER_X_PREFIX.length) to value.firstOrNull<String>().orEmpty()
}.toMap<String, String>()
if (extendedHeaders.isEmpty()) {
return
}
commandBuilder.header { header ->
header.with(extendedHeaders)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class DefaultCommandMessageParserTest {
every { principal() } returns Mono.empty()
every { headers().firstHeader(CommandRequestHeaders.WAIT_STAGE) } returns CommandStage.SENT.toString()
every { headers().firstHeader(CommandRequestHeaders.LOCAL_FIRST) } returns false.toString()
every { headers().asHttpHeaders() } returns HttpHeaders()
}
val commandMessageParser =
DefaultCommandMessageParser(SimpleCommandMessageFactory((SimpleCommandBuilderRewriterRegistry())))
Expand Down Expand Up @@ -76,7 +75,12 @@ class DefaultCommandMessageParserTest {
)
}
val commandMessageParser =
DefaultCommandMessageParser(SimpleCommandMessageFactory((SimpleCommandBuilderRewriterRegistry())))
DefaultCommandMessageParser(
SimpleCommandMessageFactory((SimpleCommandBuilderRewriterRegistry())),
listOf(
CommandRequestExtendHeaderAppender
)
)
commandMessageParser.parse(
aggregateMetadata = MOCK_AGGREGATE_METADATA,
commandBody = MockCreateAggregate(
Expand Down

0 comments on commit 70d380a

Please sign in to comment.