From e432b46495dee3a1481c9bffb0e5009e695b9278 Mon Sep 17 00:00:00 2001 From: Philipp Ossler Date: Thu, 9 Mar 2023 10:17:52 +0100 Subject: [PATCH 1/6] deps: dump zeebe from 8.2.0-alpha4 to 8.2.0-alpha5 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6eac725..e425883 100644 --- a/pom.xml +++ b/pom.xml @@ -46,7 +46,7 @@ 3.0 1.8.10 - 8.2.0-alpha4 + 8.2.0-alpha5 5.9.2 From 338c9c6fb9b98ce29d0b3f5369a82e68ac1a2a9d Mon Sep 17 00:00:00 2001 From: Philipp Ossler Date: Thu, 9 Mar 2023 10:18:51 +0100 Subject: [PATCH 2/6] feat: implement new database API --- .../community/eze/db/EzeDbColumnFamily.kt | 20 +++++++++++++ .../community/eze/db/ColumnFamilyTest.java | 28 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/eze/src/main/kotlin/org/camunda/community/eze/db/EzeDbColumnFamily.kt b/eze/src/main/kotlin/org/camunda/community/eze/db/EzeDbColumnFamily.kt index 7600095..fba427d 100644 --- a/eze/src/main/kotlin/org/camunda/community/eze/db/EzeDbColumnFamily.kt +++ b/eze/src/main/kotlin/org/camunda/community/eze/db/EzeDbColumnFamily.kt @@ -323,4 +323,24 @@ class EzeDbColumnFamily, valueInstance!!.wrap(valueViewBuffer, 0, valueViewBuffer.capacity()) return iteratorConsumer.visit(keyInstance, valueInstance) } + + override fun whileTrue(startAtKey: KeyType, visitor: KeyValuePairVisitor) { + + columnFamilyContext.withPrefixKey( + startAtKey + ) { prefixKey: ByteArray?, prefixLength: Int -> + ensureInOpenTransaction( + context + ) { transaction -> + + transaction.newIterator().seek(prefixKey!!, prefixLength).iterate().forEach { + + val shouldVisitNext = visit(keyInstance, valueInstance, visitor, it) + if (!shouldVisitNext) { + return@ensureInOpenTransaction + } + } + } + } + } } diff --git a/eze/src/test/kotlin/org/camunda/community/eze/db/ColumnFamilyTest.java b/eze/src/test/kotlin/org/camunda/community/eze/db/ColumnFamilyTest.java index 2f4878d..859ad52 100644 --- a/eze/src/test/kotlin/org/camunda/community/eze/db/ColumnFamilyTest.java +++ b/eze/src/test/kotlin/org/camunda/community/eze/db/ColumnFamilyTest.java @@ -445,6 +445,34 @@ public void shouldCheckIfEmpty() { assertThat(columnFamily.isEmpty()).isTrue(); } + @Test + public void shouldUseWhileTrueWithStartKey() { + // given + putKeyValuePair(1, 10); + putKeyValuePair(2, 20); // from here --- + putKeyValuePair(3, 30); + putKeyValuePair(4, 40); // to here --- + putKeyValuePair(5, 50); + + // when + key.wrapLong(2); + + final List keys = new ArrayList<>(); + final List values = new ArrayList<>(); + columnFamily.whileTrue( + key, + (key, value) -> { + keys.add(key.getValue()); + values.add(value.getValue()); + + return key.getValue() != 4; + }); + + // then + assertThat(keys).containsExactly(2L, 3L, 4L); + assertThat(values).containsExactly(20L, 30L, 40L); + } + private void putKeyValuePair(final int key, final int value) { this.key.wrapLong(key); this.value.wrapLong(value); From 5f973b5c4b2169d05fc778a1b5d53ae92111cdb6 Mon Sep 17 00:00:00 2001 From: Philipp Ossler Date: Thu, 9 Mar 2023 10:43:18 +0100 Subject: [PATCH 3/6] refactor: reduce code duplication --- .../community/eze/db/EzeDbColumnFamily.kt | 59 ++++++++++++------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/eze/src/main/kotlin/org/camunda/community/eze/db/EzeDbColumnFamily.kt b/eze/src/main/kotlin/org/camunda/community/eze/db/EzeDbColumnFamily.kt index fba427d..dde32a9 100644 --- a/eze/src/main/kotlin/org/camunda/community/eze/db/EzeDbColumnFamily.kt +++ b/eze/src/main/kotlin/org/camunda/community/eze/db/EzeDbColumnFamily.kt @@ -282,6 +282,33 @@ class EzeDbColumnFamily, keyInstance: KeyType, valueInstance: ValueType, visitor: KeyValuePairVisitor + ) { + whileWithPrefix( + context = context, + prefix = prefix, + keyInstance = keyInstance, + valueInstance = valueInstance, + visitor = visitor, + predicate = { prefixKey, prefixLength, key -> + startsWith( + prefixKey, + 0, + prefixLength, + key, + 0, + key.size + ) + } + ) + } + + private fun whileWithPrefix( + context: TransactionContext, + prefix: DbKey?, + keyInstance: KeyType, + valueInstance: ValueType, + visitor: KeyValuePairVisitor, + predicate: (ByteArray, Int, ByteArray) -> Boolean = { _, _, _ -> true } ) { columnFamilyContext.withPrefixKey( prefix!! @@ -293,16 +320,16 @@ class EzeDbColumnFamily, transaction.newIterator().seek(prefixKey!!, prefixLength).iterate().forEach { val keyBytes = it.key.byteArray - if (!startsWith(prefixKey, 0, prefixLength, keyBytes, 0, keyBytes.size)) { + + val shouldVisit = predicate.invoke(prefixKey, prefixLength, keyBytes) + if (!shouldVisit) { return@forEach } - val shouldVisitNext = visit(keyInstance, valueInstance, visitor, it) + val shouldVisitNext = visit(keyInstance, valueInstance, visitor, it) if (!shouldVisitNext) { return@ensureInOpenTransaction } - - } } } @@ -325,22 +352,12 @@ class EzeDbColumnFamily, } override fun whileTrue(startAtKey: KeyType, visitor: KeyValuePairVisitor) { - - columnFamilyContext.withPrefixKey( - startAtKey - ) { prefixKey: ByteArray?, prefixLength: Int -> - ensureInOpenTransaction( - context - ) { transaction -> - - transaction.newIterator().seek(prefixKey!!, prefixLength).iterate().forEach { - - val shouldVisitNext = visit(keyInstance, valueInstance, visitor, it) - if (!shouldVisitNext) { - return@ensureInOpenTransaction - } - } - } - } + whileWithPrefix( + context = context, + prefix = startAtKey, + keyInstance = keyInstance, + valueInstance = valueInstance, + visitor = visitor + ) } } From 4c22fc951f64d7528e528aa67338238ce3c0a8ab Mon Sep 17 00:00:00 2001 From: Philipp Ossler Date: Thu, 9 Mar 2023 14:11:29 +0100 Subject: [PATCH 4/6] feat: Support new decision evaluation API --- .../community/eze/grpc/GrpcResponseWriter.kt | 58 ++++++++++++ .../eze/grpc/GrpcToLogStreamGateway.kt | 24 +++++ eze/src/main/resources/rating.dmn | 89 +++++++++++++++++++ .../camunda/community/eze/EngineClientTest.kt | 25 ++++++ 4 files changed, 196 insertions(+) create mode 100644 eze/src/main/resources/rating.dmn diff --git a/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcResponseWriter.kt b/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcResponseWriter.kt index 034426c..1b18392 100644 --- a/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcResponseWriter.kt +++ b/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcResponseWriter.kt @@ -12,6 +12,7 @@ import com.google.rpc.Code import com.google.rpc.Status import io.camunda.zeebe.gateway.protocol.GatewayOuterClass import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter +import io.camunda.zeebe.protocol.impl.record.value.decision.DecisionEvaluationRecord import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord @@ -117,6 +118,7 @@ class GrpcResponseWriter( } ValueType.PROCESS_INSTANCE_MODIFICATION -> createProcessInstanceModificationResponse() + ValueType.DECISION_EVALUATION -> createDecisionEvaluationResponse() else -> TODO("not supported command '$valueType'") } @@ -314,6 +316,62 @@ class GrpcResponseWriter( .build() } + private fun createDecisionEvaluationResponse(): GatewayOuterClass.EvaluateDecisionResponse { + val record = DecisionEvaluationRecord() + record.wrap(valueBufferView) + + val builder = GatewayOuterClass.EvaluateDecisionResponse.newBuilder() + .setDecisionKey(record.decisionKey) + .setDecisionId(record.decisionId) + .setDecisionName(record.decisionName) + .setDecisionVersion(record.decisionVersion) + .setDecisionRequirementsId(record.decisionRequirementsId) + .setDecisionRequirementsKey(record.decisionRequirementsKey) + .setFailedDecisionId(record.failedDecisionId) + .setFailureMessage(record.evaluationFailureMessage) + .setDecisionOutput(record.decisionOutput) + .addAllEvaluatedDecisions( + record.evaluatedDecisions().map { evaluatedDecision -> + GatewayOuterClass.EvaluatedDecision.newBuilder() + .setDecisionKey(evaluatedDecision.decisionKey) + .setDecisionId(evaluatedDecision.decisionId) + .setDecisionName(evaluatedDecision.decisionName) + .setDecisionVersion(evaluatedDecision.decisionVersion) + .setDecisionType(evaluatedDecision.decisionType) + .setDecisionOutput(evaluatedDecision.decisionOutput) + .addAllEvaluatedInputs( + evaluatedDecision.evaluatedInputs().map { + GatewayOuterClass.EvaluatedDecisionInput.newBuilder() + .setInputId(it.inputId) + .setInputName(it.inputName) + .setInputValue(it.inputValue) + .build() + } + ) + .addAllMatchedRules( + evaluatedDecision.matchedRules().map { rule -> + GatewayOuterClass.MatchedDecisionRule.newBuilder() + .setRuleId(rule.ruleId) + .setRuleIndex(rule.ruleIndex) + .addAllEvaluatedOutputs( + rule.evaluatedOutputs().map { + GatewayOuterClass.EvaluatedDecisionOutput.newBuilder() + .setOutputId(it.outputId) + .setOutputName(it.outputName) + .setOutputValue(it.outputValue) + .build() + } + ) + .build() + } + ) + .build() + } + ) + + return builder.build() + } + private fun createRejectionResponse(): Status { val statusCode = when (rejectionType) { RejectionType.INVALID_ARGUMENT -> Code.INVALID_ARGUMENT_VALUE diff --git a/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcToLogStreamGateway.kt b/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcToLogStreamGateway.kt index 411f1a4..1c66ce7 100644 --- a/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcToLogStreamGateway.kt +++ b/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcToLogStreamGateway.kt @@ -16,6 +16,7 @@ import io.camunda.zeebe.logstreams.log.LogStreamWriter import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter import io.camunda.zeebe.protocol.impl.record.RecordMetadata import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue +import io.camunda.zeebe.protocol.impl.record.value.decision.DecisionEvaluationRecord import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord @@ -412,6 +413,29 @@ class GrpcToLogStreamGateway( } } + override fun evaluateDecision( + request: GatewayOuterClass.EvaluateDecisionRequest, + responseObserver: StreamObserver + ) { + executor.submit { + val requestId = registerNewRequest(responseObserver) + + val recordMetadata = prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.DECISION_EVALUATION) + .intent(DecisionEvaluationIntent.EVALUATE) + + val command = DecisionEvaluationRecord() + + request.decisionKey.takeIf { it > 0 }?.let { command.decisionKey = it } + request.decisionId.takeIf { it.isNotEmpty() }?.let { command.decisionId = it } + + setVariablesAsMessagePack(request.variables, command::setVariables) + + writeCommandWithoutKey(recordMetadata, command) + } + } + override fun topology( request: GatewayOuterClass.TopologyRequest, responseObserver: StreamObserver diff --git a/eze/src/main/resources/rating.dmn b/eze/src/main/resources/rating.dmn new file mode 100644 index 0000000..d35cc48 --- /dev/null +++ b/eze/src/main/resources/rating.dmn @@ -0,0 +1,89 @@ + + + + + + + + + + decision_b + + + + + + "high" + + + "A++" + + + + + "mid" + + + "A+" + + + + + "low" + + + "A" + + + + + + + + + x + + + + + + > 10 + + + "high" + + + + + > 5 + + + "mid" + + + + + + + + "low" + + + + + + + + + + + + + + + + + + + + diff --git a/eze/src/test/kotlin/org/camunda/community/eze/EngineClientTest.kt b/eze/src/test/kotlin/org/camunda/community/eze/EngineClientTest.kt index ff36530..ac60cb8 100644 --- a/eze/src/test/kotlin/org/camunda/community/eze/EngineClientTest.kt +++ b/eze/src/test/kotlin/org/camunda/community/eze/EngineClientTest.kt @@ -1059,4 +1059,29 @@ class EngineClientTest { } } + @Test + fun `should evaluate decision`() { + // given + zeebeClient = ZeebeClient.newClientBuilder().usePlaintext().build() + zeebeClient + .newDeployResourceCommand() + .addResourceFromClasspath("rating.dmn") + .send() + .join() + + // when + val decisionEvaluation = + zeebeClient.newEvaluateDecisionCommand() + .decisionId("decision_a") + .variables(mapOf("x" to 7)) + .send() + .join() + + // then + assertThat(decisionEvaluation.decisionId).isEqualTo("decision_a") + assertThat(decisionEvaluation.decisionName).isEqualTo("Decision A") + assertThat(decisionEvaluation.decisionVersion).isEqualTo(1) + assertThat(decisionEvaluation.decisionOutput).isEqualTo("\"A+\"") + assertThat(decisionEvaluation.evaluatedDecisions).hasSize(2) + } } From b005dab9044d2b09b422b7fa677646bf13a4e13f Mon Sep 17 00:00:00 2001 From: Philipp Ossler Date: Fri, 10 Mar 2023 05:58:10 +0100 Subject: [PATCH 5/6] feat: Support new resource delete API --- .../community/eze/RecordStreamSource.kt | 10 ++++++ .../community/eze/grpc/GrpcResponseWriter.kt | 6 ++++ .../eze/grpc/GrpcToLogStreamGateway.kt | 20 +++++++++++ .../camunda/community/eze/EngineClientTest.kt | 35 ++++++++++++++++--- 4 files changed, 67 insertions(+), 4 deletions(-) diff --git a/eze/src/main/kotlin/org/camunda/community/eze/RecordStreamSource.kt b/eze/src/main/kotlin/org/camunda/community/eze/RecordStreamSource.kt index 0b8b222..09dbb33 100644 --- a/eze/src/main/kotlin/org/camunda/community/eze/RecordStreamSource.kt +++ b/eze/src/main/kotlin/org/camunda/community/eze/RecordStreamSource.kt @@ -11,6 +11,8 @@ import io.camunda.zeebe.protocol.record.Record import io.camunda.zeebe.protocol.record.RecordValue import io.camunda.zeebe.protocol.record.ValueType import io.camunda.zeebe.protocol.record.value.* +import io.camunda.zeebe.protocol.record.value.deployment.DecisionRecordValue +import io.camunda.zeebe.protocol.record.value.deployment.DecisionRequirementsRecordValue import io.camunda.zeebe.protocol.record.value.deployment.Process import org.camunda.community.eze.records.* @@ -75,4 +77,12 @@ interface RecordStreamSource { return records().ofValueType(ValueType.PROCESS_MESSAGE_SUBSCRIPTION) } + fun decisionRequirementsRecords(): Iterable> { + return records().ofValueType(ValueType.DECISION_REQUIREMENTS) + } + + fun decisionRecords(): Iterable> { + return records().ofValueType(ValueType.DECISION) + } + } diff --git a/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcResponseWriter.kt b/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcResponseWriter.kt index 1b18392..cdf1435 100644 --- a/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcResponseWriter.kt +++ b/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcResponseWriter.kt @@ -119,6 +119,7 @@ class GrpcResponseWriter( ValueType.PROCESS_INSTANCE_MODIFICATION -> createProcessInstanceModificationResponse() ValueType.DECISION_EVALUATION -> createDecisionEvaluationResponse() + ValueType.RESOURCE_DELETION -> createDeleteResourceResponse() else -> TODO("not supported command '$valueType'") } @@ -372,6 +373,11 @@ class GrpcResponseWriter( return builder.build() } + private fun createDeleteResourceResponse(): GatewayOuterClass.DeleteResourceResponse { + return GatewayOuterClass.DeleteResourceResponse.newBuilder() + .build() + } + private fun createRejectionResponse(): Status { val statusCode = when (rejectionType) { RejectionType.INVALID_ARGUMENT -> Code.INVALID_ARGUMENT_VALUE diff --git a/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcToLogStreamGateway.kt b/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcToLogStreamGateway.kt index 1c66ce7..291159e 100644 --- a/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcToLogStreamGateway.kt +++ b/eze/src/main/kotlin/org/camunda/community/eze/grpc/GrpcToLogStreamGateway.kt @@ -23,6 +23,7 @@ import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord import io.camunda.zeebe.protocol.impl.record.value.processinstance.* +import io.camunda.zeebe.protocol.impl.record.value.resource.ResourceDeletionRecord import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord import io.camunda.zeebe.protocol.record.RecordType import io.camunda.zeebe.protocol.record.ValueType @@ -436,6 +437,25 @@ class GrpcToLogStreamGateway( } } + override fun deleteResource( + request: GatewayOuterClass.DeleteResourceRequest, + responseObserver: StreamObserver + ) { + executor.submit { + val requestId = registerNewRequest(responseObserver) + + val recordMetadata = prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.RESOURCE_DELETION) + .intent(ResourceDeletionIntent.DELETE) + + val command = ResourceDeletionRecord() + command.resourceKey = request.resourceKey + + writeCommandWithoutKey(recordMetadata, command) + } + } + override fun topology( request: GatewayOuterClass.TopologyRequest, responseObserver: StreamObserver diff --git a/eze/src/test/kotlin/org/camunda/community/eze/EngineClientTest.kt b/eze/src/test/kotlin/org/camunda/community/eze/EngineClientTest.kt index ac60cb8..362b87f 100644 --- a/eze/src/test/kotlin/org/camunda/community/eze/EngineClientTest.kt +++ b/eze/src/test/kotlin/org/camunda/community/eze/EngineClientTest.kt @@ -14,10 +14,7 @@ import io.camunda.zeebe.client.api.response.PartitionBrokerHealth import io.camunda.zeebe.client.api.response.PartitionBrokerRole import io.camunda.zeebe.client.api.worker.JobClient import io.camunda.zeebe.model.bpmn.Bpmn -import io.camunda.zeebe.protocol.record.intent.Intent -import io.camunda.zeebe.protocol.record.intent.JobIntent -import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent -import io.camunda.zeebe.protocol.record.intent.TimerIntent +import io.camunda.zeebe.protocol.record.intent.* import io.camunda.zeebe.protocol.record.value.BpmnElementType import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue import io.camunda.zeebe.protocol.record.value.VariableRecordValue @@ -1084,4 +1081,34 @@ class EngineClientTest { assertThat(decisionEvaluation.decisionOutput).isEqualTo("\"A+\"") assertThat(decisionEvaluation.evaluatedDecisions).hasSize(2) } + + @Test + fun `should delete resources`() { + // given + zeebeClient = ZeebeClient.newClientBuilder().usePlaintext().build() + val deployment = zeebeClient + .newDeployResourceCommand() + .addResourceFromClasspath("rating.dmn") + .send() + .join() + + val drg = deployment.decisionRequirements.first() + assertThat(drg).isNotNull + + // when + zeebeClient + .newDeleteResourceCommand(drg.decisionRequirementsKey) + .send() + .join() + + // then + await.untilAsserted { + val drgDeleted = + zeebeEngine.decisionRequirementsRecords() + .withIntent(DecisionRequirementsIntent.DELETED).first() + + assertThat(drgDeleted).isNotNull + assertThat(drgDeleted.value.decisionRequirementsKey).isEqualTo(drg.decisionRequirementsKey) + } + } } From 6f37654f409bdd41941a3ad0f938f8b8f3373f28 Mon Sep 17 00:00:00 2001 From: Philipp Ossler Date: Fri, 10 Mar 2023 06:15:53 +0100 Subject: [PATCH 6/6] fix: disable batch processing Disable the batch processing for now to avoid losing command responses. Otherwise, a test case fails. --- .../camunda/community/eze/engine/EzeStreamProcessorFactory.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eze/src/main/kotlin/org/camunda/community/eze/engine/EzeStreamProcessorFactory.kt b/eze/src/main/kotlin/org/camunda/community/eze/engine/EzeStreamProcessorFactory.kt index a29a2c7..332d13e 100644 --- a/eze/src/main/kotlin/org/camunda/community/eze/engine/EzeStreamProcessorFactory.kt +++ b/eze/src/main/kotlin/org/camunda/community/eze/engine/EzeStreamProcessorFactory.kt @@ -43,6 +43,8 @@ object EzeStreamProcessorFactory { .actorSchedulingService(scheduler) .streamProcessorMode(StreamProcessorMode.PROCESSING) .recordProcessors(listOf(createZeebeEngine(partitionCount))) + // disable batch processing until https://github.com/camunda/zeebe/issues/11848 is fixed + .maxCommandsInBatch(1) .build() return EzeStreamProcessor(