Skip to content

Commit

Permalink
Merge pull request #272 from camunda-community-hub/update-zeebe-8.2.0…
Browse files Browse the repository at this point in the history
…-alpha5

Update Zeebe to `8.2.0-alpha5`
  • Loading branch information
saig0 authored Mar 10, 2023
2 parents 3b6ccc6 + 6f37654 commit eadec39
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import io.camunda.zeebe.protocol.record.Record
import io.camunda.zeebe.protocol.record.RecordValue
import io.camunda.zeebe.protocol.record.ValueType
import io.camunda.zeebe.protocol.record.value.*
import io.camunda.zeebe.protocol.record.value.deployment.DecisionRecordValue
import io.camunda.zeebe.protocol.record.value.deployment.DecisionRequirementsRecordValue
import io.camunda.zeebe.protocol.record.value.deployment.Process
import org.camunda.community.eze.records.*

Expand Down Expand Up @@ -75,4 +77,12 @@ interface RecordStreamSource {
return records().ofValueType(ValueType.PROCESS_MESSAGE_SUBSCRIPTION)
}

fun decisionRequirementsRecords(): Iterable<Record<DecisionRequirementsRecordValue>> {
return records().ofValueType(ValueType.DECISION_REQUIREMENTS)
}

fun decisionRecords(): Iterable<Record<DecisionRecordValue>> {
return records().ofValueType(ValueType.DECISION)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,33 @@ class EzeDbColumnFamily<ColumnFamilyNames : Enum<ColumnFamilyNames>,
keyInstance: KeyType,
valueInstance: ValueType,
visitor: KeyValuePairVisitor<KeyType, ValueType>
) {
whileWithPrefix(
context = context,
prefix = prefix,
keyInstance = keyInstance,
valueInstance = valueInstance,
visitor = visitor,
predicate = { prefixKey, prefixLength, key ->
startsWith(
prefixKey,
0,
prefixLength,
key,
0,
key.size
)
}
)
}

private fun <KeyType : DbKey?, ValueType : DbValue?> whileWithPrefix(
context: TransactionContext,
prefix: DbKey?,
keyInstance: KeyType,
valueInstance: ValueType,
visitor: KeyValuePairVisitor<KeyType, ValueType>,
predicate: (ByteArray, Int, ByteArray) -> Boolean = { _, _, _ -> true }
) {
columnFamilyContext.withPrefixKey(
prefix!!
Expand All @@ -293,16 +320,16 @@ class EzeDbColumnFamily<ColumnFamilyNames : Enum<ColumnFamilyNames>,
transaction.newIterator().seek(prefixKey!!, prefixLength).iterate().forEach {

val keyBytes = it.key.byteArray
if (!startsWith(prefixKey, 0, prefixLength, keyBytes, 0, keyBytes.size)) {

val shouldVisit = predicate.invoke(prefixKey, prefixLength, keyBytes)
if (!shouldVisit) {
return@forEach
}
val shouldVisitNext = visit(keyInstance, valueInstance, visitor, it)

val shouldVisitNext = visit(keyInstance, valueInstance, visitor, it)
if (!shouldVisitNext) {
return@ensureInOpenTransaction
}


}
}
}
Expand All @@ -323,4 +350,14 @@ class EzeDbColumnFamily<ColumnFamilyNames : Enum<ColumnFamilyNames>,
valueInstance!!.wrap(valueViewBuffer, 0, valueViewBuffer.capacity())
return iteratorConsumer.visit(keyInstance, valueInstance)
}

override fun whileTrue(startAtKey: KeyType, visitor: KeyValuePairVisitor<KeyType, ValueType>) {
whileWithPrefix(
context = context,
prefix = startAtKey,
keyInstance = keyInstance,
valueInstance = valueInstance,
visitor = visitor
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ object EzeStreamProcessorFactory {
.actorSchedulingService(scheduler)
.streamProcessorMode(StreamProcessorMode.PROCESSING)
.recordProcessors(listOf(createZeebeEngine(partitionCount)))
// disable batch processing until https://github.com/camunda/zeebe/issues/11848 is fixed
.maxCommandsInBatch(1)
.build()

return EzeStreamProcessor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.google.rpc.Code
import com.google.rpc.Status
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter
import io.camunda.zeebe.protocol.impl.record.value.decision.DecisionEvaluationRecord
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord
Expand Down Expand Up @@ -117,6 +118,8 @@ class GrpcResponseWriter(
}

ValueType.PROCESS_INSTANCE_MODIFICATION -> createProcessInstanceModificationResponse()
ValueType.DECISION_EVALUATION -> createDecisionEvaluationResponse()
ValueType.RESOURCE_DELETION -> createDeleteResourceResponse()

else -> TODO("not supported command '$valueType'")
}
Expand Down Expand Up @@ -314,6 +317,67 @@ class GrpcResponseWriter(
.build()
}

private fun createDecisionEvaluationResponse(): GatewayOuterClass.EvaluateDecisionResponse {
val record = DecisionEvaluationRecord()
record.wrap(valueBufferView)

val builder = GatewayOuterClass.EvaluateDecisionResponse.newBuilder()
.setDecisionKey(record.decisionKey)
.setDecisionId(record.decisionId)
.setDecisionName(record.decisionName)
.setDecisionVersion(record.decisionVersion)
.setDecisionRequirementsId(record.decisionRequirementsId)
.setDecisionRequirementsKey(record.decisionRequirementsKey)
.setFailedDecisionId(record.failedDecisionId)
.setFailureMessage(record.evaluationFailureMessage)
.setDecisionOutput(record.decisionOutput)
.addAllEvaluatedDecisions(
record.evaluatedDecisions().map { evaluatedDecision ->
GatewayOuterClass.EvaluatedDecision.newBuilder()
.setDecisionKey(evaluatedDecision.decisionKey)
.setDecisionId(evaluatedDecision.decisionId)
.setDecisionName(evaluatedDecision.decisionName)
.setDecisionVersion(evaluatedDecision.decisionVersion)
.setDecisionType(evaluatedDecision.decisionType)
.setDecisionOutput(evaluatedDecision.decisionOutput)
.addAllEvaluatedInputs(
evaluatedDecision.evaluatedInputs().map {
GatewayOuterClass.EvaluatedDecisionInput.newBuilder()
.setInputId(it.inputId)
.setInputName(it.inputName)
.setInputValue(it.inputValue)
.build()
}
)
.addAllMatchedRules(
evaluatedDecision.matchedRules().map { rule ->
GatewayOuterClass.MatchedDecisionRule.newBuilder()
.setRuleId(rule.ruleId)
.setRuleIndex(rule.ruleIndex)
.addAllEvaluatedOutputs(
rule.evaluatedOutputs().map {
GatewayOuterClass.EvaluatedDecisionOutput.newBuilder()
.setOutputId(it.outputId)
.setOutputName(it.outputName)
.setOutputValue(it.outputValue)
.build()
}
)
.build()
}
)
.build()
}
)

return builder.build()
}

private fun createDeleteResourceResponse(): GatewayOuterClass.DeleteResourceResponse {
return GatewayOuterClass.DeleteResourceResponse.newBuilder()
.build()
}

private fun createRejectionResponse(): Status {
val statusCode = when (rejectionType) {
RejectionType.INVALID_ARGUMENT -> Code.INVALID_ARGUMENT_VALUE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import io.camunda.zeebe.logstreams.log.LogStreamWriter
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter
import io.camunda.zeebe.protocol.impl.record.RecordMetadata
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue
import io.camunda.zeebe.protocol.impl.record.value.decision.DecisionEvaluationRecord
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord
import io.camunda.zeebe.protocol.impl.record.value.incident.IncidentRecord
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord
import io.camunda.zeebe.protocol.impl.record.value.processinstance.*
import io.camunda.zeebe.protocol.impl.record.value.resource.ResourceDeletionRecord
import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord
import io.camunda.zeebe.protocol.record.RecordType
import io.camunda.zeebe.protocol.record.ValueType
Expand Down Expand Up @@ -412,6 +414,48 @@ class GrpcToLogStreamGateway(
}
}

override fun evaluateDecision(
request: GatewayOuterClass.EvaluateDecisionRequest,
responseObserver: StreamObserver<GatewayOuterClass.EvaluateDecisionResponse>
) {
executor.submit {
val requestId = registerNewRequest(responseObserver)

val recordMetadata = prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.DECISION_EVALUATION)
.intent(DecisionEvaluationIntent.EVALUATE)

val command = DecisionEvaluationRecord()

request.decisionKey.takeIf { it > 0 }?.let { command.decisionKey = it }
request.decisionId.takeIf { it.isNotEmpty() }?.let { command.decisionId = it }

setVariablesAsMessagePack(request.variables, command::setVariables)

writeCommandWithoutKey(recordMetadata, command)
}
}

override fun deleteResource(
request: GatewayOuterClass.DeleteResourceRequest,
responseObserver: StreamObserver<GatewayOuterClass.DeleteResourceResponse>
) {
executor.submit {
val requestId = registerNewRequest(responseObserver)

val recordMetadata = prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.RESOURCE_DELETION)
.intent(ResourceDeletionIntent.DELETE)

val command = ResourceDeletionRecord()
command.resourceKey = request.resourceKey

writeCommandWithoutKey(recordMetadata, command)
}
}

override fun topology(
request: GatewayOuterClass.TopologyRequest,
responseObserver: StreamObserver<GatewayOuterClass.TopologyResponse>
Expand Down
89 changes: 89 additions & 0 deletions eze/src/main/resources/rating.dmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="https://www.omg.org/spec/DMN/20191111/MODEL/" xmlns:dmndi="https://www.omg.org/spec/DMN/20191111/DMNDI/" xmlns:dc="http://www.omg.org/spec/DMN/20180521/DC/" xmlns:modeler="http://camunda.org/schema/modeler/1.0" xmlns:di="http://www.omg.org/spec/DMN/20180521/DI/" id="Ratings" name="DRD" namespace="http://camunda.org/schema/1.0/dmn" exporter="Camunda Modeler" exporterVersion="5.8.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.1.0">
<decision id="decision_a" name="Decision A">
<informationRequirement id="InformationRequirement_1sdnwtv">
<requiredDecision href="#decision_b" />
</informationRequirement>
<decisionTable id="DecisionTable_1hibhur">
<input id="Input_1" label="B">
<inputExpression id="InputExpression_1" typeRef="string">
<text>decision_b</text>
</inputExpression>
</input>
<output id="Output_1" label="A" name="decision_a" typeRef="string" />
<rule id="DecisionRule_1sz0j2x">
<inputEntry id="UnaryTests_1r1z1nc">
<text>"high"</text>
</inputEntry>
<outputEntry id="LiteralExpression_0gbpbaq">
<text>"A++"</text>
</outputEntry>
</rule>
<rule id="DecisionRule_0gt2ru5">
<inputEntry id="UnaryTests_15968nq">
<text>"mid"</text>
</inputEntry>
<outputEntry id="LiteralExpression_007rqht">
<text>"A+"</text>
</outputEntry>
</rule>
<rule id="DecisionRule_0ecrw8b">
<inputEntry id="UnaryTests_07gkonx">
<text>"low"</text>
</inputEntry>
<outputEntry id="LiteralExpression_1pq462u">
<text>"A"</text>
</outputEntry>
</rule>
</decisionTable>
</decision>
<decision id="decision_b" name="Decision B">
<decisionTable id="DecisionTable_0ihx504" hitPolicy="FIRST">
<input id="InputClause_11xz5ga" label="x">
<inputExpression id="LiteralExpression_01im9ul" typeRef="number">
<text>x</text>
</inputExpression>
</input>
<output id="OutputClause_0przvpv" label="B" name="decision_b" typeRef="string" />
<rule id="DecisionRule_0k1nys1">
<inputEntry id="UnaryTests_10glusy">
<text>&gt; 10</text>
</inputEntry>
<outputEntry id="LiteralExpression_1cogxsy">
<text>"high"</text>
</outputEntry>
</rule>
<rule id="DecisionRule_018nu5a">
<inputEntry id="UnaryTests_1ayzs2v">
<text>&gt; 5</text>
</inputEntry>
<outputEntry id="LiteralExpression_10m4edf">
<text>"mid"</text>
</outputEntry>
</rule>
<rule id="DecisionRule_1do7v4g">
<inputEntry id="UnaryTests_0t9q7uj">
<text></text>
</inputEntry>
<outputEntry id="LiteralExpression_10g3mkg">
<text>"low"</text>
</outputEntry>
</rule>
</decisionTable>
</decision>
<dmndi:DMNDI>
<dmndi:DMNDiagram>
<dmndi:DMNShape dmnElementRef="decision_a">
<dc:Bounds height="80" width="180" x="160" y="100" />
</dmndi:DMNShape>
<dmndi:DMNEdge id="DMNEdge_12vg7f9" dmnElementRef="InformationRequirement_1sdnwtv">
<di:waypoint x="250" y="240" />
<di:waypoint x="250" y="200" />
<di:waypoint x="250" y="180" />
</dmndi:DMNEdge>
<dmndi:DMNShape id="DMNShape_0xp7kud" dmnElementRef="decision_b">
<dc:Bounds height="80" width="180" x="160" y="240" />
</dmndi:DMNShape>
</dmndi:DMNDiagram>
</dmndi:DMNDI>
</definitions>
Loading

0 comments on commit eadec39

Please sign in to comment.