diff --git a/src/Orleans.Core.Abstractions/Diagnostics/ActivitySources.cs b/src/Orleans.Core.Abstractions/Diagnostics/ActivitySources.cs new file mode 100644 index 00000000000..25557ec2b34 --- /dev/null +++ b/src/Orleans.Core.Abstractions/Diagnostics/ActivitySources.cs @@ -0,0 +1,32 @@ +using System.Diagnostics; + +namespace Orleans.Diagnostics; + +public static class ActivitySources +{ + /// + /// Spans triggered from application level code + /// + public const string ApplicationGrainActivitySourceName = "Microsoft.Orleans.Application"; + /// + /// Spans triggered from Orleans runtime code + /// + public const string RuntimeActivitySourceName = "Microsoft.Orleans.Runtime"; + /// + /// Spans tied to lifecycle operations such as activation, migration, and deactivation. + /// + public const string LifecycleActivitySourceName = "Microsoft.Orleans.Lifecycle"; + /// + /// Spans tied to persistent storage operations. + /// + public const string StorageActivitySourceName = "Microsoft.Orleans.Storage"; + /// + /// A wildcard name to match all Orleans activity sources. + /// + public const string AllActivitySourceName = "Microsoft.Orleans.*"; + + internal static readonly ActivitySource ApplicationGrainSource = new(ApplicationGrainActivitySourceName, "1.1.0"); + internal static readonly ActivitySource RuntimeGrainSource = new(RuntimeActivitySourceName, "2.0.0"); + internal static readonly ActivitySource LifecycleGrainSource = new(LifecycleActivitySourceName, "1.0.0"); + internal static readonly ActivitySource StorageGrainSource = new(StorageActivitySourceName, "1.0.0"); +} diff --git a/src/Orleans.Core.Abstractions/Diagnostics/ActivityTagKeys.cs b/src/Orleans.Core.Abstractions/Diagnostics/ActivityTagKeys.cs new file mode 100644 index 00000000000..7c887c2a789 --- /dev/null +++ b/src/Orleans.Core.Abstractions/Diagnostics/ActivityTagKeys.cs @@ -0,0 +1,138 @@ +namespace Orleans.Diagnostics; + +/// +/// Contains constants for Activity tag keys used throughout Orleans. +/// +internal static class ActivityTagKeys +{ + /// + /// The request ID for an async enumerable operation. + /// + public const string AsyncEnumerableRequestId = "orleans.async_enumerable.request_id"; + + /// + /// The activation ID tag key. + /// + public const string ActivationId = "orleans.activation.id"; + + /// + /// The activation cause tag key (e.g., "new" or "rehydrate"). + /// + public const string ActivationCause = "orleans.activation.cause"; + + /// + /// The grain ID tag key. + /// + public const string GrainId = "orleans.grain.id"; + + /// + /// The grain type tag key. + /// + public const string GrainType = "orleans.grain.type"; + + /// + /// The silo ID tag key. + /// + public const string SiloId = "orleans.silo.id"; + + /// + /// The directory previous registration present tag key. + /// + public const string DirectoryPreviousRegistrationPresent = "orleans.directory.previousRegistration.present"; + + /// + /// The directory registered address tag key. + /// + public const string DirectoryRegisteredAddress = "orleans.directory.registered.address"; + + /// + /// The directory forwarding address tag key. + /// + public const string DirectoryForwardingAddress = "orleans.directory.forwarding.address"; + + /// + /// The exception type tag key. + /// + public const string ExceptionType = "exception.type"; + + /// + /// The exception message tag key. + /// + public const string ExceptionMessage = "exception.message"; + + /// + /// The placement filter type tag key. + /// + public const string PlacementFilterType = "orleans.placement.filter.type"; + + /// + /// The storage provider tag key. + /// + public const string StorageProvider = "orleans.storage.provider"; + + /// + /// The storage state name tag key. + /// + public const string StorageStateName = "orleans.storage.state.name"; + + /// + /// The storage state type tag key. + /// + public const string StorageStateType = "orleans.storage.state.type"; + + /// + /// The RPC system tag key. + /// + public const string RpcSystem = "rpc.system"; + + /// + /// The RPC service tag key. + /// + public const string RpcService = "rpc.service"; + + /// + /// The RPC method tag key. + /// + public const string RpcMethod = "rpc.method"; + + /// + /// The RPC Orleans target ID tag key. + /// + public const string RpcOrleansTargetId = "rpc.orleans.target_id"; + + /// + /// The RPC Orleans source ID tag key. + /// + public const string RpcOrleansSourceId = "rpc.orleans.source_id"; + + /// + /// The exception stacktrace tag key. + /// + public const string ExceptionStacktrace = "exception.stacktrace"; + + /// + /// The exception escaped tag key. + /// + public const string ExceptionEscaped = "exception.escaped"; + + /// + /// Indicates whether a rehydration attempt was ignored. + /// + public const string RehydrateIgnored = "orleans.rehydrate.ignored"; + + /// + /// The reason why a rehydration attempt was ignored. + /// + public const string RehydrateIgnoredReason = "orleans.rehydrate.ignored.reason"; + + /// + /// The previous registration address during rehydration. + /// + public const string RehydratePreviousRegistration = "orleans.rehydrate.previousRegistration"; + + /// + /// The target silo address for migration. + /// + public const string MigrationTargetSilo = "orleans.migration.target.silo"; +} + diff --git a/src/Orleans.Core.Abstractions/Diagnostics/OpenTelemetryHeaders.cs b/src/Orleans.Core.Abstractions/Diagnostics/OpenTelemetryHeaders.cs new file mode 100644 index 00000000000..c495584c2e3 --- /dev/null +++ b/src/Orleans.Core.Abstractions/Diagnostics/OpenTelemetryHeaders.cs @@ -0,0 +1,7 @@ +namespace Orleans.Diagnostics; + +internal static class OpenTelemetryHeaders +{ + internal const string TraceParent = "traceparent"; + internal const string TraceState = "tracestate"; +} diff --git a/src/Orleans.Core.Abstractions/Properties/AssemblyInfo.cs b/src/Orleans.Core.Abstractions/Properties/AssemblyInfo.cs index 2cdac083250..3700dce0a93 100644 --- a/src/Orleans.Core.Abstractions/Properties/AssemblyInfo.cs +++ b/src/Orleans.Core.Abstractions/Properties/AssemblyInfo.cs @@ -12,6 +12,7 @@ [assembly: InternalsVisibleTo("ServiceBus.Tests")] [assembly: InternalsVisibleTo("Tester.AzureUtils")] [assembly: InternalsVisibleTo("AWSUtils.Tests")] +[assembly: InternalsVisibleTo("Tester")] [assembly: InternalsVisibleTo("TesterInternal")] [assembly: InternalsVisibleTo("TestInternalGrainInterfaces")] [assembly: InternalsVisibleTo("TestInternalGrains")] diff --git a/src/Orleans.Core.Abstractions/Runtime/AsyncEnumerableRequest.cs b/src/Orleans.Core.Abstractions/Runtime/AsyncEnumerableRequest.cs index ded1c094bd8..f5aa55d94f3 100644 --- a/src/Orleans.Core.Abstractions/Runtime/AsyncEnumerableRequest.cs +++ b/src/Orleans.Core.Abstractions/Runtime/AsyncEnumerableRequest.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans.Concurrency; +using Orleans.Diagnostics; using Orleans.Invocation; using Orleans.Serialization.Invocation; @@ -199,6 +200,7 @@ internal sealed class AsyncEnumeratorProxy : IAsyncEnumerator private int _batchIndex; private bool _disposed; private bool _isInitialized; + private Activity? _sessionActivity; private bool IsBatch => (_current.State & EnumerationResult.Batch) != 0; private bool IsElement => (_current.State & EnumerationResult.Element) != 0; @@ -270,6 +272,9 @@ public async ValueTask DisposeAsync() if (_isInitialized) { + // Restore the session activity as the current activity so that DisposeAsync RPC is parented to it + var previousActivity = Activity.Current; + Activity.Current = _sessionActivity; try { await _target.DisposeAsync(_requestId); @@ -279,9 +284,18 @@ public async ValueTask DisposeAsync() var logger = ((GrainReference)_target).Shared.ServiceProvider.GetRequiredService>>(); logger.LogWarning(exception, "Failed to dispose async enumerator."); } + finally + { + Activity.Current = previousActivity; + } } _cancellationTokenSource?.Dispose(); + + // Stop the session activity after DisposeAsync completes + _sessionActivity?.Stop(); + _sessionActivity?.Dispose(); + _disposed = true; } @@ -302,6 +316,14 @@ public async ValueTask MoveNextAsync() } var isActive = _isInitialized; + + // Restore the session activity as the current activity so that RPC calls are parented to it + var previousActivity = Activity.Current; + if (_sessionActivity is not null) + { + Activity.Current = _sessionActivity; + } + try { (EnumerationResult Status, object Value) result; @@ -311,6 +333,11 @@ public async ValueTask MoveNextAsync() if (!_isInitialized) { + // Start the session activity on first enumeration call + // This span wraps the entire enumeration session + _sessionActivity = ActivitySources.ApplicationGrainSource.StartActivity(_request.GetActivityName(), ActivityKind.Client); + _sessionActivity?.SetTag(ActivityTagKeys.AsyncEnumerableRequestId, _requestId.ToString()); + // Assume the enumerator is active as soon as the call begins. isActive = true; result = await _target.StartEnumeration(_requestId, _request, _cancellationToken); @@ -324,10 +351,12 @@ public async ValueTask MoveNextAsync() isActive = result.Status.IsActive(); if (result.Status is EnumerationResult.Error) { + _sessionActivity?.SetStatus(ActivityStatusCode.Error); ExceptionDispatchInfo.Capture((Exception)result.Value).Throw(); } else if (result.Status is EnumerationResult.Canceled) { + _sessionActivity?.SetStatus(ActivityStatusCode.Error, "Canceled"); throw new OperationCanceledException(); } @@ -339,6 +368,7 @@ public async ValueTask MoveNextAsync() if (result.Status is EnumerationResult.MissingEnumeratorError) { + _sessionActivity?.SetStatus(ActivityStatusCode.Error, "MissingEnumerator"); throw new EnumerationAbortedException("Enumeration aborted: the remote target does not have a record of this enumerator." + " This likely indicates that the remote grain was deactivated since enumeration begun or that the enumerator was idle for longer than the expiration period."); } @@ -356,6 +386,11 @@ await _target.DisposeAsync(_requestId).AsTask() .ConfigureAwait(ConfigureAwaitOptions.ContinueOnCapturedContext | ConfigureAwaitOptions.SuppressThrowing); throw; } + finally + { + // Restore the previous activity after each call + Activity.Current = previousActivity; + } } } diff --git a/src/Orleans.Core/Diagnostics/ActivityNames.cs b/src/Orleans.Core/Diagnostics/ActivityNames.cs new file mode 100644 index 00000000000..7f92b50dbef --- /dev/null +++ b/src/Orleans.Core/Diagnostics/ActivityNames.cs @@ -0,0 +1,15 @@ +namespace Orleans.Runtime; + +public static class ActivityNames +{ + public const string PlaceGrain = "place grain"; + public const string FilterPlacementCandidates = "filter placement candidates"; + public const string ActivateGrain = "activate grain"; + public const string OnActivate = "execute OnActivateAsync"; + public const string RegisterDirectoryEntry = "register directory entry"; + public const string StorageRead = "read storage"; + public const string StorageWrite = "write storage"; + public const string StorageClear = "clear storage"; + public const string ActivationDehydrate = "dehydrate activation"; + public const string ActivationRehydrate = "rehydrate activation"; +} diff --git a/src/Orleans.Core/Diagnostics/ActivityPropagationGrainCallFilter.cs b/src/Orleans.Core/Diagnostics/ActivityPropagationGrainCallFilter.cs index 0b1534c110d..99732e73505 100644 --- a/src/Orleans.Core/Diagnostics/ActivityPropagationGrainCallFilter.cs +++ b/src/Orleans.Core/Diagnostics/ActivityPropagationGrainCallFilter.cs @@ -1,7 +1,5 @@ -using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Threading.Tasks; +using Orleans.Diagnostics; namespace Orleans.Runtime { @@ -10,21 +8,36 @@ namespace Orleans.Runtime /// internal abstract class ActivityPropagationGrainCallFilter { - protected const string TraceParentHeaderName = "traceparent"; - protected const string TraceStateHeaderName = "tracestate"; - internal const string RpcSystem = "orleans"; internal const string OrleansNamespacePrefix = "Orleans"; - internal const string ApplicationGrainActivitySourceName = "Microsoft.Orleans.Application"; - internal const string RuntimeActivitySourceName = "Microsoft.Orleans.Runtime"; - protected static readonly ActivitySource ApplicationGrainSource = new(ApplicationGrainActivitySourceName, "1.0.0"); - protected static readonly ActivitySource RuntimeGrainSource = new(RuntimeActivitySourceName, "1.0.0"); + protected static ActivitySource GetActivitySource(IGrainCallContext context) + { + var interfaceType = context.Request.GetInterfaceType(); + var interfaceTypeName = interfaceType.Name; - protected static ActivitySource GetActivitySource(IGrainCallContext context) => - context.Request.GetInterfaceType().Namespace?.StartsWith(OrleansNamespacePrefix) == true - ? RuntimeGrainSource - : ApplicationGrainSource; + switch (interfaceTypeName) + { + // Memory storage uses grains for its implementation + case "IMemoryStorageGrain": + return ActivitySources.StorageGrainSource; + + // This extension is for explicit migrate/deactivate calls + case "IGrainManagementExtension": + // This target is for accepting migration batches + case "IActivationMigrationManagerSystemTarget": + return ActivitySources.LifecycleGrainSource; + + // These extensions are for async stream subscriptions + case "IAsyncEnumerableGrainExtension": + return ActivitySources.ApplicationGrainSource; + + default: + return interfaceType.Namespace?.StartsWith(OrleansNamespacePrefix) == true + ? ActivitySources.RuntimeGrainSource + : ActivitySources.ApplicationGrainSource; + } + } protected static void GetRequestContextValue(object carrier, string fieldName, out string fieldValue, out IEnumerable fieldValues) { @@ -37,17 +50,17 @@ protected static async Task Process(IGrainCallContext context, Activity activity if (activity is not null) { // rpc attributes from https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/rpc.md - activity.SetTag("rpc.system", RpcSystem); - activity.SetTag("rpc.service", context.InterfaceName); - activity.SetTag("rpc.method", context.MethodName); + activity.SetTag(ActivityTagKeys.RpcSystem, RpcSystem); + activity.SetTag(ActivityTagKeys.RpcService, context.InterfaceName); + activity.SetTag(ActivityTagKeys.RpcMethod, context.MethodName); if (activity.IsAllDataRequested) { // Custom attributes - activity.SetTag("rpc.orleans.target_id", context.TargetId.ToString()); + activity.SetTag(ActivityTagKeys.RpcOrleansTargetId, context.TargetId.ToString()); if (context.SourceId is GrainId sourceId) { - activity.SetTag("rpc.orleans.source_id", sourceId.ToString()); + activity.SetTag(ActivityTagKeys.RpcOrleansSourceId, sourceId.ToString()); } } } @@ -63,14 +76,14 @@ protected static async Task Process(IGrainCallContext context, Activity activity activity.SetStatus(ActivityStatusCode.Error); // exception attributes from https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/exceptions.md - activity.SetTag("exception.type", e.GetType().FullName); - activity.SetTag("exception.message", e.Message); + activity.SetTag(ActivityTagKeys.ExceptionType, e.GetType().FullName); + activity.SetTag(ActivityTagKeys.ExceptionMessage, e.Message); // Note that "exception.stacktrace" is the full exception detail, not just the StackTrace property. // See https://opentelemetry.io/docs/specs/semconv/attributes-registry/exception/ // and https://github.com/open-telemetry/opentelemetry-specification/pull/697#discussion_r453662519 - activity.SetTag("exception.stacktrace", e.ToString()); - activity.SetTag("exception.escaped", true); + activity.SetTag(ActivityTagKeys.ExceptionStacktrace, e.ToString()); + activity.SetTag(ActivityTagKeys.ExceptionEscaped, true); } throw; diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs index c5b999bad5a..16c9c82a0a0 100644 --- a/src/Orleans.Runtime/Catalog/ActivationData.cs +++ b/src/Orleans.Runtime/Catalog/ActivationData.cs @@ -1,17 +1,14 @@ #nullable enable -using System; using System.Buffers; -using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; -using System.Threading; -using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans.Configuration; using Orleans.Core.Internal; +using Orleans.Diagnostics; using Orleans.GrainDirectory; using Orleans.Internal; using Orleans.Runtime.Placement; @@ -25,6 +22,7 @@ namespace Orleans.Runtime; /// /// Maintains additional per-activation state that is required for Orleans internal operations. /// MUST lock this object for any concurrent access +/// MUST lock on `this` object because there are locks taken on ActivationData instances in various places in the codebase such as ActivationCollector.ScheduleCollection. /// Consider: compartmentalize by usage, e.g., using separate interfaces for data for catalog, etc. /// [DebuggerDisplay("GrainId = {GrainId}, State = {State}, Waiting = {WaitingCount}, Executing = {IsCurrentlyExecuting}")] @@ -66,6 +64,23 @@ internal sealed partial class ActivationData : private Task? _messageLoopTask; #pragma warning restore IDE0052 // Remove unread private members + private Activity? _activationActivity; + + /// + /// Constants for activity error event names used during activation lifecycle. + /// + private static class ActivityErrorEvents + { + public const string InstanceCreateFailed = "instance-create-failed"; + public const string DirectoryRegisterFailed = "directory-register-failed"; + public const string ActivationAborted = "activation-aborted"; + public const string ActivationFailed = "activation-failed"; + public const string ActivationError = "activation-error"; + public const string OnActivateFailed = "on-activate-failed"; + public const string RehydrateError = "rehydrate-error"; + public const string DehydrateError = "dehydrate-error"; + } + public ActivationData( GrainAddress grainAddress, Func createWorkItemGroup, @@ -84,18 +99,35 @@ public ActivationData( Debug.Assert(_workItemGroup != null, "_workItemGroup must not be null."); } + internal void SetActivationActivity(Activity activity) + { + _activationActivity = activity; + } + + /// + /// Gets the activity context for the activation activity, if available. + /// This allows child activities to be properly parented during activation lifecycle operations. + /// + internal ActivityContext? GetActivationActivityContext() + { + return _activationActivity?.Context; + } + public void Start(IGrainActivator grainActivator) { Debug.Assert(Equals(ActivationTaskScheduler, TaskScheduler.Current)); + // locking on `this` is intentional as there are other places in the codebase taking locks on ActivationData instances lock (this) { try { var instance = grainActivator.CreateInstance(this); SetGrainInstance(instance); + _activationActivity?.AddEvent(new ActivityEvent("instance-created")); } catch (Exception exception) { + SetActivityError(_activationActivity, exception, ActivityErrorEvents.InstanceCreateFailed); Deactivate(new(DeactivationReasonCode.ActivationFailed, exception, "Error constructing grain instance."), CancellationToken.None); } @@ -1171,7 +1203,10 @@ async Task ProcessOperationsAsync() } finally { - await DisposeAsync(op); + if (op is not null) + { + await DisposeAsync(op); + } } } } @@ -1179,22 +1214,44 @@ async Task ProcessOperationsAsync() private void RehydrateInternal(IRehydrationContext context) { + Activity? rehydrateSpan = null; try { LogRehydratingGrain(_shared.Logger, this); + var grainMigrationParticipant = GrainInstance as IGrainMigrationParticipant; + + if (grainMigrationParticipant is not null) + { + // Start a span for rehydration + rehydrateSpan = _activationActivity is not null + ? ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.ActivationRehydrate, + ActivityKind.Internal, _activationActivity.Context) + : ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.ActivationRehydrate, + ActivityKind.Internal); + rehydrateSpan?.SetTag(ActivityTagKeys.GrainId, GrainId.ToString()); + rehydrateSpan?.SetTag(ActivityTagKeys.GrainType, _shared.GrainTypeName); + rehydrateSpan?.SetTag(ActivityTagKeys.SiloId, _shared.Runtime.SiloAddress.ToString()); + rehydrateSpan?.SetTag(ActivityTagKeys.ActivationId, ActivationId.ToString()); + } + lock (this) { if (State != ActivationState.Creating) { LogIgnoringRehydrateAttempt(_shared.Logger, this, State); + rehydrateSpan?.SetTag(ActivityTagKeys.RehydrateIgnored, true); + rehydrateSpan?.SetTag(ActivityTagKeys.RehydrateIgnoredReason, $"State is {State}"); return; } - if (context.TryGetValue(GrainAddressMigrationContextKey, out GrainAddress? previousRegistration) && previousRegistration is not null) + if (context.TryGetValue(GrainAddressMigrationContextKey, out GrainAddress? previousRegistration) && + previousRegistration is not null) { PreviousRegistration = previousRegistration; LogPreviousActivationAddress(_shared.Logger, previousRegistration); + rehydrateSpan?.SetTag(ActivityTagKeys.RehydratePreviousRegistration, + previousRegistration.ToFullString()); } if (_lifecycle is { } lifecycle) @@ -1205,14 +1262,20 @@ private void RehydrateInternal(IRehydrationContext context) } } - (GrainInstance as IGrainMigrationParticipant)?.OnRehydrate(context); + grainMigrationParticipant?.OnRehydrate(context); } LogRehydratedGrain(_shared.Logger); + rehydrateSpan?.AddEvent(new ActivityEvent("rehydrated")); } catch (Exception exception) { LogErrorRehydratingActivation(_shared.Logger, exception); + SetActivityError(rehydrateSpan, exception, ActivityErrorEvents.RehydrateError); + } + finally + { + rehydrateSpan?.Dispose(); } } @@ -1228,11 +1291,35 @@ private void OnDehydrate(IDehydrationContext context) { context.TryAddValue(GrainAddressMigrationContextKey, Address); } - + + Activity? dehydrateSpan = null; try { + // Get the parent activity context from the dehydration context holder (captured when migration was initiated) + var parentContext = DehydrationContext?.MigrationActivityContext; + + var grainMigrationParticipant = GrainInstance as IGrainMigrationParticipant; + + if (grainMigrationParticipant is not null) + { + // Start a span for dehydration, parented to the migration request that triggered it + dehydrateSpan = parentContext.HasValue + ? ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.ActivationDehydrate, + ActivityKind.Internal, parentContext.Value) + : ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.ActivationDehydrate, + ActivityKind.Internal); + dehydrateSpan?.SetTag(ActivityTagKeys.GrainId, GrainId.ToString()); + dehydrateSpan?.SetTag(ActivityTagKeys.GrainType, _shared.GrainTypeName); + dehydrateSpan?.SetTag(ActivityTagKeys.SiloId, _shared.Runtime.SiloAddress.ToString()); + dehydrateSpan?.SetTag(ActivityTagKeys.ActivationId, ActivationId.ToString()); + if (ForwardingAddress is { } fwd) + { + dehydrateSpan?.SetTag(ActivityTagKeys.MigrationTargetSilo, fwd.ToString()); + } + } + // Note that these calls are in reverse order from Rehydrate, not for any particular reason other than symmetry. - (GrainInstance as IGrainMigrationParticipant)?.OnDehydrate(context); + grainMigrationParticipant?.OnDehydrate(context); if (_lifecycle is { } lifecycle) { @@ -1245,12 +1332,18 @@ private void OnDehydrate(IDehydrationContext context) catch (Exception exception) { LogErrorDehydratingActivation(_shared.Logger, exception); + SetActivityError(dehydrateSpan, exception, ActivityErrorEvents.DehydrateError); + } + finally + { + dehydrateSpan?.Dispose(); } } LogDehydratedActivation(_shared.Logger); } + /// /// Handle an incoming message and queue/invoke appropriate handler /// @@ -1469,83 +1562,110 @@ private async Task ActivateAsync(Dictionary? requestContextData, return; } - // A chain of promises that will have to complete in order to complete the activation - // Register with the grain directory and call the Activate method on the new activation. + _activationActivity?.AddEvent(new ActivityEvent("activation-start")); try { - // Currently, the only grain type that is not registered in the Grain Directory is StatelessWorker. - // Among those that are registered in the directory, we currently do not have any multi activations. if (IsUsingGrainDirectory) { - Exception? registrationException; - var previousRegistration = PreviousRegistration; bool success; - try + Exception? registrationException; + + // Start directory registration activity as a child of the activation activity + using (var registerSpan = _activationActivity is not null + ? ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.RegisterDirectoryEntry, ActivityKind.Internal, _activationActivity.Context) + : ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.RegisterDirectoryEntry, ActivityKind.Internal)) { - while (true) + registerSpan?.SetTag(ActivityTagKeys.GrainId, GrainId.ToString()); + registerSpan?.SetTag(ActivityTagKeys.SiloId, _shared.Runtime.SiloAddress.ToString()); + registerSpan?.SetTag(ActivityTagKeys.ActivationId, ActivationId.ToString()); + registerSpan?.SetTag(ActivityTagKeys.DirectoryPreviousRegistrationPresent, + PreviousRegistration is not null); + var previousRegistration = PreviousRegistration; + + try { - LogRegisteringGrain(_shared.Logger, this, previousRegistration); - - var result = await _shared.InternalRuntime.GrainLocator.Register(Address, previousRegistration).WaitAsync(cancellationToken); - if (Address.Matches(result)) + while (true) { - Address = result; - success = true; - } - else if (result?.SiloAddress is { } registeredSilo && registeredSilo.Equals(Address.SiloAddress)) - { - // Attempt to register this activation again, using the registration of the previous instance of this grain, - // which is registered to this silo. That activation must be a defunct predecessor of this activation, - // since the catalog only allows one activation of a given grain at a time. - // This could occur if the previous activation failed to unregister itself from the grain directory. - previousRegistration = result; - LogAttemptToRegisterWithPreviousActivation(_shared.Logger, GrainId, result); - continue; - } - else - { - // Set the forwarding address so that messages enqueued on this activation can be forwarded to - // the existing activation. - ForwardingAddress = result?.SiloAddress; - if (ForwardingAddress is { } address) + LogRegisteringGrain(_shared.Logger, this, previousRegistration); + + var result = await _shared.InternalRuntime.GrainLocator + .Register(Address, previousRegistration).WaitAsync(cancellationToken); + if (Address.Matches(result)) + { + Address = result; + success = true; + _activationActivity?.AddEvent(new ActivityEvent("directory-register-success")); + registerSpan?.AddEvent(new ActivityEvent("success")); + registerSpan?.SetTag(ActivityTagKeys.DirectoryRegisteredAddress, result.ToFullString()); + } + else if (result?.SiloAddress is { } registeredSilo && + registeredSilo.Equals(Address.SiloAddress)) { - DeactivationReason = new(DeactivationReasonCode.DuplicateActivation, $"This grain is active on another host ({address})."); + previousRegistration = result; + LogAttemptToRegisterWithPreviousActivation(_shared.Logger, GrainId, result); + _activationActivity?.AddEvent(new ActivityEvent("directory-register-retry-previous")); + registerSpan?.AddEvent(new ActivityEvent("retry-previous")); + continue; } + else + { + ForwardingAddress = result?.SiloAddress; + if (ForwardingAddress is { } address) + { + DeactivationReason = new(DeactivationReasonCode.DuplicateActivation, + $"This grain is active on another host ({address})."); + } - success = false; - CatalogInstruments.ActivationConcurrentRegistrationAttempts.Add(1); - // If this was a duplicate, it's not an error, just a race. - // Forward on all of the pending messages, and then forget about this activation. - LogDuplicateActivation( - _shared.Logger, - Address, - ForwardingAddress, - GrainInstance?.GetType(), - new(Address), - WaitingCount); + success = false; + CatalogInstruments.ActivationConcurrentRegistrationAttempts.Add(1); + LogDuplicateActivation( + _shared.Logger, + Address, + ForwardingAddress, + GrainInstance?.GetType(), + new(Address), + WaitingCount); + _activationActivity?.AddEvent(new ActivityEvent("duplicate-activation")); + registerSpan?.AddEvent(new ActivityEvent("duplicate")); + if (ForwardingAddress is { } fwd) + { + registerSpan?.SetTag(ActivityTagKeys.DirectoryForwardingAddress, fwd.ToString()); + } + } + + break; } - break; + registrationException = null; } - - registrationException = null; - } - catch (Exception exception) - { - registrationException = exception; - if (!cancellationToken.IsCancellationRequested) + catch (Exception exception) { - LogFailedToRegisterGrain(_shared.Logger, registrationException, this); + registrationException = exception; + if (!cancellationToken.IsCancellationRequested) + { + LogFailedToRegisterGrain(_shared.Logger, registrationException, this); + } + + success = false; + _activationActivity?.AddEvent(new ActivityEvent("directory-register-failed")); + SetActivityError(registerSpan, exception, ActivityErrorEvents.DirectoryRegisterFailed); } - success = false; } - if (!success) { Deactivate(new(DeactivationReasonCode.DirectoryFailure, registrationException, "Failed to register activation in grain directory.")); // Activation failed. + if (registrationException is not null) + { + SetActivityError(_activationActivity, registrationException, ActivityErrorEvents.ActivationAborted); + } + else + { + SetActivityError(_activationActivity, ActivityErrorEvents.ActivationAborted); + } + return; } } @@ -1554,10 +1674,10 @@ private async Task ActivateAsync(Dictionary? requestContextData, { SetState(ActivationState.Activating); } + _activationActivity?.AddEvent(new ActivityEvent("state-activating")); LogActivatingGrain(_shared.Logger, this); - // Start grain lifecycle within try-catch wrapper to safely capture any exceptions thrown from called function try { RequestContextExtensions.Import(requestContextData); @@ -1565,24 +1685,39 @@ private async Task ActivateAsync(Dictionary? requestContextData, { if (_lifecycle is { } lifecycle) { + _activationActivity?.AddEvent(new ActivityEvent("lifecycle-start")); await lifecycle.OnStart(cancellationToken).WaitAsync(cancellationToken); + _activationActivity?.AddEvent(new ActivityEvent("lifecycle-started")); } } catch (Exception exception) { LogErrorStartingLifecycle(_shared.Logger, exception, this); + _activationActivity?.AddEvent(new ActivityEvent("lifecycle-start-failed")); throw; } if (GrainInstance is IGrainBase grainBase) { + // Start a span for OnActivateAsync execution + using var onActivateSpan = _activationActivity is not null + ? ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.OnActivate, ActivityKind.Internal, _activationActivity.Context) + : ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.OnActivate, ActivityKind.Internal); + onActivateSpan?.SetTag(ActivityTagKeys.GrainId, GrainId.ToString()); + onActivateSpan?.SetTag(ActivityTagKeys.GrainType, _shared.GrainTypeName ?? GrainInstance.GetType().FullName); + onActivateSpan?.SetTag(ActivityTagKeys.SiloId, _shared.Runtime.SiloAddress.ToString()); + onActivateSpan?.SetTag(ActivityTagKeys.ActivationId, ActivationId.ToString()); + try { + _activationActivity?.AddEvent(new ActivityEvent("on-activate-start")); await grainBase.OnActivateAsync(cancellationToken).WaitAsync(cancellationToken); + _activationActivity?.AddEvent(new ActivityEvent("on-activate-complete")); } catch (Exception exception) { LogErrorInGrainMethod(_shared.Logger, exception, nameof(IGrainBase.OnActivateAsync), this); + SetActivityError(onActivateSpan, exception, ActivityErrorEvents.OnActivateFailed); throw; } } @@ -1591,35 +1726,27 @@ private async Task ActivateAsync(Dictionary? requestContextData, { if (State is ActivationState.Activating) { - SetState(ActivationState.Valid); // Activate calls on this activation are finished + SetState(ActivationState.Valid); _shared.InternalRuntime.ActivationWorkingSet.OnActivated(this); } } + _activationActivity?.AddEvent(new ActivityEvent("state-valid")); + _activationActivity?.Stop(); LogFinishedActivatingGrain(_shared.Logger, this); } catch (Exception exception) { CatalogInstruments.ActivationFailedToActivate.Add(1); - - // Capture the exception so that it can be propagated to rejection messages var sourceException = (exception as OrleansLifecycleCanceledException)?.InnerException ?? exception; LogErrorActivatingGrain(_shared.Logger, sourceException, this); - - // Unregister this as a message target after some period of time. - // This is delayed so that consistently failing activation, perhaps due to an application bug or network - // issue, does not cause a flood of doomed activations. - // If the cancellation token was canceled, there is no need to wait an additional time, since the activation - // has already waited some significant amount of time. if (!cancellationToken.IsCancellationRequested) { ScheduleOperation(new Command.Delay(TimeSpan.FromSeconds(5))); } - - // Perform the required deactivation steps. Deactivate(new(DeactivationReasonCode.ActivationFailed, sourceException, "Failed to activate grain.")); - - // Activation failed. + SetActivityError(_activationActivity, ActivityErrorEvents.ActivationFailed); + _activationActivity?.Stop(); return; } } @@ -1627,12 +1754,33 @@ private async Task ActivateAsync(Dictionary? requestContextData, { LogActivationFailed(_shared.Logger, exception, this); Deactivate(new(DeactivationReasonCode.ApplicationError, exception, "Failed to activate grain.")); + SetActivityError(_activationActivity, ActivityErrorEvents.ActivationError); + _activationActivity?.Stop(); } finally { _workSignal.Signal(); } } + + private void SetActivityError(Activity? erroredActivity, string? errorEventName) + { + if (erroredActivity is { } activity) + { + activity.SetStatus(ActivityStatusCode.Error, errorEventName); + } + } + + private void SetActivityError(Activity? erroredActivity, Exception exception, string? errorEventName) + { + if (erroredActivity is { } activity) + { + activity.SetStatus(ActivityStatusCode.Error, errorEventName); + activity.SetTag(ActivityTagKeys.ExceptionType, exception.GetType().FullName); + activity.SetTag(ActivityTagKeys.ExceptionMessage, exception.Message); + } + } + #endregion #region Deactivation @@ -2179,6 +2327,12 @@ private class DehydrationContextHolder(SerializerSessionPool sessionPool, Dictio { public readonly MigrationContext MigrationContext = new(sessionPool); public readonly Dictionary? RequestContext = requestContext; + + /// + /// The activity context from the grain call that initiated the migration. + /// This is used to parent the dehydrate span to the migration request trace. + /// + public ActivityContext? MigrationActivityContext { get; set; } = Activity.Current?.Context; } [LoggerMessage( @@ -2361,11 +2515,6 @@ private readonly struct ActivationDataLogValue(ActivationData activation, bool i Message = "Error activating grain {Grain}")] private static partial void LogErrorActivatingGrain(ILogger logger, Exception exception, ActivationData grain); - [LoggerMessage( - Level = LogLevel.Error, - Message = "Activation of grain {Grain} failed")] - private static partial void LogActivationFailed(ILogger logger, Exception exception, ActivationData grain); - [LoggerMessage( Level = LogLevel.Trace, Message = "Completing deactivation of '{Activation}'")] @@ -2432,4 +2581,9 @@ private static partial void LogDuplicateActivation( Level = LogLevel.Debug, Message = "Rerouting {NumMessages} messages from invalid grain activation {Grain}")] private static partial void LogReroutingMessagesNoForwarding(ILogger logger, int numMessages, ActivationData grain); + + [LoggerMessage( + Level = LogLevel.Error, + Message = "Activation of grain {Grain} failed")] + private static partial void LogActivationFailed(ILogger logger, Exception exception, ActivationData grain); } diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index 2783b6f9e87..7a63c2faf77 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -1,12 +1,10 @@ -using System; -using System.Collections.Generic; using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans.GrainDirectory; using Orleans.Runtime.GrainDirectory; +using System.Diagnostics; +using Orleans.Diagnostics; namespace Orleans.Runtime { @@ -168,6 +166,25 @@ public IGrainContext GetOrCreateActivation( return UnableToCreateActivation(this, grainId); } + // Start activation span with parent context from request if available + var parentContext = TryGetActivityContext(requestContextData); + using var activationActivity = parentContext.HasValue + ? ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.ActivateGrain, ActivityKind.Internal, parentContext.Value) + : ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.ActivateGrain, ActivityKind.Internal); + if (activationActivity is not null) + { + activationActivity.SetTag(ActivityTagKeys.GrainId, grainId.ToString()); + activationActivity.SetTag(ActivityTagKeys.GrainType, grainId.Type.ToString()); + activationActivity.SetTag(ActivityTagKeys.SiloId, Silo.ToString()); + activationActivity.SetTag(ActivityTagKeys.ActivationCause, rehydrationContext is null ? "new" : "rehydrate"); + if (result is ActivationData act) + { + activationActivity.SetTag(ActivityTagKeys.ActivationId, act.ActivationId.ToString()); + act.SetActivationActivity(activationActivity); + activationActivity.AddEvent(new ActivityEvent("creating")); + } + } + CatalogInstruments.ActivationsCreated.Add(1); // Rehydration occurs before activation. @@ -431,5 +448,35 @@ private readonly struct SiloAddressLogValue(SiloAddress silo) )] private partial void LogFailedToUnregisterNonExistingActivation(GrainAddress address, Exception exception); + /// + /// Extracts an ActivityContext from request context data if present. + /// + private static ActivityContext? TryGetActivityContext(Dictionary requestContextData) + { + if (requestContextData is not { Count: > 0 }) + { + return null; + } + + string traceParent = null; + string traceState = null; + + if (requestContextData.TryGetValue(OpenTelemetryHeaders.TraceParent, out var traceParentObj) && traceParentObj is string tp) + { + traceParent = tp; + } + + if (requestContextData.TryGetValue(OpenTelemetryHeaders.TraceState, out var traceStateObj) && traceStateObj is string ts) + { + traceState = ts; + } + + if (!string.IsNullOrEmpty(traceParent) && ActivityContext.TryParse(traceParent, traceState, isRemote: true, out var parentContext)) + { + return parentContext; + } + + return null; + } } } diff --git a/src/Orleans.Runtime/Orleans.Runtime.csproj b/src/Orleans.Runtime/Orleans.Runtime.csproj index f9f503df5a3..1c15e73b3ae 100644 --- a/src/Orleans.Runtime/Orleans.Runtime.csproj +++ b/src/Orleans.Runtime/Orleans.Runtime.csproj @@ -16,4 +16,8 @@ + + + + diff --git a/src/Orleans.Runtime/Placement/PlacementService.cs b/src/Orleans.Runtime/Placement/PlacementService.cs index 92acac246a3..d1ea7df88cb 100644 --- a/src/Orleans.Runtime/Placement/PlacementService.cs +++ b/src/Orleans.Runtime/Placement/PlacementService.cs @@ -1,14 +1,10 @@ -using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Linq; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; -using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Configuration; -using Orleans.Placement; +using Orleans.Diagnostics; using Orleans.Runtime.GrainDirectory; using Orleans.Runtime.Internal; using Orleans.Runtime.Placement.Filtering; @@ -124,6 +120,12 @@ public SiloAddress[] GetCompatibleSilos(PlacementTarget target) foreach (var placementFilter in filters) { var director = _placementFilterDirectoryResolver.GetFilterDirector(placementFilter); + + // Create a span for each filter invocation + using var filterSpan = ActivitySources.LifecycleGrainSource.StartActivity(ActivityNames.FilterPlacementCandidates); + filterSpan?.SetTag(ActivityTagKeys.PlacementFilterType, placementFilter.GetType().Name); + filterSpan?.SetTag(ActivityTagKeys.GrainType, grainType.ToString()); + filteredSilos = director.Filter(placementFilter, target, filteredSilos); } @@ -367,32 +369,48 @@ private void AddressWaitingMessages(GrainPlacementWorkItem completedWorkItem) private async Task GetOrPlaceActivationAsync(Message firstMessage) { await Task.Yield(); - var target = new PlacementTarget( - firstMessage.TargetGrain, - firstMessage.RequestContextData, - firstMessage.InterfaceType, - firstMessage.InterfaceVersion); - - var targetGrain = target.GrainIdentity; - var result = await _placementService._grainLocator.Lookup(targetGrain); - if (result is not null) - { - return result.SiloAddress; - } - var strategy = _placementService._strategyResolver.GetPlacementStrategy(target.GrainIdentity.Type); - var director = _placementService._directorResolver.GetPlacementDirector(strategy); - var siloAddress = await director.OnAddActivation(strategy, target, _placementService); + // InnerGetOrPlaceActivationAsync may set a new activity as current from the RequestContextData, + // so we need to save and restore the current activity. + var currentActivity = Activity.Current; + var activationLocation = await InnerGetOrPlaceActivationAsync(); + Activity.Current = currentActivity; - // Give the grain locator one last chance to tell us that the grain has already been placed - if (_placementService._grainLocator.TryLookupInCache(targetGrain, out result) && _placementService.CachedAddressIsValid(firstMessage, result)) + return activationLocation; + + async Task InnerGetOrPlaceActivationAsync() { - return result.SiloAddress; - } + // Restore activity context from the message's request context data + // This ensures directory lookups are properly traced as children of the original request + using var restoredActivity = TryRestoreActivityContext(firstMessage.RequestContextData, ActivityNames.PlaceGrain); + + var target = new PlacementTarget( + firstMessage.TargetGrain, + firstMessage.RequestContextData, + firstMessage.InterfaceType, + firstMessage.InterfaceVersion); + + var targetGrain = target.GrainIdentity; + var result = await _placementService._grainLocator.Lookup(targetGrain); + if (result is not null) + { + return result.SiloAddress; + } - _placementService._grainLocator.InvalidateCache(targetGrain); - _placementService._grainLocator.UpdateCache(targetGrain, siloAddress); - return siloAddress; + var strategy = _placementService._strategyResolver.GetPlacementStrategy(target.GrainIdentity.Type); + var director = _placementService._directorResolver.GetPlacementDirector(strategy); + var siloAddress = await director.OnAddActivation(strategy, target, _placementService); + + // Give the grain locator one last chance to tell us that the grain has already been placed + if (_placementService._grainLocator.TryLookupInCache(targetGrain, out result) && _placementService.CachedAddressIsValid(firstMessage, result)) + { + return result.SiloAddress; + } + + _placementService._grainLocator.InvalidateCache(targetGrain); + _placementService._grainLocator.UpdateCache(targetGrain, siloAddress); + return siloAddress; + } } private class GrainPlacementWorkItem @@ -403,6 +421,38 @@ private class GrainPlacementWorkItem } } + /// + /// Attempts to restore the parent activity context from request context data. + /// + private static Activity TryRestoreActivityContext(Dictionary requestContextData, string operationName) + { + if (requestContextData is not { Count: > 0 }) + { + return null; + } + + string traceParent = null; + string traceState = null; + + if (requestContextData.TryGetValue(OpenTelemetryHeaders.TraceParent, out var traceParentObj) && traceParentObj is string tp) + { + traceParent = tp; + } + + if (requestContextData.TryGetValue(OpenTelemetryHeaders.TraceState, out var traceStateObj) && traceStateObj is string ts) + { + traceState = ts; + } + + if (!string.IsNullOrEmpty(traceParent) && ActivityContext.TryParse(traceParent, traceState, isRemote: true, out var parentContext)) + { + // Start the activity from the Catalog's ActivitySource to properly associate it with activation tracing + return ActivitySources.LifecycleGrainSource.StartActivity(operationName, ActivityKind.Internal, parentContext); + } + + return null; + } + [LoggerMessage( Level = LogLevel.Debug, Message = "Found address {Address} for grain {GrainId} in cache for message {Message}" diff --git a/src/Orleans.Runtime/Storage/StateStorageBridge.cs b/src/Orleans.Runtime/Storage/StateStorageBridge.cs index 58ddf0bd96a..3b34115cb49 100644 --- a/src/Orleans.Runtime/Storage/StateStorageBridge.cs +++ b/src/Orleans.Runtime/Storage/StateStorageBridge.cs @@ -1,12 +1,11 @@ #nullable enable -using System; using System.Collections.Concurrent; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.ExceptionServices; -using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using Orleans.Runtime; +using Orleans.Diagnostics; using Orleans.Serialization.Activators; using Orleans.Serialization.Serializers; using Orleans.Storage; @@ -83,6 +82,22 @@ public async Task ReadStateAsync() { GrainRuntime.CheckRuntimeContext(RuntimeContext.Current); + // Try to get the parent activity context from the current activity or from the activation's stored activity + var parentContext = Activity.Current?.Context; + if (parentContext is null && _grainContext is ActivationData activationData) + { + // If we're in activation context and there's an activation activity, use it as parent + parentContext = activationData.GetActivationActivityContext(); + } + + using var activity = parentContext.HasValue + ? ActivitySources.StorageGrainSource.StartActivity(ActivityNames.StorageRead, ActivityKind.Client, parentContext.Value) + : ActivitySources.StorageGrainSource.StartActivity(ActivityNames.StorageRead, ActivityKind.Client); + activity?.SetTag(ActivityTagKeys.GrainId, _grainContext.GrainId.ToString()); + activity?.SetTag(ActivityTagKeys.StorageProvider, _shared.ProviderTypeName); + activity?.SetTag(ActivityTagKeys.StorageStateName, _shared.Name); + activity?.SetTag(ActivityTagKeys.StorageStateType, _shared.StateTypeName); + var sw = ValueStopwatch.StartNew(); await _shared.Store.ReadStateAsync(_shared.Name, _grainContext.GrainId, GrainState); IsStateInitialized = true; @@ -102,6 +117,21 @@ public async Task WriteStateAsync() { GrainRuntime.CheckRuntimeContext(RuntimeContext.Current); + // Try to get the parent activity context from the current activity or from the activation's stored activity + var parentContext = Activity.Current?.Context; + if (parentContext is null && _grainContext is ActivationData activationData) + { + parentContext = activationData.GetActivationActivityContext(); + } + + using var activity = parentContext.HasValue + ? ActivitySources.StorageGrainSource.StartActivity(ActivityNames.StorageWrite, ActivityKind.Client, parentContext.Value) + : ActivitySources.StorageGrainSource.StartActivity(ActivityNames.StorageWrite, ActivityKind.Client); + activity?.SetTag(ActivityTagKeys.GrainId, _grainContext.GrainId.ToString()); + activity?.SetTag(ActivityTagKeys.StorageProvider, _shared.ProviderTypeName); + activity?.SetTag(ActivityTagKeys.StorageStateName, _shared.Name); + activity?.SetTag(ActivityTagKeys.StorageStateType, _shared.StateTypeName); + var sw = ValueStopwatch.StartNew(); await _shared.Store.WriteStateAsync(_shared.Name, _grainContext.GrainId, GrainState); StorageInstruments.OnStorageWrite(sw.Elapsed, _shared.ProviderTypeName, _shared.Name, _shared.StateTypeName); @@ -120,6 +150,21 @@ public async Task ClearStateAsync() { GrainRuntime.CheckRuntimeContext(RuntimeContext.Current); + // Try to get the parent activity context from the current activity or from the activation's stored activity + var parentContext = Activity.Current?.Context; + if (parentContext is null && _grainContext is ActivationData activationData) + { + parentContext = activationData.GetActivationActivityContext(); + } + + using var activity = parentContext.HasValue + ? ActivitySources.StorageGrainSource.StartActivity(ActivityNames.StorageClear, ActivityKind.Client, parentContext.Value) + : ActivitySources.StorageGrainSource.StartActivity(ActivityNames.StorageClear, ActivityKind.Client); + activity?.SetTag(ActivityTagKeys.GrainId, _grainContext.GrainId.ToString()); + activity?.SetTag(ActivityTagKeys.StorageProvider, _shared.ProviderTypeName); + activity?.SetTag(ActivityTagKeys.StorageStateName, _shared.Name); + activity?.SetTag(ActivityTagKeys.StorageStateType, _shared.StateTypeName); + var sw = ValueStopwatch.StartNew(); // Clear state in external storage diff --git a/test/Grains/TestGrainInterfaces/IActivityGrain.cs b/test/Grains/TestGrainInterfaces/IActivityGrain.cs index 3bd0f5e11f9..6aef64f893f 100644 --- a/test/Grains/TestGrainInterfaces/IActivityGrain.cs +++ b/test/Grains/TestGrainInterfaces/IActivityGrain.cs @@ -5,6 +5,21 @@ public interface IActivityGrain : IGrainWithIntegerKey Task GetActivityId(); } + /// + /// Grain interface for testing IAsyncEnumerable activity tracing. + /// + public interface IAsyncEnumerableActivityGrain : IGrainWithIntegerKey + { + /// + /// Gets multiple ActivityData items as an async enumerable. + /// Each item captures the current Activity context at the time of yield. + /// + /// Number of items to yield. + /// Cancellation token. + /// An async enumerable of ActivityData items. + IAsyncEnumerable GetActivityDataStream(int count, CancellationToken cancellationToken = default); + } + [GenerateSerializer] public class ActivityData { diff --git a/test/Grains/TestGrains/ActivityGrain.cs b/test/Grains/TestGrains/ActivityGrain.cs index b3f7d75cda4..54bdbf01994 100644 --- a/test/Grains/TestGrains/ActivityGrain.cs +++ b/test/Grains/TestGrains/ActivityGrain.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using System.Runtime.CompilerServices; using UnitTests.GrainInterfaces; namespace UnitTests.Grains @@ -23,4 +24,33 @@ public Task GetActivityId() return Task.FromResult(result); } } + + /// + /// Grain implementation for testing IAsyncEnumerable activity tracing. + /// + public class AsyncEnumerableActivityGrain : Grain, IAsyncEnumerableActivityGrain + { + public async IAsyncEnumerable GetActivityDataStream(int count, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + for (int i = 0; i < count; i++) + { + cancellationToken.ThrowIfCancellationRequested(); + + var activity = Activity.Current; + var data = activity is null + ? new ActivityData() + : new ActivityData + { + Id = activity.Id, + TraceState = activity.TraceStateString, + Baggage = activity.Baggage.ToList(), + }; + + yield return data; + + // Small delay to allow for proper activity propagation + await Task.Yield(); + } + } + } } diff --git a/test/Tester/ActivationTracingTests.cs b/test/Tester/ActivationTracingTests.cs new file mode 100644 index 00000000000..62dc33fe0eb --- /dev/null +++ b/test/Tester/ActivationTracingTests.cs @@ -0,0 +1,944 @@ +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Text; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Core.Internal; +using Orleans.Diagnostics; +using Orleans.Placement; +using Orleans.Runtime.Placement; +using Orleans.TestingHost; +using TestExtensions; +using UnitTests.GrainInterfaces; +using Xunit; +using Xunit.Abstractions; + +namespace UnitTests.General +{ + /// + /// Failing test demonstrating missing activation tracing spans. + /// Expects an activation Activity to be created on first grain activation. + /// + [Collection("ActivationTracing")] + public class ActivationTracingTests : OrleansTestingBase, IClassFixture + { + private static readonly ConcurrentBag Started = new(); + + static ActivationTracingTests() + { + var listener = new ActivityListener + { + ShouldListenTo = src => src.Name == ActivitySources.ApplicationGrainActivitySourceName + || src.Name == ActivitySources.LifecycleActivitySourceName + || src.Name == ActivitySources.StorageActivitySourceName, + Sample = (ref _) => ActivitySamplingResult.AllData, + SampleUsingParentId = (ref _) => ActivitySamplingResult.AllData, + ActivityStarted = activity => Started.Add(activity), + }; + ActivitySource.AddActivityListener(listener); + } + + public class Fixture : BaseTestClusterFixture + { + protected override void ConfigureTestCluster(TestClusterBuilder builder) + { + builder.Options.InitialSilosCount = 2; // Need 2 silos for migration tests + builder.ConfigureHostConfiguration(TestDefaultConfiguration.ConfigureHostConfiguration); + builder.AddSiloBuilderConfigurator(); + builder.AddClientBuilderConfigurator(); + } + + private class SiloCfg : ISiloConfigurator + { + public void Configure(ISiloBuilder hostBuilder) + { + hostBuilder + .AddActivityPropagation() + .AddMemoryGrainStorageAsDefault() + .AddMemoryGrainStorage("PubSubStore"); + hostBuilder.Services.AddPlacementFilter(ServiceLifetime.Singleton); + } + } + + private class ClientCfg : IClientBuilderConfigurator + { + public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) + { + clientBuilder.AddActivityPropagation(); + } + } + } + + private readonly Fixture _fixture; + private readonly ITestOutputHelper _output; + + public ActivationTracingTests(Fixture fixture, ITestOutputHelper output) + { + _fixture = fixture; + _output = output; + } + + [Fact] + [TestCategory("BVT")] + public async Task ActivationSpanIsCreatedOnFirstCall() + { + Started.Clear(); + + using var parent = ActivitySources.ApplicationGrainSource.StartActivity("test-parent"); + parent?.Start(); + try + { + var grain = _fixture.GrainFactory.GetGrain(Random.Shared.Next()); + // First call should force activation + _ = await grain.GetActivityId(); + + // Expect at least one activation-related activity + var activationActivities = Started.Where(a => a.Source.Name == ActivitySources.LifecycleActivitySourceName).ToList(); + Assert.True(activationActivities.Count > 0, "Expected activation tracing activity to be created, but none were observed."); + + // Verify all expected spans are present and properly parented under test-parent + var testParentTraceId = parent.TraceId.ToString(); + + // Find the placement span - should be parented to the grain call which is parented to test-parent + var placementSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.PlaceGrain); + Assert.NotNull(placementSpan); + Assert.Equal(testParentTraceId, placementSpan.TraceId.ToString()); + + // Find the placement filter span - should share the same trace ID as test-parent + var placementFilterSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.FilterPlacementCandidates); + Assert.Null(placementFilterSpan); + + // Find the activation span - should be parented to the grain call which is parented to test-parent + var activationSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.ActivateGrain); + Assert.NotNull(activationSpan); + Assert.Equal(testParentTraceId, activationSpan.TraceId.ToString()); + + // Find the OnActivateAsync span - should be parented to the activation span + var onActivateSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.OnActivate); + Assert.Null(onActivateSpan); + + // Find the directory register span - should be parented to activation span + var directoryRegisterSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.RegisterDirectoryEntry); + Assert.NotNull(directoryRegisterSpan); + Assert.Equal(testParentTraceId, directoryRegisterSpan.TraceId.ToString()); + Assert.Equal(activationSpan.SpanId.ToString(), directoryRegisterSpan.ParentSpanId.ToString()); + } + finally + { + parent.Stop(); + AssertNoApplicationSpansParentedByRuntimeSpans(); + PrintActivityDiagnostics(); + } + } + + [Fact] + [TestCategory("BVT")] + public async Task ActivationSpanIncludesFilter() + { + Started.Clear(); + + using var parent = ActivitySources.ApplicationGrainSource.StartActivity("test-parent-filter"); + parent?.Start(); + try + { + var grain = _fixture.GrainFactory.GetGrain(Random.Shared.Next()); + // First call should force activation + _ = await grain.GetActivityId(); + + // Expect at least one activation-related activity + var activationActivities = Started.Where(a => a.Source.Name == ActivitySources.LifecycleActivitySourceName).ToList(); + Assert.True(activationActivities.Count > 0, "Expected activation tracing activity to be created, but none were observed."); + + // Verify all expected spans are present and properly parented under test-parent + var testParentTraceId = parent.TraceId.ToString(); + + // Find the placement span - should be parented to the grain call which is parented to test-parent + var placementSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.PlaceGrain); + Assert.NotNull(placementSpan); + Assert.Equal(testParentTraceId, placementSpan.TraceId.ToString()); + + // Find the placement filter span - should share the same trace ID as test-parent + var placementFilterSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.FilterPlacementCandidates); + Assert.NotNull(placementFilterSpan); + Assert.Equal(testParentTraceId, placementFilterSpan.TraceId.ToString()); + Assert.Equal("TracingTestPlacementFilterStrategy", placementFilterSpan.Tags.FirstOrDefault(t => t.Key == "orleans.placement.filter.type").Value); + + // Find the activation span - should be parented to the grain call which is parented to test-parent + var activationSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.ActivateGrain); + Assert.NotNull(activationSpan); + Assert.Equal(testParentTraceId, activationSpan.TraceId.ToString()); + + // Find the OnActivateAsync span - should be parented to the activation span + var onActivateSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.OnActivate); + Assert.NotNull(onActivateSpan); + Assert.Equal(testParentTraceId, onActivateSpan.TraceId.ToString()); + Assert.Equal(activationSpan.SpanId.ToString(), onActivateSpan.ParentSpanId.ToString()); + + // Find the directory register span - should be parented to activation span + var directoryRegisterSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.RegisterDirectoryEntry); + Assert.NotNull(directoryRegisterSpan); + Assert.Equal(testParentTraceId, directoryRegisterSpan.TraceId.ToString()); + Assert.Equal(activationSpan.SpanId.ToString(), directoryRegisterSpan.ParentSpanId.ToString()); + } + finally + { + parent.Stop(); + AssertNoApplicationSpansParentedByRuntimeSpans(); + PrintActivityDiagnostics(); + } + } + + [Fact] + [TestCategory("BVT")] + public async Task PersistentStateReadSpanIsCreatedDuringActivation() + { + Started.Clear(); + + using var parent = ActivitySources.ApplicationGrainSource.StartActivity("test-parent-storage"); + parent?.Start(); + try + { + var grain = _fixture.GrainFactory.GetGrain(Random.Shared.Next()); + // First call should force activation which triggers state read + _ = await grain.GetActivityId(); + + // Expect at least one activation-related activity + var activationActivities = Started.Where(a => a.Source.Name == ActivitySources.LifecycleActivitySourceName).ToList(); + Assert.True(activationActivities.Count > 0, "Expected activation tracing activity to be created, but none were observed."); + + // Verify all expected spans are present and properly parented under test-parent + var testParentTraceId = parent.TraceId.ToString(); + + // Find the activation span - should be parented to the grain call which is parented to test-parent + var activationSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.ActivateGrain && a.Tags.First(kv => kv.Key == "orleans.grain.type").Value == "persistentstateactivity"); + Assert.NotNull(activationSpan); + Assert.Equal(testParentTraceId, activationSpan.TraceId.ToString()); + + // Find the storage read span - should share the same trace ID as test-parent + var storageReadSpan = Started.FirstOrDefault(a => a.OperationName == ActivityNames.StorageRead); + Assert.NotNull(storageReadSpan); + Assert.Equal(testParentTraceId, storageReadSpan.TraceId.ToString()); + + // Verify storage read span has expected tags + Assert.Equal("MemoryGrainStorage", storageReadSpan.Tags.FirstOrDefault(t => t.Key == "orleans.storage.provider").Value); + Assert.Equal("state", storageReadSpan.Tags.FirstOrDefault(t => t.Key == "orleans.storage.state.name").Value); + Assert.Equal("PersistentStateActivityGrainState", storageReadSpan.Tags.FirstOrDefault(t => t.Key == "orleans.storage.state.type").Value); + + // Verify the grain ID tag is present + var grainIdTag = storageReadSpan.Tags.FirstOrDefault(t => t.Key == "orleans.grain.id").Value; + Assert.NotNull(grainIdTag); + } + finally + { + parent.Stop(); + AssertNoApplicationSpansParentedByRuntimeSpans(); + PrintActivityDiagnostics(); + } + } + + /// + /// Tests that dehydrate and rehydrate spans are created during grain migration. + /// Verifies that the migration process creates proper tracing spans for both + /// dehydration (on the source silo) and rehydration (on the target silo). + /// + [Fact] + [TestCategory("BVT")] + public async Task MigrationSpansAreCreatedDuringGrainMigration() + { + Started.Clear(); + + using var parent = ActivitySources.ApplicationGrainSource.StartActivity("test-parent-migration"); + parent?.Start(); + try + { + // Create a grain and set some state + var grain = _fixture.GrainFactory.GetGrain(Random.Shared.Next()); + var expectedState = Random.Shared.Next(); + await grain.SetState(expectedState); + var originalAddress = await grain.GetGrainAddress(); + var originalHost = originalAddress.SiloAddress; + + // Find a different silo to migrate to + var targetHost = _fixture.HostedCluster.GetActiveSilos() + .Select(s => s.SiloAddress) + .First(address => address != originalHost); + + // Trigger migration with a placement hint to coerce the placement director to use the target silo + RequestContext.Set(IPlacementDirector.PlacementHintKey, targetHost); + await grain.Cast().MigrateOnIdle(); + + // Verify the state was preserved (this also waits for migration to complete) + var newState = await grain.GetState(); + Assert.Equal(expectedState, newState); + + // Give some time for all activities to complete + await Task.Delay(500); + + var testParentTraceId = parent.TraceId.ToString(); + + // Verify dehydrate span was created + var dehydrateSpans = Started.Where(a => a.OperationName == ActivityNames.ActivationDehydrate).ToList(); + Assert.True(dehydrateSpans.Count > 0, "Expected at least one dehydrate span to be created during migration"); + + var dehydrateSpan = dehydrateSpans.First(); + Assert.NotNull(dehydrateSpan.Tags.FirstOrDefault(t => t.Key == "orleans.grain.id").Value); + Assert.NotNull(dehydrateSpan.Tags.FirstOrDefault(t => t.Key == "orleans.silo.id").Value); + Assert.NotNull(dehydrateSpan.Tags.FirstOrDefault(t => t.Key == "orleans.activation.id").Value); + // Verify target silo tag is present + Assert.Equal(targetHost.ToString(), dehydrateSpan.Tags.FirstOrDefault(t => t.Key == "orleans.migration.target.silo").Value); + // Verify dehydrate span is parented to the migration request trace + Assert.Equal(testParentTraceId, dehydrateSpan.TraceId.ToString()); + + // Verify rehydrate span was created on the target silo + var rehydrateSpans = Started.Where(a => a.OperationName == ActivityNames.ActivationRehydrate).ToList(); + Assert.True(rehydrateSpans.Count > 0, "Expected at least one rehydrate span to be created during migration"); + + var rehydrateSpan = rehydrateSpans.First(); + Assert.NotNull(rehydrateSpan.Tags.FirstOrDefault(t => t.Key == "orleans.grain.id").Value); + Assert.NotNull(rehydrateSpan.Tags.FirstOrDefault(t => t.Key == "orleans.silo.id").Value); + Assert.NotNull(rehydrateSpan.Tags.FirstOrDefault(t => t.Key == "orleans.activation.id").Value); + // Verify the rehydrate span has the previous registration tag + Assert.NotNull(rehydrateSpan.Tags.FirstOrDefault(t => t.Key == "orleans.rehydrate.previousRegistration").Value); + } + finally + { + parent?.Stop(); + AssertNoApplicationSpansParentedByRuntimeSpans(); + PrintActivityDiagnostics(); + } + } + + /// + /// Tests that dehydrate and rehydrate spans are created during migration of a grain with persistent state. + /// Verifies that IPersistentState participates in migration and creates proper tracing spans. + /// + [Fact] + [TestCategory("BVT")] + public async Task MigrationSpansAreCreatedForGrainWithPersistentState() + { + Started.Clear(); + + using var parent = ActivitySources.ApplicationGrainSource.StartActivity("test-parent-migration-persistent"); + parent?.Start(); + try + { + // Create a grain with persistent state and set some state + var grain = _fixture.GrainFactory.GetGrain(Random.Shared.Next()); + var expectedStateA = Random.Shared.Next(); + var expectedStateB = Random.Shared.Next(); + await grain.SetState(expectedStateA, expectedStateB); + var originalAddress = await grain.GetGrainAddress(); + var originalHost = originalAddress.SiloAddress; + + // Find a different silo to migrate to + var targetHost = _fixture.HostedCluster.GetActiveSilos() + .Select(s => s.SiloAddress) + .First(address => address != originalHost); + + // Trigger migration with a placement hint + RequestContext.Set(IPlacementDirector.PlacementHintKey, targetHost); + await grain.Cast().MigrateOnIdle(); + + // Wait for migration to complete + GrainAddress newAddress; + do + { + await Task.Delay(100); + newAddress = await grain.GetGrainAddress(); + } while (newAddress.ActivationId == originalAddress.ActivationId); + + // Verify the grain migrated to the target silo + Assert.Equal(targetHost, newAddress.SiloAddress); + + // Verify the state was preserved + var (actualA, actualB) = await grain.GetState(); + Assert.Equal(expectedStateA, actualA); + Assert.Equal(expectedStateB, actualB); + + // Give some time for all activities to complete + await Task.Delay(500); + + // Verify dehydrate span was NOT created (grain doesn't implement IGrainMigrationParticipant) + var dehydrateSpans = Started.Where(a => a.OperationName == ActivityNames.ActivationDehydrate).ToList(); + Assert.True(dehydrateSpans.Count == 0, $"Expected no dehydrate spans for grain without IGrainMigrationParticipant, but found {dehydrateSpans.Count}"); + + // Verify rehydrate span was NOT created + var rehydrateSpans = Started.Where(a => a.OperationName == ActivityNames.ActivationRehydrate).ToList(); + Assert.True(rehydrateSpans.Count == 0, $"Expected no rehydrate spans for grain without IGrainMigrationParticipant, but found {rehydrateSpans.Count}"); + + // Verify storage read span was NOT created during rehydration (state is transferred via migration context) + // Note: Storage read should NOT happen during migration - the state is transferred in-memory + var storageReadSpansAfterMigration = Started.Where(a => a.OperationName == ActivityNames.StorageRead).ToList(); + // During migration, storage should not be read because state is transferred via dehydration context + // The storage read only happens on fresh activation, not on rehydration + + Assert.Equal(2, storageReadSpansAfterMigration.Count); + } + finally + { + parent.Stop(); + AssertNoApplicationSpansParentedByRuntimeSpans(); + PrintActivityDiagnostics(); + } + } + + /// + /// Tests that dehydrate and rehydrate spans are NOT created during migration of a grain + /// that does not implement IGrainMigrationParticipant. + /// + [Fact] + [TestCategory("BVT")] + public async Task DehydrateAndRehydrateSpansAreNotCreatedForGrainWithoutMigrationParticipant() + { + Started.Clear(); + + using var parent = ActivitySources.ApplicationGrainSource.StartActivity("test-parent-no-migration-participant"); + parent?.Start(); + try + { + // Create a grain that doesn't implement IGrainMigrationParticipant + var grain = _fixture.GrainFactory.GetGrain(Random.Shared.Next()); + var expectedState = Random.Shared.Next(); + await grain.SetState(expectedState); + var originalAddress = await grain.GetGrainAddress(); + var originalHost = originalAddress.SiloAddress; + + // Find a different silo to migrate to + var targetHost = _fixture.HostedCluster.GetActiveSilos() + .Select(s => s.SiloAddress) + .First(address => address != originalHost); + + // Trigger migration with a placement hint to coerce the placement director to use the target silo + RequestContext.Set(IPlacementDirector.PlacementHintKey, targetHost); + await grain.Cast().MigrateOnIdle(); + + // Make a call to ensure grain is activated on target silo + // Note: State won't be preserved since grain doesn't participate in migration + _ = await grain.GetState(); + + // Give some time for all activities to complete + await Task.Delay(500); + + // Verify dehydrate span was NOT created (grain doesn't implement IGrainMigrationParticipant) + var dehydrateSpans = Started.Where(a => a.OperationName == ActivityNames.ActivationDehydrate).ToList(); + Assert.True(dehydrateSpans.Count == 0, $"Expected no dehydrate spans for grain without IGrainMigrationParticipant, but found {dehydrateSpans.Count}"); + + // Verify rehydrate span was NOT created + var rehydrateSpans = Started.Where(a => a.OperationName == ActivityNames.ActivationRehydrate).ToList(); + Assert.True(rehydrateSpans.Count == 0, $"Expected no rehydrate spans for grain without IGrainMigrationParticipant, but found {rehydrateSpans.Count}"); + + // Verify that activation span WAS created (the grain was still activated on the new silo) + var activationSpans = Started.Where(a => a.OperationName == ActivityNames.ActivateGrain).ToList(); + Assert.True(activationSpans.Count > 0, "Expected at least one activation span for the migrated grain"); + } + finally + { + parent?.Stop(); + AssertNoApplicationSpansParentedByRuntimeSpans(); + PrintActivityDiagnostics(); + } + } + + /// + /// Tests that appropriate tracing spans are created for IAsyncEnumerable grain calls with multiple elements. + /// Verifies that: + /// 1. A session span is created with the original method name (GetActivityDataStream) + /// 2. StartEnumeration, MoveNext, and DisposeAsync spans are nested under the session span + /// 3. All spans share the same trace context + /// + [Fact] + [TestCategory("BVT")] + public async Task AsyncEnumerableSpansAreCreatedForMultipleElements() + { + Started.Clear(); + + using var parent = ActivitySources.ApplicationGrainSource.StartActivity("test-parent-async-enumerable"); + parent?.Start(); + try + { + var grain = _fixture.GrainFactory.GetGrain(Random.Shared.Next()); + const int elementCount = 5; + + var values = new List(); + await foreach (var entry in grain.GetActivityDataStream(elementCount).WithBatchSize(1)) + { + values.Add(entry); + } + + // Verify we received all elements + Assert.Equal(elementCount, values.Count); + + // Verify all expected spans are present and properly parented under test-parent + var testParentTraceId = parent.TraceId.ToString(); + var testParentSpanId = parent.SpanId.ToString(); + + // Find all activities with the ApplicationGrainActivitySourceName + var applicationSpans = Started.Where(a => a.Source.Name == ActivitySources.ApplicationGrainActivitySourceName).ToList(); + + // Find the session span (the logical method call span) + // This should have the method name from the grain interface (e.g., "IAsyncEnumerableActivityGrain/GetActivityDataStream") + var sessionSpans = applicationSpans.Where(a => a.OperationName.Contains("GetActivityDataStream")).ToList(); + Assert.True(sessionSpans.Count >= 1, "Expected at least one session span with GetActivityDataStream operation name"); + + var sessionSpan = sessionSpans.First(); + Assert.Equal(testParentTraceId, sessionSpan.TraceId.ToString()); + Assert.Equal(testParentSpanId, sessionSpan.ParentSpanId.ToString()); + + // Verify the session span has the request ID tag + var requestIdTag = sessionSpan.Tags.FirstOrDefault(t => t.Key == "orleans.async_enumerable.request_id").Value; + Assert.NotNull(requestIdTag); + + var sessionSpanId = sessionSpan.SpanId.ToString(); + + // Find all spans (including runtime spans) to verify parenting + var allSpans = Started.ToList(); + + // Find the StartEnumeration span - should be nested under the session span (in RuntimeActivitySourceName) + // Filter to only client-side spans (those directly parented to the session span) + var startEnumerationSpans = allSpans + .Where(a => a.OperationName.Contains("StartEnumeration") && a.ParentSpanId.ToString() == sessionSpanId) + .ToList(); + Assert.True(startEnumerationSpans.Count >= 1, "Expected at least one StartEnumeration span parented to session span"); + + var startEnumerationSpan = startEnumerationSpans.First(); + Assert.Equal(testParentTraceId, startEnumerationSpan.TraceId.ToString()); + + // Find MoveNext spans - should be nested under the session span (in RuntimeActivitySourceName) + // Filter to only client-side spans (those directly parented to the session span) + var moveNextSpans = allSpans + .Where(a => a.OperationName.Contains("MoveNext") && a.ParentSpanId.ToString() == sessionSpanId) + .ToList(); + Assert.True(moveNextSpans.Count >= 1, $"Expected at least one MoveNext span parented to session span, found {moveNextSpans.Count}"); + + // All client-side MoveNext spans should share the same trace ID + foreach (var moveNextSpan in moveNextSpans) + { + Assert.Equal(testParentTraceId, moveNextSpan.TraceId.ToString()); + } + + // Find DisposeAsync span - should be nested under the session span (in RuntimeActivitySourceName) + // Filter to only client-side spans (those directly parented to the session span) + var disposeSpans = allSpans + .Where(a => a.OperationName.Contains("DisposeAsync") && a.ParentSpanId.ToString() == sessionSpanId) + .ToList(); + Assert.True(disposeSpans.Count >= 1, "Expected at least one DisposeAsync span parented to session span"); + + var disposeSpan = disposeSpans.First(); + Assert.Equal(testParentTraceId, disposeSpan.TraceId.ToString()); + + // Verify each ActivityData received has activity information + // (verifying trace context was propagated into the grain during enumeration) + foreach (var activityData in values) + { + Assert.NotNull(activityData); + Assert.NotNull(activityData.Id); + } + } + finally + { + parent?.Stop(); + AssertNoApplicationSpansParentedByRuntimeSpans(); + PrintActivityDiagnostics(); + } + } + + /// + /// Asserts that no spans from ApplicationGrainActivitySourceName have parents from RuntimeActivitySourceName. + /// This ensures that if only ApplicationGrainActivitySourceName has been added (without RuntimeActivitySourceName), + /// there won't be any hanging traces put at root because of missing RuntimeActivitySourceName spans + /// that would otherwise propagate the trace context. + /// + private void AssertNoApplicationSpansParentedByRuntimeSpans() + { + var activities = Started.ToList(); + var activityById = activities + .Where(a => a.Id is not null) + .ToDictionary(a => a.Id!); + + var applicationSpans = activities + .Where(a => a.Source.Name == ActivitySources.ApplicationGrainActivitySourceName) + .ToList(); + + var violations = new List<(Activity Child, Activity Parent)>(); + + foreach (var appSpan in applicationSpans) + { + if (appSpan.ParentId is not null && activityById.TryGetValue(appSpan.ParentId, out var parentActivity)) + { + if (parentActivity.Source.Name == ActivitySources.RuntimeActivitySourceName) + { + violations.Add((appSpan, parentActivity)); + } + } + } + + if (violations.Count > 0) + { + var sb = new StringBuilder(); + sb.AppendLine($"Found {violations.Count} ApplicationGrainActivitySourceName span(s) with RuntimeActivitySourceName parent(s):"); + foreach (var (child, violationParent) in violations) + { + sb.AppendLine($" - Application span '{child.OperationName}' (Id: {child.Id}) has Runtime parent '{violationParent.OperationName}' (Id: {violationParent.Id})"); + } + Assert.Fail(sb.ToString()); + } + } + + private void PrintActivityDiagnostics() + { + var activities = Started.ToList(); + if (activities.Count == 0) + { + _output.WriteLine("No activities captured."); + return; + } + + var sb = new StringBuilder(); + sb.AppendLine(); + sb.AppendLine("╔══════════════════════════════════════════════════════════════════════════════╗"); + sb.AppendLine("║ CAPTURED ACTIVITIES DIAGNOSTIC ║"); + sb.AppendLine("╠══════════════════════════════════════════════════════════════════════════════╣"); + sb.AppendLine($"║ Total Activities: {activities.Count,-59}║"); + sb.AppendLine("╚══════════════════════════════════════════════════════════════════════════════╝"); + sb.AppendLine(); + + // Group by source + var bySource = activities.GroupBy(a => a.Source.Name).OrderBy(g => g.Key); + + foreach (var sourceGroup in bySource) + { + sb.AppendLine($"┌─ Source: {sourceGroup.Key}"); + sb.AppendLine("│"); + + var sourceActivities = sourceGroup.OrderBy(a => a.StartTimeUtc).ToList(); + for (int i = 0; i < sourceActivities.Count; i++) + { + var activity = sourceActivities[i]; + var isLast = i == sourceActivities.Count - 1; + var prefix = isLast ? "└──" : "├──"; + var continuePrefix = isLast ? " " : "│ "; + + sb.AppendLine($"│ {prefix} [{activity.OperationName}]"); + sb.AppendLine($"│ {continuePrefix} ID: {activity.Id ?? "(null)"}"); + + if (activity.ParentId is not null) + { + sb.AppendLine($"│ {continuePrefix} Parent: {activity.ParentId}"); + } + else + { + sb.AppendLine($"│ {continuePrefix} Parent: (root)"); + } + + sb.AppendLine($"│ {continuePrefix} Duration: {activity.Duration.TotalMilliseconds:F2}ms"); + sb.AppendLine($"│ {continuePrefix} Status: {activity.Status}"); + + var tags = activity.Tags.ToList(); + if (tags.Count > 0) + { + sb.AppendLine($"│ {continuePrefix} Tags:"); + foreach (var tag in tags) + { + sb.AppendLine($"│ {continuePrefix} • {tag.Key}: {tag.Value}"); + } + } + + sb.AppendLine("│"); + } + + sb.AppendLine(); + } + + // Print hierarchy view + sb.AppendLine("═══════════════════════════════════════════════════════════════════════════════"); + sb.AppendLine(" ACTIVITY HIERARCHY "); + sb.AppendLine("═══════════════════════════════════════════════════════════════════════════════"); + sb.AppendLine(); + + var activityById = activities.Where(a => a.Id is not null).ToDictionary(a => a.Id!); + var roots = activities.Where(a => a.ParentId is null || !activityById.ContainsKey(a.ParentId)).ToList(); + + foreach (var root in roots.OrderBy(a => a.StartTimeUtc)) + { + PrintActivityTree(sb, root, activityById, activities, "", true); + } + + _output.WriteLine(sb.ToString()); + } + + private static void PrintActivityTree( + StringBuilder sb, + Activity activity, + Dictionary activityById, + List allActivities, + string indent, + bool isLast) + { + var marker = isLast ? "└── " : "├── "; + var durationStr = activity.Duration.TotalMilliseconds > 0 + ? $" ({activity.Duration.TotalMilliseconds:F2}ms)" + : ""; + + sb.AppendLine($"{indent}{marker}[{activity.Source.Name}] {activity.OperationName}{durationStr}"); + + var children = allActivities + .Where(a => a.ParentId == activity.Id) + .OrderBy(a => a.StartTimeUtc) + .ToList(); + + var childIndent = indent + (isLast ? " " : "│ "); + + for (int i = 0; i < children.Count; i++) + { + PrintActivityTree(sb, children[i], activityById, allActivities, childIndent, i == children.Count - 1); + } + } + } + + #region Test Placement Filter for Tracing + + /// + /// Test placement filter attribute for tracing tests. + /// + public class TracingTestPlacementFilterAttribute() : PlacementFilterAttribute(new TracingTestPlacementFilterStrategy()); + + /// + /// Test placement filter strategy for tracing tests. + /// + public class TracingTestPlacementFilterStrategy() : PlacementFilterStrategy(order: 1) + { + } + + /// + /// Test placement filter director that simply passes through all silos. + /// + public class TracingTestPlacementFilterDirector : IPlacementFilterDirector + { + public IEnumerable Filter(PlacementFilterStrategy filterStrategy, PlacementTarget target, IEnumerable silos) + { + return silos; + } + } + + /// + /// Test grain interface with a placement filter for tracing tests. + /// + public interface IFilteredActivityGrain : IGrainWithIntegerKey + { + Task GetActivityId(); + } + + /// + /// Test grain implementation with a placement filter for tracing tests. + /// + [TracingTestPlacementFilter] + public class FilteredActivityGrain : Grain, IFilteredActivityGrain + { + public Task GetActivityId() + { + var activity = Activity.Current; + if (activity is null) + { + return Task.FromResult(default(ActivityData)); + } + + var result = new ActivityData() + { + Id = activity.Id, + TraceState = activity.TraceStateString, + Baggage = activity.Baggage.ToList(), + }; + + return Task.FromResult(result); + } + } + + #endregion + + #region Test Grain with Persistent State for tracing + + /// + /// Test grain interface with persistent state for tracing tests. + /// + public interface IPersistentStateActivityGrain : IGrainWithIntegerKey + { + Task GetActivityId(); + Task GetStateValue(); + } + + /// + /// Test grain state for persistent state tracing tests. + /// + [GenerateSerializer] + public class PersistentStateActivityGrainState + { + [Id(0)] + public int Value { get; set; } + } + + /// + /// Test grain implementation with persistent state for tracing tests. + /// + [TracingTestPlacementFilter] + public class PersistentStateActivityGrain : Grain, IPersistentStateActivityGrain + { + private readonly IPersistentState _state; + + public PersistentStateActivityGrain( + [PersistentState("state")] IPersistentState state) + { + _state = state; + } + + public Task GetActivityId() + { + var activity = Activity.Current; + if (activity is null) + { + return Task.FromResult(default(ActivityData)); + } + + var result = new ActivityData() + { + Id = activity.Id, + TraceState = activity.TraceStateString, + Baggage = activity.Baggage.ToList(), + }; + + return Task.FromResult(result); + } + + public Task GetStateValue() + { + return Task.FromResult(_state.State.Value); + } + } + + #endregion + + #region Test Grain for Migration Tracing + + /// + /// Test grain interface for migration tracing tests. + /// + public interface IMigrationTracingTestGrain : IGrainWithIntegerKey + { + ValueTask GetGrainAddress(); + ValueTask SetState(int state); + ValueTask GetState(); + } + + /// + /// Test grain state for migration tracing tests. + /// + [GenerateSerializer] + public class MigrationTracingTestGrainState + { + [Id(0)] + public int Value { get; set; } + } + + /// + /// Test grain implementation for migration tracing tests. + /// Implements IGrainMigrationParticipant to participate in migration. + /// Uses RandomPlacement to allow migration to different silos. + /// + [RandomPlacement] + public class MigrationTracingTestGrain : Grain, IMigrationTracingTestGrain, IGrainMigrationParticipant + { + private int _state; + + public ValueTask GetState() => new(_state); + + public ValueTask SetState(int state) + { + _state = state; + return default; + } + + public void OnDehydrate(IDehydrationContext migrationContext) + { + migrationContext.TryAddValue("state", _state); + } + + public void OnRehydrate(IRehydrationContext migrationContext) + { + migrationContext.TryGetValue("state", out _state); + } + + public ValueTask GetGrainAddress() => new(GrainContext.Address); + } + + /// + /// Test grain interface for migration tracing tests without IGrainMigrationParticipant. + /// + public interface ISimpleMigrationTracingTestGrain : IGrainWithIntegerKey + { + ValueTask GetGrainAddress(); + ValueTask SetState(int state); + ValueTask GetState(); + } + + /// + /// Test grain implementation for migration tracing tests that does NOT implement IGrainMigrationParticipant. + /// Uses RandomPlacement to allow migration to different silos. + /// This grain will lose its state during migration since it doesn't participate in dehydration/rehydration. + /// + [RandomPlacement] + public class SimpleMigrationTracingTestGrain : Grain, ISimpleMigrationTracingTestGrain + { + private int _state; + + public ValueTask GetState() => new(_state); + + public ValueTask SetState(int state) + { + _state = state; + return default; + } + + public ValueTask GetGrainAddress() => new(GrainContext.Address); + } + + /// + /// Test grain interface with persistent state for migration tracing tests. + /// + public interface IMigrationPersistentStateTracingTestGrain : IGrainWithIntegerKey + { + ValueTask SetState(int a, int b); + ValueTask<(int A, int B)> GetState(); + ValueTask GetGrainAddress(); + } + + /// + /// Test grain implementation with IPersistentState for migration tracing tests. + /// Uses RandomPlacement to allow migration to different silos. + /// + [RandomPlacement] + public class MigrationPersistentStateTracingTestGrain : Grain, IMigrationPersistentStateTracingTestGrain + { + private readonly IPersistentState _stateA; + private readonly IPersistentState _stateB; + + public MigrationPersistentStateTracingTestGrain( + [PersistentState("a")] IPersistentState stateA, + [PersistentState("b")] IPersistentState stateB) + { + _stateA = stateA; + _stateB = stateB; + } + + public ValueTask<(int A, int B)> GetState() => new((_stateA.State.Value, _stateB.State.Value)); + + public ValueTask SetState(int a, int b) + { + _stateA.State.Value = a; + _stateB.State.Value = b; + return default; + } + + public ValueTask GetGrainAddress() => new(GrainContext.Address); + } + + #endregion +} diff --git a/test/Tester/ActivityPropagationTests.cs b/test/Tester/ActivityPropagationTests.cs index 8fe2285ce26..01af15e2a56 100644 --- a/test/Tester/ActivityPropagationTests.cs +++ b/test/Tester/ActivityPropagationTests.cs @@ -1,6 +1,6 @@ using System.Diagnostics; using Microsoft.Extensions.Configuration; -using Orleans.Runtime; +using Orleans.Diagnostics; using Orleans.TestingHost; using TestExtensions; using UnitTests.GrainInterfaces; @@ -31,7 +31,7 @@ static ActivityPropagationTests() // This listener specifically targets activities created by Orleans for grain calls Listener = new() { - ShouldListenTo = p => p.Name == ActivityPropagationGrainCallFilter.ApplicationGrainActivitySourceName, + ShouldListenTo = p => p.Name == ActivitySources.ApplicationGrainActivitySourceName, Sample = Sample, SampleUsingParentId = SampleUsingParentId, };