Skip to content

Commit 484f998

Browse files
authored
refactor(webflux): introduce CommandMessageParser and remove CommandParser (#1108)
- Introduce new CommandMessageParser interface and DefaultCommandMessageParser implementation - Remove static methods from CommandParser object - Update related classes to use the new CommandMessageParser - Rename CommandParserTest to DefaultCommandMessageParserTest
1 parent fbd63fb commit 484f998

25 files changed

+144
-110
lines changed

wow-spring-boot-starter/src/main/kotlin/me/ahoo/wow/spring/boot/starter/webflux/WebFluxAutoConfiguration.kt

+13-4
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ import me.ahoo.wow.webflux.route.RouterFunctionBuilder
3737
import me.ahoo.wow.webflux.route.bi.GenerateBIScriptHandlerFunctionFactory
3838
import me.ahoo.wow.webflux.route.command.CommandFacadeHandlerFunctionFactory
3939
import me.ahoo.wow.webflux.route.command.CommandHandlerFunctionFactory
40+
import me.ahoo.wow.webflux.route.command.CommandMessageParser
4041
import me.ahoo.wow.webflux.route.command.DEFAULT_TIME_OUT
42+
import me.ahoo.wow.webflux.route.command.DefaultCommandMessageParser
4143
import me.ahoo.wow.webflux.route.event.CountEventStreamHandlerFunctionFactory
4244
import me.ahoo.wow.webflux.route.event.EventCompensateHandlerFunctionFactory
4345
import me.ahoo.wow.webflux.route.event.ListQueryEventStreamHandlerFunctionFactory
@@ -139,6 +141,13 @@ class WebFluxAutoConfiguration {
139141
return GlobalExceptionHandler
140142
}
141143

144+
145+
@Bean
146+
@ConditionalOnWebfluxGlobalErrorEnabled
147+
fun commandMessageParser(commandMessageFactory: CommandMessageFactory,): CommandMessageParser {
148+
return DefaultCommandMessageParser(commandMessageFactory)
149+
}
150+
142151
@Bean
143152
@Order(Ordered.HIGHEST_PRECEDENCE)
144153
@ConditionalOnMissingBean(name = [COMMAND_WAIT_HANDLER_FUNCTION_FACTORY_BEAN_NAME])
@@ -153,12 +162,12 @@ class WebFluxAutoConfiguration {
153162
@ConditionalOnMissingBean(name = [COMMAND_FACADE_HANDLER_FUNCTION_FACTORY_BEAN_NAME])
154163
fun commandFacadeHandlerFunctionFactory(
155164
commandGateway: CommandGateway,
156-
commandMessageFactory: CommandMessageFactory,
165+
commandMessageParser: CommandMessageParser,
157166
exceptionHandler: RequestExceptionHandler,
158167
): CommandFacadeHandlerFunctionFactory {
159168
return CommandFacadeHandlerFunctionFactory(
160169
commandGateway = commandGateway,
161-
commandMessageFactory = commandMessageFactory,
170+
commandMessageParser = commandMessageParser,
162171
exceptionHandler = exceptionHandler
163172
)
164173
}
@@ -401,12 +410,12 @@ class WebFluxAutoConfiguration {
401410
@ConditionalOnMissingBean(name = [COMMAND_HANDLER_FUNCTION_FACTORY_BEAN_NAME])
402411
fun commandHandlerFunctionFactory(
403412
commandGateway: CommandGateway,
404-
commandMessageFactory: CommandMessageFactory,
413+
commandMessageParser: CommandMessageParser,
405414
exceptionHandler: RequestExceptionHandler,
406415
): CommandHandlerFunctionFactory {
407416
return CommandHandlerFunctionFactory(
408417
commandGateway = commandGateway,
409-
commandMessageFactory = commandMessageFactory,
418+
commandMessageParser = commandMessageParser,
410419
exceptionHandler = exceptionHandler,
411420
timeout = DEFAULT_TIME_OUT
412421
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
14+
package me.ahoo.wow.webflux.route.command
15+
16+
import me.ahoo.wow.api.modeling.TenantId
17+
import me.ahoo.wow.command.wait.CommandStage
18+
import me.ahoo.wow.infra.ifNotBlank
19+
import me.ahoo.wow.modeling.matedata.AggregateMetadata
20+
import me.ahoo.wow.openapi.RoutePaths
21+
import me.ahoo.wow.openapi.command.CommandHeaders
22+
import me.ahoo.wow.serialization.MessageRecords
23+
import org.springframework.web.reactive.function.server.ServerRequest
24+
import java.time.Duration
25+
import java.util.*
26+
27+
fun ServerRequest.getTenantId(aggregateMetadata: AggregateMetadata<*, *>): String? {
28+
aggregateMetadata.staticTenantId.ifNotBlank<String> {
29+
return it
30+
}
31+
pathVariables()[MessageRecords.TENANT_ID].ifNotBlank<String> {
32+
return it
33+
}
34+
headers().firstHeader(CommandHeaders.TENANT_ID).ifNotBlank<String> {
35+
return it
36+
}
37+
return null
38+
}
39+
40+
fun ServerRequest.getTenantIdOrDefault(aggregateMetadata: AggregateMetadata<*, *>): String {
41+
return getTenantId(aggregateMetadata) ?: return TenantId.DEFAULT_TENANT_ID
42+
}
43+
44+
fun ServerRequest.getAggregateId(): String? {
45+
headers().firstHeader(CommandHeaders.AGGREGATE_ID).ifNotBlank<String> {
46+
return it
47+
}
48+
pathVariables()[RoutePaths.ID_KEY].ifNotBlank<String> {
49+
return it
50+
}
51+
return null
52+
}
53+
54+
fun ServerRequest.getLocalFirst(): Boolean? {
55+
headers().firstHeader(CommandHeaders.LOCAL_FIRST).ifNotBlank<String> {
56+
return it.toBoolean()
57+
}
58+
return null
59+
}
60+
61+
fun ServerRequest.getCommandStage(): CommandStage {
62+
return headers().firstHeader(CommandHeaders.WAIT_STAGE).ifNotBlank { stage ->
63+
CommandStage.valueOf(stage.uppercase(Locale.getDefault()))
64+
} ?: CommandStage.PROCESSED
65+
}
66+
67+
fun ServerRequest.getWaitContext(): String {
68+
return headers().firstHeader(CommandHeaders.WAIT_CONTEXT).orEmpty()
69+
}
70+
71+
fun ServerRequest.getWaitProcessor(): String {
72+
return headers().firstHeader(CommandHeaders.WAIT_PROCESSOR).orEmpty()
73+
}
74+
75+
fun ServerRequest.getWaitTimeout(default: Duration = DEFAULT_TIME_OUT): Duration {
76+
return headers().firstHeader(CommandHeaders.WAIT_TIME_OUT)?.toLongOrNull()?.let {
77+
Duration.ofMillis(it)
78+
} ?: default
79+
}

wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/command/CommandFacadeHandlerFunction.kt

+4-5
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package me.ahoo.wow.webflux.route.command
1515

1616
import me.ahoo.wow.command.CommandGateway
17-
import me.ahoo.wow.command.factory.CommandMessageFactory
1817
import me.ahoo.wow.openapi.command.CommandFacadeRouteSpec
1918
import me.ahoo.wow.webflux.exception.RequestExceptionHandler
2019
import me.ahoo.wow.webflux.exception.toServerResponse
@@ -33,14 +32,14 @@ import java.time.Duration
3332
*/
3433
class CommandFacadeHandlerFunction(
3534
private val commandGateway: CommandGateway,
36-
private val commandMessageFactory: CommandMessageFactory,
35+
private val commandMessageParser: CommandMessageParser,
3736
private val exceptionHandler: RequestExceptionHandler,
3837
private val timeout: Duration = DEFAULT_TIME_OUT
3938
) : HandlerFunction<ServerResponse> {
4039

4140
private val handler = CommandHandler(
4241
commandGateway = commandGateway,
43-
commandMessageFactory = commandMessageFactory,
42+
commandMessageParser = commandMessageParser,
4443
timeout = timeout
4544
)
4645

@@ -55,7 +54,7 @@ class CommandFacadeHandlerFunction(
5554

5655
class CommandFacadeHandlerFunctionFactory(
5756
private val commandGateway: CommandGateway,
58-
private val commandMessageFactory: CommandMessageFactory,
57+
private val commandMessageParser: CommandMessageParser,
5958
private val exceptionHandler: RequestExceptionHandler,
6059
private val timeout: Duration = DEFAULT_TIME_OUT
6160
) : RouteHandlerFunctionFactory<CommandFacadeRouteSpec> {
@@ -65,7 +64,7 @@ class CommandFacadeHandlerFunctionFactory(
6564
override fun create(spec: CommandFacadeRouteSpec): HandlerFunction<ServerResponse> {
6665
return CommandFacadeHandlerFunction(
6766
commandGateway = commandGateway,
68-
commandMessageFactory = commandMessageFactory,
67+
commandMessageParser = commandMessageParser,
6968
exceptionHandler = exceptionHandler,
7069
timeout = timeout
7170
)

wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/command/CommandHandler.kt

+3-28
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,16 @@ package me.ahoo.wow.webflux.route.command
1515

1616
import me.ahoo.wow.command.CommandGateway
1717
import me.ahoo.wow.command.CommandResult
18-
import me.ahoo.wow.command.factory.CommandMessageFactory
1918
import me.ahoo.wow.command.wait.CommandStage
2019
import me.ahoo.wow.command.wait.WaitingFor
21-
import me.ahoo.wow.infra.ifNotBlank
2220
import me.ahoo.wow.modeling.matedata.AggregateMetadata
23-
import me.ahoo.wow.openapi.command.CommandHeaders
24-
import me.ahoo.wow.webflux.route.command.CommandParser.parse
2521
import org.springframework.web.reactive.function.server.ServerRequest
2622
import reactor.core.publisher.Mono
2723
import java.time.Duration
28-
import java.util.*
29-
30-
fun ServerRequest.getCommandStage(): CommandStage {
31-
return headers().firstHeader(CommandHeaders.WAIT_STAGE).ifNotBlank { stage ->
32-
CommandStage.valueOf(stage.uppercase(Locale.getDefault()))
33-
} ?: CommandStage.PROCESSED
34-
}
35-
36-
fun ServerRequest.getWaitContext(): String {
37-
return headers().firstHeader(CommandHeaders.WAIT_CONTEXT).orEmpty()
38-
}
39-
40-
fun ServerRequest.getWaitProcessor(): String {
41-
return headers().firstHeader(CommandHeaders.WAIT_PROCESSOR).orEmpty()
42-
}
43-
44-
fun ServerRequest.getWaitTimeout(default: Duration = DEFAULT_TIME_OUT): Duration {
45-
return headers().firstHeader(CommandHeaders.WAIT_TIME_OUT)?.toLongOrNull()?.let {
46-
Duration.ofMillis(it)
47-
} ?: default
48-
}
4924

5025
class CommandHandler(
5126
private val commandGateway: CommandGateway,
52-
private val commandMessageFactory: CommandMessageFactory,
27+
private val commandMessageParser: CommandMessageParser,
5328
private val timeout: Duration = DEFAULT_TIME_OUT
5429
) {
5530

@@ -59,10 +34,10 @@ class CommandHandler(
5934
aggregateMetadata: AggregateMetadata<*, *>,
6035
): Mono<CommandResult> {
6136
val commandWaitTimeout = request.getWaitTimeout(timeout)
62-
return request.parse(
37+
return commandMessageParser.parse(
6338
aggregateMetadata = aggregateMetadata,
6439
commandBody = commandBody,
65-
commandMessageFactory = commandMessageFactory
40+
request = request
6641
).flatMap {
6742
val stage: CommandStage = request.getCommandStage()
6843
if (CommandStage.SENT == stage) {

wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/command/CommandHandlerFunction.kt

+4-5
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package me.ahoo.wow.webflux.route.command
1515

1616
import me.ahoo.wow.command.CommandGateway
17-
import me.ahoo.wow.command.factory.CommandMessageFactory
1817
import me.ahoo.wow.modeling.matedata.AggregateMetadata
1918
import me.ahoo.wow.openapi.command.CommandRouteSpec
2019
import me.ahoo.wow.openapi.route.CommandRouteMetadata
@@ -40,12 +39,12 @@ class CommandHandlerFunction(
4039
private val aggregateMetadata: AggregateMetadata<*, *>,
4140
private val commandRouteMetadata: CommandRouteMetadata<out Any>,
4241
private val commandGateway: CommandGateway,
43-
private val commandMessageFactory: CommandMessageFactory,
42+
private val commandMessageParser: CommandMessageParser,
4443
private val exceptionHandler: RequestExceptionHandler,
4544
private val timeout: Duration = DEFAULT_TIME_OUT
4645
) : HandlerFunction<ServerResponse> {
4746
private val bodyExtractor = CommandBodyExtractor(commandRouteMetadata)
48-
private val handler = CommandHandler(commandGateway, commandMessageFactory, timeout)
47+
private val handler = CommandHandler(commandGateway, commandMessageParser, timeout)
4948
override fun handle(request: ServerRequest): Mono<ServerResponse> {
5049
return if (commandRouteMetadata.pathVariableMetadata.isEmpty() && commandRouteMetadata.headerVariableMetadata.isEmpty()) {
5150
request.bodyToMono(commandRouteMetadata.commandMetadata.commandType)
@@ -64,7 +63,7 @@ class CommandHandlerFunction(
6463

6564
class CommandHandlerFunctionFactory(
6665
private val commandGateway: CommandGateway,
67-
private val commandMessageFactory: CommandMessageFactory,
66+
private val commandMessageParser: CommandMessageParser,
6867
private val exceptionHandler: RequestExceptionHandler,
6968
private val timeout: Duration = DEFAULT_TIME_OUT
7069
) : RouteHandlerFunctionFactory<CommandRouteSpec> {
@@ -77,7 +76,7 @@ class CommandHandlerFunctionFactory(
7776
aggregateMetadata = spec.aggregateMetadata,
7877
commandRouteMetadata = spec.commandRouteMetadata as CommandRouteMetadata<Any>,
7978
commandGateway = commandGateway,
80-
commandMessageFactory = commandMessageFactory,
79+
commandMessageParser = commandMessageParser,
8180
exceptionHandler = exceptionHandler,
8281
timeout = timeout
8382
)

wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/command/CommandParser.kt wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/command/CommandMessageParser.kt

+16-45
Original file line numberDiff line numberDiff line change
@@ -14,75 +14,46 @@
1414
package me.ahoo.wow.webflux.route.command
1515

1616
import me.ahoo.wow.api.command.CommandMessage
17-
import me.ahoo.wow.api.modeling.TenantId
1817
import me.ahoo.wow.command.CommandOperator.withOperator
1918
import me.ahoo.wow.command.factory.CommandBuilder.Companion.commandBuilder
2019
import me.ahoo.wow.command.factory.CommandMessageFactory
2120
import me.ahoo.wow.infra.ifNotBlank
2221
import me.ahoo.wow.messaging.withLocalFirst
2322
import me.ahoo.wow.modeling.matedata.AggregateMetadata
24-
import me.ahoo.wow.openapi.RoutePaths
2523
import me.ahoo.wow.openapi.command.CommandHeaders
26-
import me.ahoo.wow.serialization.MessageRecords
2724
import org.springframework.web.reactive.function.server.ServerRequest
2825
import reactor.core.publisher.Mono
2926

30-
object CommandParser {
31-
fun ServerRequest.getTenantId(aggregateMetadata: AggregateMetadata<*, *>): String? {
32-
aggregateMetadata.staticTenantId.ifNotBlank<String> {
33-
return it
34-
}
35-
pathVariables()[MessageRecords.TENANT_ID].ifNotBlank<String> {
36-
return it
37-
}
38-
headers().firstHeader(CommandHeaders.TENANT_ID).ifNotBlank<String> {
39-
return it
40-
}
41-
return null
42-
}
43-
44-
fun ServerRequest.getTenantIdOrDefault(aggregateMetadata: AggregateMetadata<*, *>): String {
45-
return getTenantId(aggregateMetadata) ?: return TenantId.DEFAULT_TENANT_ID
46-
}
47-
48-
fun ServerRequest.getAggregateId(): String? {
49-
headers().firstHeader(CommandHeaders.AGGREGATE_ID).ifNotBlank<String> {
50-
return it
51-
}
52-
pathVariables()[RoutePaths.ID_KEY].ifNotBlank<String> {
53-
return it
54-
}
55-
return null
56-
}
57-
58-
fun ServerRequest.getLocalFirst(): Boolean? {
59-
headers().firstHeader(CommandHeaders.LOCAL_FIRST).ifNotBlank<String> {
60-
return it.toBoolean()
61-
}
62-
return null
63-
}
27+
interface CommandMessageParser {
28+
fun parse(
29+
aggregateMetadata: AggregateMetadata<*, *>,
30+
commandBody: Any,
31+
request: ServerRequest
32+
): Mono<CommandMessage<Any>>
33+
}
6434

65-
fun ServerRequest.parse(
35+
class DefaultCommandMessageParser(private val commandMessageFactory: CommandMessageFactory) : CommandMessageParser {
36+
override fun parse(
6637
aggregateMetadata: AggregateMetadata<*, *>,
6738
commandBody: Any,
68-
commandMessageFactory: CommandMessageFactory
39+
request: ServerRequest
6940
): Mono<CommandMessage<Any>> {
70-
val aggregateId = getAggregateId()
71-
val tenantId = getTenantId(aggregateMetadata)
72-
val aggregateVersion = headers().firstHeader(CommandHeaders.AGGREGATE_VERSION)?.toIntOrNull()
73-
val requestId = headers().firstHeader(CommandHeaders.REQUEST_ID).ifNotBlank { it }
41+
val aggregateId = request.getAggregateId()
42+
val tenantId = request.getTenantId(aggregateMetadata)
43+
val aggregateVersion = request.headers().firstHeader(CommandHeaders.AGGREGATE_VERSION)?.toIntOrNull()
44+
val requestId = request.headers().firstHeader(CommandHeaders.REQUEST_ID).ifNotBlank { it }
7445
val commandBuilder = commandBody.commandBuilder()
7546
.aggregateId(aggregateId)
7647
.tenantId(tenantId)
7748
.aggregateVersion(aggregateVersion)
7849
.requestId(requestId)
7950
.namedAggregate(aggregateMetadata.namedAggregate)
80-
getLocalFirst()?.let {
51+
request.getLocalFirst()?.let {
8152
commandBuilder.header { header ->
8253
header.withLocalFirst(it)
8354
}
8455
}
85-
return principal().map {
56+
return request.principal().map {
8657
commandBuilder.header { header ->
8758
header.withOperator(it.name)
8859
}

wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/event/EventCompensateHandlerFunction.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import me.ahoo.wow.serialization.MessageRecords
2323
import me.ahoo.wow.webflux.exception.RequestExceptionHandler
2424
import me.ahoo.wow.webflux.exception.toServerResponse
2525
import me.ahoo.wow.webflux.route.RouteHandlerFunctionFactory
26-
import me.ahoo.wow.webflux.route.command.CommandParser.getTenantIdOrDefault
26+
import me.ahoo.wow.webflux.route.command.getTenantIdOrDefault
2727
import org.springframework.web.reactive.function.server.HandlerFunction
2828
import org.springframework.web.reactive.function.server.ServerRequest
2929
import org.springframework.web.reactive.function.server.ServerResponse

wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/event/LoadEventStreamHandlerFunction.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import me.ahoo.wow.serialization.MessageRecords
2222
import me.ahoo.wow.webflux.exception.RequestExceptionHandler
2323
import me.ahoo.wow.webflux.exception.toServerResponse
2424
import me.ahoo.wow.webflux.route.RouteHandlerFunctionFactory
25-
import me.ahoo.wow.webflux.route.command.CommandParser.getTenantIdOrDefault
25+
import me.ahoo.wow.webflux.route.command.getTenantIdOrDefault
2626
import org.springframework.web.reactive.function.server.HandlerFunction
2727
import org.springframework.web.reactive.function.server.ServerRequest
2828
import org.springframework.web.reactive.function.server.ServerResponse

wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/query/CountQueryHandlerFunction.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import me.ahoo.wow.query.filter.QueryHandler
2121
import me.ahoo.wow.webflux.exception.RequestExceptionHandler
2222
import me.ahoo.wow.webflux.exception.toServerResponse
2323
import me.ahoo.wow.webflux.route.RouteHandlerFunctionFactory
24-
import me.ahoo.wow.webflux.route.command.CommandParser.getTenantId
24+
import me.ahoo.wow.webflux.route.command.getTenantId
2525
import org.springframework.web.reactive.function.server.HandlerFunction
2626
import org.springframework.web.reactive.function.server.ServerRequest
2727
import org.springframework.web.reactive.function.server.ServerResponse

0 commit comments

Comments
 (0)