Skip to content

Commit

Permalink
feat(command): Support inject request extension headers
Browse files Browse the repository at this point in the history
- Rename CommandHeaders to CommandRequestHeaders to better reflect its usage
- Update related imports and usages across multiple files
- Add test case for injecting extension headers in CommandMessageParser
  • Loading branch information
Ahoo-Wang committed Jan 9, 2025
1 parent 484f998 commit f2f22b1
Show file tree
Hide file tree
Showing 25 changed files with 248 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import me.ahoo.coapi.api.CoApi
import me.ahoo.wow.apiclient.command.RestCommandGateway.Companion.toException
import me.ahoo.wow.command.CommandResult
import me.ahoo.wow.command.wait.CommandStage
import me.ahoo.wow.openapi.command.CommandHeaders
import me.ahoo.wow.openapi.command.CommandRequestHeaders
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestHeader
Expand All @@ -32,31 +32,31 @@ interface ReactiveRestCommandGateway : RestCommandGateway<Mono<ResponseEntity<Co
@PostExchange
override fun send(
sendUri: URI,
@RequestHeader(CommandHeaders.COMMAND_TYPE, required = false)
@RequestHeader(CommandRequestHeaders.COMMAND_TYPE, required = false)
commandType: String,
@RequestBody
command: Any,
@RequestHeader(CommandHeaders.WAIT_STAGE, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_STAGE, required = false)
waitStage: CommandStage,
@RequestHeader(CommandHeaders.WAIT_CONTEXT, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_CONTEXT, required = false)
waitContext: String?,
@RequestHeader(CommandHeaders.WAIT_PROCESSOR, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_PROCESSOR, required = false)
waitProcessor: String?,
@RequestHeader(CommandHeaders.WAIT_TIME_OUT, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_TIME_OUT, required = false)
waitTimeout: Long?,
@RequestHeader(CommandHeaders.TENANT_ID, required = false)
@RequestHeader(CommandRequestHeaders.TENANT_ID, required = false)
tenantId: String?,
@RequestHeader(CommandHeaders.AGGREGATE_ID, required = false)
@RequestHeader(CommandRequestHeaders.AGGREGATE_ID, required = false)
aggregateId: String?,
@RequestHeader(CommandHeaders.AGGREGATE_VERSION, required = false)
@RequestHeader(CommandRequestHeaders.AGGREGATE_VERSION, required = false)
aggregateVersion: Int?,
@RequestHeader(CommandHeaders.REQUEST_ID, required = false)
@RequestHeader(CommandRequestHeaders.REQUEST_ID, required = false)
requestId: String?,
@RequestHeader(CommandHeaders.LOCAL_FIRST, required = false)
@RequestHeader(CommandRequestHeaders.LOCAL_FIRST, required = false)
localFirst: Boolean?,
@RequestHeader(CommandHeaders.COMMAND_AGGREGATE_CONTEXT, required = false)
@RequestHeader(CommandRequestHeaders.COMMAND_AGGREGATE_CONTEXT, required = false)
context: String?,
@RequestHeader(CommandHeaders.COMMAND_AGGREGATE_NAME, required = false)
@RequestHeader(CommandRequestHeaders.COMMAND_AGGREGATE_NAME, required = false)
aggregate: String?
): Mono<ResponseEntity<CommandResult>>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import me.ahoo.wow.api.exception.DefaultErrorInfo
import me.ahoo.wow.command.CommandResult
import me.ahoo.wow.command.CommandResultException
import me.ahoo.wow.command.wait.CommandStage
import me.ahoo.wow.openapi.command.CommandHeaders
import me.ahoo.wow.openapi.command.CommandRequestHeaders
import me.ahoo.wow.serialization.toObject
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestHeader
Expand All @@ -30,31 +30,31 @@ interface RestCommandGateway<RW, RB> {
@PostExchange
fun send(
sendUri: URI,
@RequestHeader(CommandHeaders.COMMAND_TYPE, required = false)
@RequestHeader(CommandRequestHeaders.COMMAND_TYPE, required = false)
commandType: String,
@RequestBody
command: Any,
@RequestHeader(CommandHeaders.WAIT_STAGE, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_STAGE, required = false)
waitStage: CommandStage = CommandStage.PROCESSED,
@RequestHeader(CommandHeaders.WAIT_CONTEXT, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_CONTEXT, required = false)
waitContext: String? = null,
@RequestHeader(CommandHeaders.WAIT_PROCESSOR, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_PROCESSOR, required = false)
waitProcessor: String? = null,
@RequestHeader(CommandHeaders.WAIT_TIME_OUT, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_TIME_OUT, required = false)
waitTimeout: Long? = null,
@RequestHeader(CommandHeaders.TENANT_ID, required = false)
@RequestHeader(CommandRequestHeaders.TENANT_ID, required = false)
tenantId: String? = null,
@RequestHeader(CommandHeaders.AGGREGATE_ID, required = false)
@RequestHeader(CommandRequestHeaders.AGGREGATE_ID, required = false)
aggregateId: String? = null,
@RequestHeader(CommandHeaders.AGGREGATE_VERSION, required = false)
@RequestHeader(CommandRequestHeaders.AGGREGATE_VERSION, required = false)
aggregateVersion: Int? = null,
@RequestHeader(CommandHeaders.REQUEST_ID, required = false)
@RequestHeader(CommandRequestHeaders.REQUEST_ID, required = false)
requestId: String? = null,
@RequestHeader(CommandHeaders.LOCAL_FIRST, required = false)
@RequestHeader(CommandRequestHeaders.LOCAL_FIRST, required = false)
localFirst: Boolean? = null,
@RequestHeader(CommandHeaders.COMMAND_AGGREGATE_CONTEXT, required = false)
@RequestHeader(CommandRequestHeaders.COMMAND_AGGREGATE_CONTEXT, required = false)
context: String? = null,
@RequestHeader(CommandHeaders.COMMAND_AGGREGATE_NAME, required = false)
@RequestHeader(CommandRequestHeaders.COMMAND_AGGREGATE_NAME, required = false)
aggregate: String? = null
): RW

Expand Down Expand Up @@ -94,7 +94,7 @@ interface RestCommandGateway<RW, RB> {
}

fun WebClientResponseException.toException(request: CommandRequest): RestCommandGatewayException {
val errorCode = this.headers.getFirst(CommandHeaders.WOW_ERROR_CODE).orEmpty()
val errorCode = this.headers.getFirst(CommandRequestHeaders.WOW_ERROR_CODE).orEmpty()
val responseBody = this.responseBodyAsString
if (responseBody.isBlank()) {
return RestCommandGatewayException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import me.ahoo.coapi.api.CoApi
import me.ahoo.wow.apiclient.command.RestCommandGateway.Companion.toException
import me.ahoo.wow.command.CommandResult
import me.ahoo.wow.command.wait.CommandStage
import me.ahoo.wow.openapi.command.CommandHeaders
import me.ahoo.wow.openapi.command.CommandRequestHeaders
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestHeader
Expand All @@ -30,31 +30,31 @@ interface SyncRestCommandGateway : RestCommandGateway<ResponseEntity<CommandResu
@PostExchange
override fun send(
sendUri: URI,
@RequestHeader(CommandHeaders.COMMAND_TYPE, required = false)
@RequestHeader(CommandRequestHeaders.COMMAND_TYPE, required = false)
commandType: String,
@RequestBody
command: Any,
@RequestHeader(CommandHeaders.WAIT_STAGE, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_STAGE, required = false)
waitStage: CommandStage,
@RequestHeader(CommandHeaders.WAIT_CONTEXT, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_CONTEXT, required = false)
waitContext: String?,
@RequestHeader(CommandHeaders.WAIT_PROCESSOR, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_PROCESSOR, required = false)
waitProcessor: String?,
@RequestHeader(CommandHeaders.WAIT_TIME_OUT, required = false)
@RequestHeader(CommandRequestHeaders.WAIT_TIME_OUT, required = false)
waitTimeout: Long?,
@RequestHeader(CommandHeaders.TENANT_ID, required = false)
@RequestHeader(CommandRequestHeaders.TENANT_ID, required = false)
tenantId: String?,
@RequestHeader(CommandHeaders.AGGREGATE_ID, required = false)
@RequestHeader(CommandRequestHeaders.AGGREGATE_ID, required = false)
aggregateId: String?,
@RequestHeader(CommandHeaders.AGGREGATE_VERSION, required = false)
@RequestHeader(CommandRequestHeaders.AGGREGATE_VERSION, required = false)
aggregateVersion: Int?,
@RequestHeader(CommandHeaders.REQUEST_ID, required = false)
@RequestHeader(CommandRequestHeaders.REQUEST_ID, required = false)
requestId: String?,
@RequestHeader(CommandHeaders.LOCAL_FIRST, required = false)
@RequestHeader(CommandRequestHeaders.LOCAL_FIRST, required = false)
localFirst: Boolean?,
@RequestHeader(CommandHeaders.COMMAND_AGGREGATE_CONTEXT, required = false)
@RequestHeader(CommandRequestHeaders.COMMAND_AGGREGATE_CONTEXT, required = false)
context: String?,
@RequestHeader(CommandHeaders.COMMAND_AGGREGATE_NAME, required = false)
@RequestHeader(CommandRequestHeaders.COMMAND_AGGREGATE_NAME, required = false)
aggregate: String?
): ResponseEntity<CommandResult>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import me.ahoo.wow.openapi.HeaderRef.Companion.ERROR_CODE_HEADER
import me.ahoo.wow.openapi.SchemaRef.Companion.toArraySchema
import me.ahoo.wow.openapi.SchemaRef.Companion.toRefSchema
import me.ahoo.wow.openapi.SchemaRef.Companion.toSchemaName
import me.ahoo.wow.openapi.command.CommandHeaders
import me.ahoo.wow.openapi.command.CommandRequestHeaders

interface ComponentRef<C> {
val name: String
Expand Down Expand Up @@ -185,7 +185,7 @@ class HeaderRef(
companion object {
const val COMPONENTS_HEADERS_REF: String = COMPONENTS_REF + "headers/"
val ERROR_CODE_HEADER = HeaderRef(
name = CommandHeaders.WOW_ERROR_CODE,
name = CommandRequestHeaders.WOW_ERROR_CODE,
component = Header()
.content(StringSchema().toContent())
.description("Error Code"),
Expand Down Expand Up @@ -289,7 +289,7 @@ class ResponseRef(override val name: String, override val component: ApiResponse
fun Content.toResponse(description: String = ErrorInfo.SUCCEEDED): ApiResponse {
return ApiResponse()
.addHeaderObject(CommandHeaders.WOW_ERROR_CODE, ERROR_CODE_HEADER.ref)
.addHeaderObject(CommandRequestHeaders.WOW_ERROR_CODE, ERROR_CODE_HEADER.ref)
.description(description)
.content(this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,18 @@ class CommandFacadeRouteSpec(
class CommandFacadeRouteSpecFactory : GlobalRouteSpecFactory {
companion object {
val COMMAND_TYPE_PARAMETER: Parameter = Parameter()
.name(CommandHeaders.COMMAND_TYPE)
.name(CommandRequestHeaders.COMMAND_TYPE)
.required(true)
.`in`(ParameterIn.HEADER.toString())
.schema(StringSchema())
.description("Command Body Class fully qualified name")
val COMMAND_AGGREGATE_CONTEXT_PARAMETER: Parameter = Parameter()
.name(CommandHeaders.COMMAND_AGGREGATE_CONTEXT)
.name(CommandRequestHeaders.COMMAND_AGGREGATE_CONTEXT)
.`in`(ParameterIn.HEADER.toString())
.schema(StringSchema())
.description("Command Aggregate Context")
val COMMAND_AGGREGATE_NAME_PARAMETER: Parameter = Parameter()
.name(CommandHeaders.COMMAND_AGGREGATE_NAME)
.name(CommandRequestHeaders.COMMAND_AGGREGATE_NAME)
.`in`(ParameterIn.HEADER.toString())
.schema(StringSchema())
.description("Command Aggregate Name")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.openapi.command

object CommandRequestHeaders {

const val COMMAND_HEADERS_PREFIX = "Command-"
const val WAIT_CONTEXT = "${COMMAND_HEADERS_PREFIX}Wait-Context"
const val TENANT_ID = "${COMMAND_HEADERS_PREFIX}Tenant-Id"
const val AGGREGATE_ID = "${COMMAND_HEADERS_PREFIX}Aggregate-Id"
const val AGGREGATE_VERSION = "${COMMAND_HEADERS_PREFIX}Aggregate-Version"
const val WAIT_STAGE = "${COMMAND_HEADERS_PREFIX}Wait-Stage"
const val WAIT_TIME_OUT = "${COMMAND_HEADERS_PREFIX}Wait-Timout"

const val WAIT_PROCESSOR = "${COMMAND_HEADERS_PREFIX}Wait-Processor"
const val REQUEST_ID = "${COMMAND_HEADERS_PREFIX}Request-Id"
const val LOCAL_FIRST = "${COMMAND_HEADERS_PREFIX}Local-First"

const val COMMAND_AGGREGATE_CONTEXT = "${COMMAND_HEADERS_PREFIX}Aggregate-Context"
const val COMMAND_AGGREGATE_NAME = "${COMMAND_HEADERS_PREFIX}Aggregate-Name"
const val COMMAND_TYPE = "${COMMAND_HEADERS_PREFIX}Type"

const val COMMAND_HEADER_X_PREFIX = "${COMMAND_HEADERS_PREFIX}Header-"
const val WOW_ERROR_CODE = "Wow-Error-Code"
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,56 +179,56 @@ class CommandRouteSpecFactory : AbstractAggregateRouteSpecFactory() {
companion object {
val COMMAND_STAGE_SCHEMA = CommandStage::class.java.toSchemaRef(CommandStage.PROCESSED.name)
val WAIT_STAGE_PARAMETER = Parameter()
.name(CommandHeaders.WAIT_STAGE)
.name(CommandRequestHeaders.WAIT_STAGE)
.`in`(ParameterIn.HEADER.toString())
.schema(COMMAND_STAGE_SCHEMA.ref).let {
ParameterRef("${Wow.WOW_PREFIX}${it.name}", it)
}
val WAIT_CONTEXT_PARAMETER = Parameter()
.name(CommandHeaders.WAIT_CONTEXT)
.name(CommandRequestHeaders.WAIT_CONTEXT)
.`in`(ParameterIn.HEADER.toString())
.schema(StringSchema()).let {
ParameterRef("${Wow.WOW_PREFIX}${it.name}", it)
}
val WAIT_PROCESSOR_PARAMETER = Parameter()
.name(CommandHeaders.WAIT_PROCESSOR)
.name(CommandRequestHeaders.WAIT_PROCESSOR)
.`in`(ParameterIn.HEADER.toString())
.schema(StringSchema()).let {
ParameterRef("${Wow.WOW_PREFIX}${it.name}", it)
}
val WAIT_TIME_OUT_PARAMETER = Parameter()
.name(CommandHeaders.WAIT_TIME_OUT)
.name(CommandRequestHeaders.WAIT_TIME_OUT)
.`in`(ParameterIn.HEADER.toString())
.schema(IntegerSchema())
.description("Unit: millisecond").let {
ParameterRef("${Wow.WOW_PREFIX}${it.name}", it)
}
val TENANT_ID_PARAMETER = Parameter()
.name(CommandHeaders.TENANT_ID)
.name(CommandRequestHeaders.TENANT_ID)
.`in`(ParameterIn.HEADER.toString())
.schema(StringSchema()).let {
ParameterRef("${Wow.WOW_PREFIX}${it.name}", it)
}
val AGGREGATE_ID_PARAMETER = Parameter()
.name(CommandHeaders.AGGREGATE_ID)
.name(CommandRequestHeaders.AGGREGATE_ID)
.`in`(ParameterIn.HEADER.toString())
.schema(StringSchema()).let {
ParameterRef("${Wow.WOW_PREFIX}${it.name}", it)
}
val AGGREGATE_VERSION_PARAMETER = Parameter()
.name(CommandHeaders.AGGREGATE_VERSION)
.name(CommandRequestHeaders.AGGREGATE_VERSION)
.`in`(ParameterIn.HEADER.toString())
.schema(IntegerSchema()).let {
ParameterRef("${Wow.WOW_PREFIX}${it.name}", it)
}
val REQUEST_ID_PARAMETER = Parameter()
.name(CommandHeaders.REQUEST_ID)
.name(CommandRequestHeaders.REQUEST_ID)
.`in`(ParameterIn.HEADER.toString())
.schema(StringSchema()).let {
ParameterRef("${Wow.WOW_PREFIX}${it.name}", it)
}
val LOCAL_FIRST_PARAMETER = Parameter()
.name(CommandHeaders.LOCAL_FIRST)
.name(CommandRequestHeaders.LOCAL_FIRST)
.`in`(ParameterIn.HEADER.toString())
.required(false)
.schema(BooleanSchema()).let {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ class WebFluxAutoConfiguration {
return GlobalExceptionHandler
}


@Bean
@ConditionalOnWebfluxGlobalErrorEnabled
fun commandMessageParser(commandMessageFactory: CommandMessageFactory,): CommandMessageParser {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import me.ahoo.wow.api.exception.BindingError
import me.ahoo.wow.api.exception.ErrorInfo
import me.ahoo.wow.exception.ErrorCodes
import me.ahoo.wow.exception.toErrorInfo
import me.ahoo.wow.openapi.command.CommandHeaders
import me.ahoo.wow.openapi.command.CommandRequestHeaders
import me.ahoo.wow.serialization.toJsonString
import me.ahoo.wow.webflux.exception.ErrorHttpStatusMapping.toHttpStatus
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -48,7 +48,7 @@ object GlobalExceptionHandler : WebExceptionHandler, Ordered {
val status = errorInfo.toHttpStatus()
val response = exchange.response
response.statusCode = status
response.headers.set(CommandHeaders.WOW_ERROR_CODE, errorInfo.errorCode)
response.headers.set(CommandRequestHeaders.WOW_ERROR_CODE, errorInfo.errorCode)
response.headers.contentType = MediaType.APPLICATION_JSON
return response.writeWith(Mono.just(response.bufferFactory().wrap(errorInfo.toJsonString().toByteArray())))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package me.ahoo.wow.webflux.exception

import me.ahoo.wow.api.exception.ErrorInfo
import me.ahoo.wow.exception.toErrorInfo
import me.ahoo.wow.openapi.command.CommandHeaders.WOW_ERROR_CODE
import me.ahoo.wow.openapi.command.CommandRequestHeaders.WOW_ERROR_CODE
import me.ahoo.wow.serialization.toJsonString
import me.ahoo.wow.webflux.exception.ErrorHttpStatusMapping.toHttpStatus
import org.springframework.http.MediaType
Expand Down
Loading

0 comments on commit f2f22b1

Please sign in to comment.