Skip to content

Commit 02571f3

Browse files
committed
feat(webflux): enhance command message parsing with customizable header appender
- Introduce CommandRequestHeaderAppender interface for flexible header handling - Implement CommandRequestExtendHeaderAppender for extended headers - Update DefaultCommandMessageParser to use CommandRequestHeaderAppender - Add commandRequestExtendHeaderAppender bean in WebFluxAutoConfiguration - Modify commandMessageParser bean to accept CommandRequestHeaderAppender
1 parent 6d0e919 commit 02571f3

File tree

4 files changed

+70
-24
lines changed

4 files changed

+70
-24
lines changed

wow-spring-boot-starter/src/main/kotlin/me/ahoo/wow/spring/boot/starter/webflux/WebFluxAutoConfiguration.kt

+15-2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import me.ahoo.wow.webflux.route.bi.GenerateBIScriptHandlerFunctionFactory
3838
import me.ahoo.wow.webflux.route.command.CommandFacadeHandlerFunctionFactory
3939
import me.ahoo.wow.webflux.route.command.CommandHandlerFunctionFactory
4040
import me.ahoo.wow.webflux.route.command.CommandMessageParser
41+
import me.ahoo.wow.webflux.route.command.CommandRequestExtendHeaderAppender
42+
import me.ahoo.wow.webflux.route.command.CommandRequestHeaderAppender
4143
import me.ahoo.wow.webflux.route.command.DEFAULT_TIME_OUT
4244
import me.ahoo.wow.webflux.route.command.DefaultCommandMessageParser
4345
import me.ahoo.wow.webflux.route.event.CountEventStreamHandlerFunctionFactory
@@ -141,10 +143,21 @@ class WebFluxAutoConfiguration {
141143
return GlobalExceptionHandler
142144
}
143145

146+
@Bean
147+
fun commandRequestExtendHeaderAppender(): CommandRequestExtendHeaderAppender {
148+
return CommandRequestExtendHeaderAppender
149+
}
150+
144151
@Bean
145152
@ConditionalOnMissingBean
146-
fun commandMessageParser(commandMessageFactory: CommandMessageFactory): CommandMessageParser {
147-
return DefaultCommandMessageParser(commandMessageFactory)
153+
fun commandMessageParser(
154+
commandMessageFactory: CommandMessageFactory,
155+
commandRequestHeaderAppenderObjectProvider: ObjectProvider<CommandRequestHeaderAppender>
156+
): CommandMessageParser {
157+
return DefaultCommandMessageParser(
158+
commandMessageFactory = commandMessageFactory,
159+
commandRequestHeaderAppends = commandRequestHeaderAppenderObjectProvider.toList<CommandRequestHeaderAppender>()
160+
)
148161
}
149162

150163
@Bean

wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/command/CommandMessageParser.kt

+11-20
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ package me.ahoo.wow.webflux.route.command
1515

1616
import me.ahoo.wow.api.command.CommandMessage
1717
import me.ahoo.wow.command.CommandOperator.withOperator
18-
import me.ahoo.wow.command.factory.CommandBuilder
1918
import me.ahoo.wow.command.factory.CommandBuilder.Companion.commandBuilder
2019
import me.ahoo.wow.command.factory.CommandMessageFactory
2120
import me.ahoo.wow.infra.ifNotBlank
2221
import me.ahoo.wow.messaging.withLocalFirst
2322
import me.ahoo.wow.modeling.matedata.AggregateMetadata
24-
import me.ahoo.wow.openapi.command.CommandRequestHeaders
23+
import me.ahoo.wow.openapi.command.CommandRequestHeaders.AGGREGATE_VERSION
24+
import me.ahoo.wow.openapi.command.CommandRequestHeaders.REQUEST_ID
2525
import org.springframework.web.reactive.function.server.ServerRequest
2626
import reactor.core.publisher.Mono
2727

@@ -33,23 +33,28 @@ interface CommandMessageParser {
3333
): Mono<CommandMessage<Any>>
3434
}
3535

36-
class DefaultCommandMessageParser(private val commandMessageFactory: CommandMessageFactory) : CommandMessageParser {
36+
class DefaultCommandMessageParser(
37+
private val commandMessageFactory: CommandMessageFactory,
38+
private val commandRequestHeaderAppends: List<CommandRequestHeaderAppender> = listOf()
39+
) : CommandMessageParser {
3740
override fun parse(
3841
aggregateMetadata: AggregateMetadata<*, *>,
3942
commandBody: Any,
4043
request: ServerRequest
4144
): Mono<CommandMessage<Any>> {
4245
val aggregateId = request.getAggregateId()
4346
val tenantId = request.getTenantId(aggregateMetadata)
44-
val aggregateVersion = request.headers().firstHeader(CommandRequestHeaders.AGGREGATE_VERSION)?.toIntOrNull()
45-
val requestId = request.headers().firstHeader(CommandRequestHeaders.REQUEST_ID).ifNotBlank { it }
47+
val aggregateVersion = request.headers().firstHeader(AGGREGATE_VERSION)?.toIntOrNull()
48+
val requestId = request.headers().firstHeader(REQUEST_ID).ifNotBlank { it }
4649
val commandBuilder = commandBody.commandBuilder()
4750
.aggregateId(aggregateId)
4851
.tenantId(tenantId)
4952
.aggregateVersion(aggregateVersion)
5053
.requestId(requestId)
5154
.namedAggregate(aggregateMetadata.namedAggregate)
52-
injectExtensionHeaders(commandBuilder, request)
55+
commandRequestHeaderAppends.forEach {
56+
it.append(request, commandBuilder)
57+
}
5358
request.getLocalFirst()?.let {
5459
commandBuilder.header { header ->
5560
header.withLocalFirst(it)
@@ -63,18 +68,4 @@ class DefaultCommandMessageParser(private val commandMessageFactory: CommandMess
6368
commandMessageFactory.create<Any>(commandBuilder)
6469
)
6570
}
66-
67-
private fun injectExtensionHeaders(commandBuilder: CommandBuilder, request: ServerRequest) {
68-
val extendedHeaders = request.headers().asHttpHeaders()
69-
.filter { (key, _) -> key.startsWith(CommandRequestHeaders.COMMAND_HEADER_X_PREFIX) }
70-
.map { (key, value) ->
71-
key.substring(CommandRequestHeaders.COMMAND_HEADER_X_PREFIX.length) to value.firstOrNull<String>().orEmpty()
72-
}.toMap<String, String>()
73-
if (extendedHeaders.isEmpty()) {
74-
return
75-
}
76-
commandBuilder.header { header ->
77-
header.with(extendedHeaders)
78-
}
79-
}
8071
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
14+
package me.ahoo.wow.webflux.route.command
15+
16+
import me.ahoo.wow.command.factory.CommandBuilder
17+
import me.ahoo.wow.openapi.command.CommandRequestHeaders.COMMAND_HEADER_X_PREFIX
18+
import org.springframework.web.reactive.function.server.ServerRequest
19+
20+
interface CommandRequestHeaderAppender {
21+
fun append(request: ServerRequest, commandBuilder: CommandBuilder)
22+
}
23+
24+
object CommandRequestExtendHeaderAppender : CommandRequestHeaderAppender {
25+
override fun append(request: ServerRequest, commandBuilder: CommandBuilder) {
26+
val extendedHeaders = request.headers().asHttpHeaders()
27+
.filter { (key, _) -> key.startsWith(COMMAND_HEADER_X_PREFIX) }
28+
.map { (key, value) ->
29+
key.substring(COMMAND_HEADER_X_PREFIX.length) to value.firstOrNull<String>().orEmpty()
30+
}.toMap<String, String>()
31+
if (extendedHeaders.isEmpty()) {
32+
return
33+
}
34+
commandBuilder.header { header ->
35+
header.with(extendedHeaders)
36+
}
37+
}
38+
}

wow-webflux/src/test/kotlin/me/ahoo/wow/webflux/route/command/DefaultCommandMessageParserTest.kt

+6-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ class DefaultCommandMessageParserTest {
3535
every { principal() } returns Mono.empty()
3636
every { headers().firstHeader(CommandRequestHeaders.WAIT_STAGE) } returns CommandStage.SENT.toString()
3737
every { headers().firstHeader(CommandRequestHeaders.LOCAL_FIRST) } returns false.toString()
38-
every { headers().asHttpHeaders() } returns HttpHeaders()
3938
}
4039
val commandMessageParser =
4140
DefaultCommandMessageParser(SimpleCommandMessageFactory((SimpleCommandBuilderRewriterRegistry())))
@@ -76,7 +75,12 @@ class DefaultCommandMessageParserTest {
7675
)
7776
}
7877
val commandMessageParser =
79-
DefaultCommandMessageParser(SimpleCommandMessageFactory((SimpleCommandBuilderRewriterRegistry())))
78+
DefaultCommandMessageParser(
79+
SimpleCommandMessageFactory((SimpleCommandBuilderRewriterRegistry())),
80+
listOf(
81+
CommandRequestExtendHeaderAppender
82+
)
83+
)
8084
commandMessageParser.parse(
8185
aggregateMetadata = MOCK_AGGREGATE_METADATA,
8286
commandBody = MockCreateAggregate(

0 commit comments

Comments
 (0)