From 020a4f2fd3365300d0c0df484e6c42ba6f28cecc Mon Sep 17 00:00:00 2001 From: Jacob Viau Date: Thu, 5 Oct 2023 16:44:25 -0700 Subject: [PATCH] Add special EntityMessageEvent --- .../AzureStorageOrchestrationService.cs | 7 +++-- src/DurableTask.Core/Common/Entities.cs | 9 ++++++ .../Entities/EntityMessageEvent.cs | 6 ++-- .../Entities/OrchestrationEntityContext.cs | 2 +- .../History/EntityOperationEvent.cs | 31 +++++++++++++++++++ .../History/EventRaisedEvent.cs | 24 +++++++++++++- src/DurableTask.Core/OrchestrationInstance.cs | 22 +++++++++++++ src/DurableTask.Core/OrchestrationTags.cs | 17 ++++++++-- .../TaskOrchestrationDispatcher.cs | 5 +-- 9 files changed, 109 insertions(+), 14 deletions(-) create mode 100644 src/DurableTask.Core/History/EntityOperationEvent.cs diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9f5d15047..36adce6b8 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -908,8 +908,9 @@ TraceContextBase GetParentTraceContext(OrchestrationSession session) } break; - } - } else + } + } + else { // In this case, we set the parentTraceContext later in this method @@ -917,7 +918,7 @@ TraceContextBase GetParentTraceContext(OrchestrationSession session) { foundEventRaised = true; } - } + } } // When EventRaisedEvent is present, it will not, out of the box, share the same operation diff --git a/src/DurableTask.Core/Common/Entities.cs b/src/DurableTask.Core/Common/Entities.cs index dc3ba2434..b7bd6d868 100644 --- a/src/DurableTask.Core/Common/Entities.cs +++ b/src/DurableTask.Core/Common/Entities.cs @@ -92,6 +92,15 @@ public static bool AutoStart(string instanceId, IList newMessages) OrchestrationInstance = orchestrationInstance, Event = startedEvent }; + + if (newMessages[0].Event is EntityMessageEvent) + { + startedEvent.Tags = new Dictionary() + { + [OrchestrationTags.Entity] = "" + }; + } + newMessages.Insert(0, taskMessage); return true; } diff --git a/src/DurableTask.Core/Entities/EntityMessageEvent.cs b/src/DurableTask.Core/Entities/EntityMessageEvent.cs index 924d5c7fd..a7d569846 100644 --- a/src/DurableTask.Core/Entities/EntityMessageEvent.cs +++ b/src/DurableTask.Core/Entities/EntityMessageEvent.cs @@ -25,13 +25,13 @@ public readonly struct EntityMessageEvent { readonly string eventName; readonly EntityMessage message; - readonly OrchestrationInstance target; + readonly EntityInstance target; internal EntityMessageEvent(string eventName, EntityMessage message, OrchestrationInstance target) { this.eventName = eventName; this.message = message; - this.target = target; + this.target = target as EntityInstance ?? EntityInstance.FromBase(target); } /// @@ -78,7 +78,7 @@ public TaskMessage AsTaskMessage() return new TaskMessage { OrchestrationInstance = this.target, - Event = new History.EventRaisedEvent(-1, this.ContentAsString()) + Event = new History.EntityMessageEvent(-1, this.ContentAsString()) { Name = this.eventName } diff --git a/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs b/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs index c93d0ee4a..2787d9ad0 100644 --- a/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs +++ b/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs @@ -199,7 +199,7 @@ public IEnumerable EmitLockReleaseMessages() foreach (var entityId in this.criticalSectionLocks!) { - var instance = new OrchestrationInstance() { InstanceId = entityId.ToString() }; + var instance = new EntityInstance() { InstanceId = entityId.ToString() }; yield return new EntityMessageEvent(EntityMessageEventNames.ReleaseMessageEventName, message, instance); } diff --git a/src/DurableTask.Core/History/EntityOperationEvent.cs b/src/DurableTask.Core/History/EntityOperationEvent.cs new file mode 100644 index 000000000..e83ecc946 --- /dev/null +++ b/src/DurableTask.Core/History/EntityOperationEvent.cs @@ -0,0 +1,31 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.Core.History +{ + /// + /// A special case of for entities-as-orchestrations. + /// + public class EntityMessageEvent : EventRaisedEvent + { + /// + /// Creates a new with the supplied event id and input. + /// + /// The ID of the event. + /// The serialized operation payload. + public EntityMessageEvent(int eventId, string input) + : base(eventId, input) + { + } + } +} diff --git a/src/DurableTask.Core/History/EventRaisedEvent.cs b/src/DurableTask.Core/History/EventRaisedEvent.cs index 6da23f27c..1b24b4c7e 100644 --- a/src/DurableTask.Core/History/EventRaisedEvent.cs +++ b/src/DurableTask.Core/History/EventRaisedEvent.cs @@ -13,8 +13,9 @@ namespace DurableTask.Core.History { + using DurableTask.Core.Command; using DurableTask.Core.Tracing; - using System.Diagnostics; + using System; using System.Runtime.Serialization; /// @@ -56,5 +57,26 @@ public EventRaisedEvent(int eventId, string input) /// [DataMember] public DistributedTraceContext ParentTraceContext { get; set; } + + internal static EventRaisedEvent FromAction(SendEventOrchestratorAction action) + { + if (action is null) + { + throw new ArgumentNullException(nameof(action)); + } + + if (action.Instance is EntityInstance) + { + return new EntityMessageEvent(-1, action.EventData) + { + Name = action.EventName, + }; + } + + return new EventRaisedEvent(-1, action.EventData) + { + Name = action.EventName, + }; + } } } \ No newline at end of file diff --git a/src/DurableTask.Core/OrchestrationInstance.cs b/src/DurableTask.Core/OrchestrationInstance.cs index d6d6ee558..53cae4588 100644 --- a/src/DurableTask.Core/OrchestrationInstance.cs +++ b/src/DurableTask.Core/OrchestrationInstance.cs @@ -13,6 +13,7 @@ namespace DurableTask.Core { + using System; using System.Diagnostics.CodeAnalysis; using System.Runtime.Serialization; @@ -71,4 +72,25 @@ public override string ToString() /// public ExtensionDataObject ExtensionData { get; set; } } + + /// + /// A marker for entity instances. + /// + internal class EntityInstance : OrchestrationInstance + { + internal static EntityInstance FromBase(OrchestrationInstance instance) + { + if (instance is null) + { + throw new ArgumentNullException(nameof(instance)); + } + + return new EntityInstance + { + InstanceId = instance.InstanceId, + ExecutionId = instance.ExecutionId, + ExtensionData = instance.ExtensionData, + }; + } + } } \ No newline at end of file diff --git a/src/DurableTask.Core/OrchestrationTags.cs b/src/DurableTask.Core/OrchestrationTags.cs index 960297143..3a544147d 100644 --- a/src/DurableTask.Core/OrchestrationTags.cs +++ b/src/DurableTask.Core/OrchestrationTags.cs @@ -11,10 +11,8 @@ // limitations under the License. // ---------------------------------------------------------------------------------- -using System; using System.Collections.Generic; using System.Linq; -using System.Text; namespace DurableTask.Core { @@ -36,6 +34,11 @@ public static class OrchestrationTags /// public const string FireAndForget = "FireAndForget"; + /// + /// Used for entities-as-orchestrations. + /// + public const string Entity = "__Entity__"; + /// /// Check whether the given tags contain the fire and forget tag /// @@ -46,6 +49,16 @@ internal static bool IsTaggedAsFireAndForget(IDictionary tags) return tags != null && tags.ContainsKey(FireAndForget); } + /// + /// Check whether the given tags contain the entity tag. + /// + /// + /// + internal static bool IsEntity(IDictionary tags) + { + return tags != null && tags.ContainsKey(Entity); + } + internal static IDictionary MergeTags( IDictionary newTags, IDictionary existingTags) diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 339865c86..df2e38adc 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -1148,10 +1148,7 @@ TaskMessage ProcessSendEventDecision( runtimeState.AddEvent(historyEvent); - EventRaisedEvent eventRaisedEvent = new EventRaisedEvent(-1, sendEventAction.EventData) - { - Name = sendEventAction.EventName - }; + EventRaisedEvent eventRaisedEvent = EventRaisedEvent.FromAction(sendEventAction); // Distributed Tracing: start a new trace activity derived from the orchestration // for an EventRaisedEvent (external event)