diff --git a/Directory.Packages.props b/Directory.Packages.props index 789dd1cad..0bdd626de 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -15,28 +15,26 @@ - - + - + - - - - - + + + + - - + + - - + + @@ -45,7 +43,31 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + @@ -54,8 +76,6 @@ - - @@ -68,25 +88,24 @@ - + - + - - + - - + + - + @@ -104,16 +123,12 @@ - + - - - - + - + - - \ No newline at end of file + diff --git a/src/DurableTask.AzureServiceFabric/DurableTask.AzureServiceFabric.csproj b/src/DurableTask.AzureServiceFabric/DurableTask.AzureServiceFabric.csproj index 5ed626ae5..fbaf0eb1a 100644 --- a/src/DurableTask.AzureServiceFabric/DurableTask.AzureServiceFabric.csproj +++ b/src/DurableTask.AzureServiceFabric/DurableTask.AzureServiceFabric.csproj @@ -1,8 +1,9 @@  + - net462;net472 + net8.0;net462;net472 true Microsoft.Azure.DurableTask.AzureServiceFabric true @@ -14,29 +15,42 @@ Microsoft AnyCPU;x64 $(NoWarn);NU5104 - - + + + + + + + + + + + + + + - + + - + @@ -50,4 +64,5 @@ content/SBOM + diff --git a/src/DurableTask.AzureServiceFabric/Remote/DefaultStringPartitionHashing.cs b/src/DurableTask.AzureServiceFabric/Remote/DefaultStringPartitionHashing.cs index 619f41c95..f3e28a2b5 100644 --- a/src/DurableTask.AzureServiceFabric/Remote/DefaultStringPartitionHashing.cs +++ b/src/DurableTask.AzureServiceFabric/Remote/DefaultStringPartitionHashing.cs @@ -31,7 +31,7 @@ public Task GeneratePartitionHashCodeAsync(string value, CancellationToken long hashCode = 0; if (!string.IsNullOrEmpty(value)) { - using (var sha256 = SHA256Managed.Create()) + using (var sha256 = SHA256.Create()) { var bytes = Encoding.UTF8.GetBytes(value); var hash = sha256.ComputeHash(bytes); diff --git a/src/DurableTask.AzureServiceFabric/Remote/RemoteOrchestrationServiceClient.cs b/src/DurableTask.AzureServiceFabric/Remote/RemoteOrchestrationServiceClient.cs index 2143dd414..1144e55a9 100644 --- a/src/DurableTask.AzureServiceFabric/Remote/RemoteOrchestrationServiceClient.cs +++ b/src/DurableTask.AzureServiceFabric/Remote/RemoteOrchestrationServiceClient.cs @@ -11,28 +11,25 @@ // limitations under the License. // ---------------------------------------------------------------------------------- +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Http.Formatting; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using System.Web; +using DurableTask.AzureServiceFabric.Exceptions; +using DurableTask.AzureServiceFabric.Models; +using DurableTask.Core; +using DurableTask.Core.Exceptions; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + namespace DurableTask.AzureServiceFabric.Remote { - using System; - using System.Collections.Generic; - using System.Linq; - using System.Net; - using System.Net.Http; - using System.Net.Http.Formatting; - using System.Net.Sockets; - using System.Threading; - using System.Threading.Tasks; - using System.Web; - - using DurableTask.Core; - using DurableTask.Core.Exceptions; - using DurableTask.AzureServiceFabric.Exceptions; - using DurableTask.AzureServiceFabric.Models; - - using Newtonsoft.Json; - using Newtonsoft.Json.Linq; - using System.Web.Http.Results; - /// /// Allows to interact with a remote IOrchestrationServiceClient /// @@ -116,16 +113,14 @@ public async Task ForceTerminateTaskOrchestrationAsync(string instanceId, string instanceId.EnsureValidInstanceId(); var fragment = $"{this.GetOrchestrationFragment(instanceId)}?reason={HttpUtility.UrlEncode(reason)}"; - using (var response = await this.ExecuteRequestWithRetriesAsync( + using var response = await this.ExecuteRequestWithRetriesAsync( instanceId, async (baseUri) => await this.HttpClient.DeleteAsync(new Uri(baseUri, fragment)), - CancellationToken.None)) + CancellationToken.None); + if (!response.IsSuccessStatusCode) { - if (!response.IsSuccessStatusCode) - { - var message = await response.Content.ReadAsStringAsync(); - throw new RemoteServiceException($"Unable to terminate task instance. Error: {response.StatusCode}:{message}", response.StatusCode); - } + var message = await response.Content.ReadAsStringAsync(); + throw new RemoteServiceException($"Unable to terminate task instance. Error: {response.StatusCode}:{message}", response.StatusCode); } } @@ -154,7 +149,12 @@ public async Task> GetOrchestrationStateAsync(string i { instanceId.EnsureValidInstanceId(); +#if NETFRAMEWORK var fragment = $"{this.GetOrchestrationFragment(instanceId)}?allExecutions={allExecutions}"; +#else + var fragment = $"{this.GetOrchestrationFragmentAll(instanceId)}?allExecutions={allExecutions}"; +#endif + var stateString = await this.GetStringResponseAsync(instanceId, fragment, CancellationToken.None); var states = JsonConvert.DeserializeObject>(stateString); return states; @@ -278,6 +278,8 @@ public async Task WaitForOrchestrationAsync(string instanceI private string GetOrchestrationFragment(string orchestrationId) => $"orchestrations/{orchestrationId}"; + private string GetOrchestrationFragmentAll(string orchestrationId) => $"orchestrationsAll/{orchestrationId}"; + private string GetMessageFragment() => "messages"; private string GetMessageFragment(long messageId) => $"messages/{messageId}"; @@ -286,19 +288,17 @@ public async Task WaitForOrchestrationAsync(string instanceI private async Task GetStringResponseAsync(string instanceId, string fragment, CancellationToken cancellationToken) { - using (var response = await this.ExecuteRequestWithRetriesAsync( + using var response = await this.ExecuteRequestWithRetriesAsync( instanceId, async (baseUri) => await this.HttpClient.GetAsync(new Uri(baseUri, fragment)), - cancellationToken)) + cancellationToken); + string content = await response.Content.ReadAsStringAsync(); + if (response.IsSuccessStatusCode) { - string content = await response.Content.ReadAsStringAsync(); - if (response.IsSuccessStatusCode) - { - return content; - } - - throw new HttpRequestException($"Request failed with status code '{response.StatusCode}' and content '{content}'"); + return content; } + + throw new HttpRequestException($"Request failed with status code '{response.StatusCode}' and content '{content}'"); } private async Task PutJsonAsync(string instanceId, string fragment, object @object, CancellationToken cancellationToken) @@ -308,23 +308,21 @@ private async Task PutJsonAsync(string instanceId, string fragment, object @obje SerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All } }; - using (var result = await this.ExecuteRequestWithRetriesAsync( + using var result = await this.ExecuteRequestWithRetriesAsync( instanceId, async (baseUri) => await this.HttpClient.PutAsync(new Uri(baseUri, fragment), @object, mediaFormatter), - cancellationToken)) - { + cancellationToken); - // TODO: Improve exception handling - if (result.StatusCode == HttpStatusCode.Conflict) - { - throw await (result.Content?.ReadAsAsync() ?? Task.FromResult(new OrchestrationAlreadyExistsException())); - } + // TODO: Improve exception handling + if (result.StatusCode == HttpStatusCode.Conflict) + { + throw await (result.Content?.ReadAsAsync() ?? Task.FromResult(new OrchestrationAlreadyExistsException())); + } - if (!result.IsSuccessStatusCode) - { - var content = await (result.Content?.ReadAsStringAsync() ?? Task.FromResult(null)); - throw new RemoteServiceException($"CreateTaskOrchestrationAsync failed with status code {result.StatusCode}: {content}", result.StatusCode); - } + if (!result.IsSuccessStatusCode) + { + var content = await (result.Content?.ReadAsStringAsync() ?? Task.FromResult(null)); + throw new RemoteServiceException($"CreateTaskOrchestrationAsync failed with status code {result.StatusCode}: {content}", result.StatusCode); } } diff --git a/src/DurableTask.AzureServiceFabric/Service/TaskHubProxyListener.cs b/src/DurableTask.AzureServiceFabric/Service/TaskHubProxyListenerBase.cs similarity index 71% rename from src/DurableTask.AzureServiceFabric/Service/TaskHubProxyListener.cs rename to src/DurableTask.AzureServiceFabric/Service/TaskHubProxyListenerBase.cs index 86aba90f2..57c624328 100644 --- a/src/DurableTask.AzureServiceFabric/Service/TaskHubProxyListener.cs +++ b/src/DurableTask.AzureServiceFabric/Service/TaskHubProxyListenerBase.cs @@ -15,17 +15,10 @@ namespace DurableTask.AzureServiceFabric.Service { using System; using System.Fabric; - using System.Globalization; - using System.Linq; - using System.Net; - using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; - - using DurableTask.Core; - using DurableTask.AzureServiceFabric; using DurableTask.AzureServiceFabric.Tracing; - + using DurableTask.Core; using Microsoft.ServiceFabric.Services.Communication.Runtime; using Microsoft.ServiceFabric.Services.Runtime; @@ -53,41 +46,40 @@ namespace DurableTask.AzureServiceFabric.Service /// An instance of this class is created for each service replica by the Service Fabric runtime. /// Listening on HTTP port will expose security risk, use this listener at your own discretation. /// - public sealed class TaskHubProxyListener : IServiceListener + public abstract class TaskHubProxyListenerBase : IServiceListener { readonly RegisterOrchestrations registerOrchestrations; readonly RegisterOrchestrations2 registerOrchestrations2; - readonly FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings; FabricOrchestrationProviderFactory fabricProviderFactory; - FabricOrchestrationProvider fabricOrchestrationProvider; TaskHubWorker worker; TaskHubClient localClient; ReplicaRole currentRole; StatefulService statefulService; - bool enableHttps = true; /// - /// Creates instance of + /// Indicates whether HTTPS is enabled for the listener. /// - /// stateful service context - /// instance of - /// Delegate invoked before starting the worker. - [Obsolete] - public TaskHubProxyListener(StatefulServiceContext context, - FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings, - RegisterOrchestrations registerOrchestrations) - { - this.fabricOrchestrationProviderSettings = fabricOrchestrationProviderSettings ?? throw new ArgumentNullException(nameof(fabricOrchestrationProviderSettings)); - this.registerOrchestrations = registerOrchestrations ?? throw new ArgumentNullException(nameof(registerOrchestrations)); - } + protected readonly bool enableHttps = false; + /// + /// Gets or sets the Fabric Orchestration Provider. + /// + /// + /// This provider is responsible for managing orchestration services and clients + /// within the Service Fabric environment. + /// + protected FabricOrchestrationProvider fabricOrchestrationProvider; + /// + /// Gets the settings for the Fabric Orchestration Provider. + /// + protected readonly FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings; /// - /// Creates instance of + /// Creates instance of /// /// instance of /// Delegate invoked before starting the worker. /// Whether to enable https or http - public TaskHubProxyListener(FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings, + public TaskHubProxyListenerBase(FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings, RegisterOrchestrations registerOrchestrations, bool enableHttps = true) { @@ -97,7 +89,7 @@ public TaskHubProxyListener(FabricOrchestrationProviderSettings fabricOrchestrat } /// - /// Creates instance of + /// Creates instance of /// /// /// Use this constructor when there is a need to access @@ -106,7 +98,7 @@ public TaskHubProxyListener(FabricOrchestrationProviderSettings fabricOrchestrat /// instance of /// Delegate invoked before starting the worker. /// Whether to enable https or http - public TaskHubProxyListener(FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings, + public TaskHubProxyListenerBase(FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings, RegisterOrchestrations2 registerOrchestrations2, bool enableHttps = true) { @@ -115,14 +107,13 @@ public TaskHubProxyListener(FabricOrchestrationProviderSettings fabricOrchestrat this.enableHttps = enableHttps; } - /// - /// Handles node's role change. - /// - /// New for this service replica. - /// Cancellation token to monitor for cancellation requests. - /// - /// A that represents outstanding operation. - /// + /// + public void Initialize(StatefulService statefulService) + { + this.statefulService = statefulService; + } + + /// public async Task OnChangeRoleAsync(ReplicaRole newRole, CancellationToken cancellationToken) { ServiceFabricProviderEventSource.Tracing.LogFabricServiceInformation(this.statefulService, $"TaskHubProxyListener OnChangeRoleAsync, current role = {this.currentRole}, new role = {newRole}"); @@ -134,11 +125,7 @@ public async Task OnChangeRoleAsync(ReplicaRole newRole, CancellationToken cance ServiceFabricProviderEventSource.Tracing.LogFabricServiceInformation(this.statefulService, $"TaskHubProxyListener OnChangeRoleAsync, current role = {this.currentRole}"); } - /// - /// Handles OnCloseAsync event, shuts down the service. - /// - /// Cancellation token to monitor for cancellation requests. - /// A Task that represents outstanding operation. + /// public async Task OnCloseAsync(CancellationToken cancellationToken) { ServiceFabricProviderEventSource.Tracing.LogFabricServiceInformation(this.statefulService, "OnCloseAsync - will shutdown primary if not already done"); @@ -146,26 +133,8 @@ public async Task OnCloseAsync(CancellationToken cancellationToken) } /// - public ServiceReplicaListener CreateServiceReplicaListener() - { - return new ServiceReplicaListener(context => - { - var serviceEndpoint = context.CodePackageActivationContext.GetEndpoint(Constants.TaskHubProxyListenerEndpointName); - string ipAddress = context.NodeContext.IPAddressOrFQDN; -#if DEBUG - IPHostEntry entry = Dns.GetHostEntry(ipAddress); - IPAddress ipv4Address = entry.AddressList.FirstOrDefault( - address => (address.AddressFamily == AddressFamily.InterNetwork) && (!IPAddress.IsLoopback(address))); - ipAddress = ipv4Address.ToString(); -#endif + public abstract ServiceReplicaListener CreateServiceReplicaListener(); - EnsureFabricOrchestrationProviderIsInitialized(); - string protocol = this.enableHttps ? "https" : "http"; - string listeningAddress = string.Format(CultureInfo.InvariantCulture, "{0}://{1}:{2}/{3}/dtfx/", protocol, ipAddress, serviceEndpoint.Port, context.PartitionId); - - return new OwinCommunicationListener(new Startup(listeningAddress, this.fabricOrchestrationProvider)); - }, Constants.TaskHubProxyServiceName); - } /// public async Task OnRunAsync(CancellationToken cancellationToken) @@ -173,12 +142,6 @@ public async Task OnRunAsync(CancellationToken cancellationToken) await StartAsync(); } - /// - public void Initialize(StatefulService statefulService) - { - this.statefulService = statefulService; - } - /// public Task OnOpenAsync(ReplicaOpenMode openMode, CancellationToken cancellationToken) { @@ -207,6 +170,7 @@ async Task StartAsync() await this.worker.StartAsync(); this.worker.TaskActivityDispatcher.IncludeDetails = true; + this.worker.TaskOrchestrationDispatcher.IncludeDetails = true; } catch (Exception exception) { @@ -238,7 +202,14 @@ async Task StopAsync() } } - private void EnsureFabricOrchestrationProviderIsInitialized() + /// + /// Ensures that the Fabric Orchestration Provider is initialized. + /// + /// + /// This method checks if the Fabric Orchestration Provider is null and initializes it + /// using the FabricOrchestrationProviderFactory if necessary. + /// + protected void EnsureFabricOrchestrationProviderIsInitialized() { if (this.fabricOrchestrationProvider == null) { diff --git a/src/DurableTask.AzureServiceFabric/Service/ActivityLoggingMessageHandler.cs b/src/DurableTask.AzureServiceFabric/Service/netfx/ActivityLoggingMessageHandler.cs similarity index 100% rename from src/DurableTask.AzureServiceFabric/Service/ActivityLoggingMessageHandler.cs rename to src/DurableTask.AzureServiceFabric/Service/netfx/ActivityLoggingMessageHandler.cs diff --git a/src/DurableTask.AzureServiceFabric/Service/DefaultDependencyResolver.cs b/src/DurableTask.AzureServiceFabric/Service/netfx/DefaultDependencyResolver.cs similarity index 100% rename from src/DurableTask.AzureServiceFabric/Service/DefaultDependencyResolver.cs rename to src/DurableTask.AzureServiceFabric/Service/netfx/DefaultDependencyResolver.cs diff --git a/src/DurableTask.AzureServiceFabric/Service/FabricOrchestrationServiceController.cs b/src/DurableTask.AzureServiceFabric/Service/netfx/FabricOrchestrationServiceController.cs similarity index 100% rename from src/DurableTask.AzureServiceFabric/Service/FabricOrchestrationServiceController.cs rename to src/DurableTask.AzureServiceFabric/Service/netfx/FabricOrchestrationServiceController.cs diff --git a/src/DurableTask.AzureServiceFabric/Service/IOwinAppBuilder.cs b/src/DurableTask.AzureServiceFabric/Service/netfx/IOwinAppBuilder.cs similarity index 100% rename from src/DurableTask.AzureServiceFabric/Service/IOwinAppBuilder.cs rename to src/DurableTask.AzureServiceFabric/Service/netfx/IOwinAppBuilder.cs diff --git a/src/DurableTask.AzureServiceFabric/Service/OwinCommunicationListener.cs b/src/DurableTask.AzureServiceFabric/Service/netfx/OwinCommunicationListener.cs similarity index 100% rename from src/DurableTask.AzureServiceFabric/Service/OwinCommunicationListener.cs rename to src/DurableTask.AzureServiceFabric/Service/netfx/OwinCommunicationListener.cs diff --git a/src/DurableTask.AzureServiceFabric/Service/ProxyServiceExceptionHandler.cs b/src/DurableTask.AzureServiceFabric/Service/netfx/ProxyServiceExceptionHandler.cs similarity index 100% rename from src/DurableTask.AzureServiceFabric/Service/ProxyServiceExceptionHandler.cs rename to src/DurableTask.AzureServiceFabric/Service/netfx/ProxyServiceExceptionHandler.cs diff --git a/src/DurableTask.AzureServiceFabric/Service/ProxyServiceExceptionLogger.cs b/src/DurableTask.AzureServiceFabric/Service/netfx/ProxyServiceExceptionLogger.cs similarity index 100% rename from src/DurableTask.AzureServiceFabric/Service/ProxyServiceExceptionLogger.cs rename to src/DurableTask.AzureServiceFabric/Service/netfx/ProxyServiceExceptionLogger.cs diff --git a/src/DurableTask.AzureServiceFabric/Service/Startup.cs b/src/DurableTask.AzureServiceFabric/Service/netfx/Startup.cs similarity index 100% rename from src/DurableTask.AzureServiceFabric/Service/Startup.cs rename to src/DurableTask.AzureServiceFabric/Service/netfx/Startup.cs diff --git a/src/DurableTask.AzureServiceFabric/Service/netfx/TaskHubProxyListener.cs b/src/DurableTask.AzureServiceFabric/Service/netfx/TaskHubProxyListener.cs new file mode 100644 index 000000000..6b12d08b5 --- /dev/null +++ b/src/DurableTask.AzureServiceFabric/Service/netfx/TaskHubProxyListener.cs @@ -0,0 +1,94 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.AzureServiceFabric.Service +{ + using System; + using System.Fabric; + using System.Globalization; + using System.Linq; + using System.Net; + using System.Net.Sockets; + + using DurableTask.Core; + using DurableTask.AzureServiceFabric; + + using Microsoft.ServiceFabric.Services.Communication.Runtime; + + /// + public sealed class TaskHubProxyListener : TaskHubProxyListenerBase + { + /// + /// Creates instance of + /// + /// stateful service context + /// instance of + /// Delegate invoked before starting the worker. + [Obsolete] + public TaskHubProxyListener(StatefulServiceContext context, + FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings, + RegisterOrchestrations registerOrchestrations) : base(fabricOrchestrationProviderSettings, registerOrchestrations) + { + } + + /// + /// Creates instance of + /// + /// instance of + /// Delegate invoked before starting the worker. + /// Whether to enable https or http + public TaskHubProxyListener(FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings, + RegisterOrchestrations registerOrchestrations, + bool enableHttps = true): base(fabricOrchestrationProviderSettings, registerOrchestrations, enableHttps) + { + } + + /// + /// Creates instance of + /// + /// + /// Use this constructor when there is a need to access + /// when registering orchestration artifacts with + /// + /// instance of + /// Delegate invoked before starting the worker. + /// Whether to enable https or http + public TaskHubProxyListener(FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings, + RegisterOrchestrations2 registerOrchestrations2, + bool enableHttps = true) : base(fabricOrchestrationProviderSettings, registerOrchestrations2, enableHttps) + { + } + + /// + public override ServiceReplicaListener CreateServiceReplicaListener() + { + return new ServiceReplicaListener(context => + { + var serviceEndpoint = context.CodePackageActivationContext.GetEndpoint(Constants.TaskHubProxyListenerEndpointName); + string ipAddress = context.NodeContext.IPAddressOrFQDN; +#if DEBUG + IPHostEntry entry = Dns.GetHostEntry(ipAddress); + IPAddress ipv4Address = entry.AddressList.FirstOrDefault( + address => (address.AddressFamily == AddressFamily.InterNetwork) && (!IPAddress.IsLoopback(address))); + ipAddress = ipv4Address.ToString(); +#endif + + EnsureFabricOrchestrationProviderIsInitialized(); + string protocol = this.enableHttps ? "https" : "http"; + string listeningAddress = string.Format(CultureInfo.InvariantCulture, "{0}://{1}:{2}/{3}/dtfx/", protocol, ipAddress, serviceEndpoint.Port, context.PartitionId); + + return new OwinCommunicationListener(new Startup(listeningAddress, this.fabricOrchestrationProvider)); + }, Constants.TaskHubProxyServiceName); + } + } +} diff --git a/src/DurableTask.AzureServiceFabric/Service/netstd/FabricOrchestrationServiceController.cs b/src/DurableTask.AzureServiceFabric/Service/netstd/FabricOrchestrationServiceController.cs new file mode 100644 index 000000000..5a4293109 --- /dev/null +++ b/src/DurableTask.AzureServiceFabric/Service/netstd/FabricOrchestrationServiceController.cs @@ -0,0 +1,188 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.AzureServiceFabric.Service +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using DurableTask.AzureServiceFabric.Models; + using DurableTask.Core; + using DurableTask.Core.Exceptions; + using Microsoft.AspNetCore.Mvc; + + /// + /// A Web Api controller that provides TaskHubClient operations. + /// + [ApiExplorerSettings(IgnoreApi = true)] + public class FabricOrchestrationServiceController : ControllerBase + { + private readonly IOrchestrationServiceClient orchestrationServiceClient; + + /// + /// Creates an instance of FabricOrchestrationServiceController for given OrchestrationServiceClient + /// + /// IOrchestrationServiceClient instance + public FabricOrchestrationServiceController(IOrchestrationServiceClient orchestrationServiceClient) + { + this.orchestrationServiceClient = orchestrationServiceClient; + } + + /// + /// Creates a task orchestration. + /// + /// Orchestration Id + /// Parameters for creating task orchestration. + /// Will throw an OrchestrationAlreadyExistsException exception + /// If any orchestration with the same instance Id exists in the instance store and it has a status specified in dedupeStatuses. + /// object. + [HttpPut] + [Route("orchestrations/{orchestrationId}")] + public async Task CreateTaskOrchestration([FromRoute] string orchestrationId, [FromBody] CreateTaskOrchestrationParameters parameters) + { + parameters.TaskMessage.OrchestrationInstance.InstanceId.EnsureValidInstanceId(); + if (!orchestrationId.Equals(parameters.TaskMessage.OrchestrationInstance.InstanceId)) + { + return BadRequest($"OrchestrationId from Uri {orchestrationId} doesn't match with the one from body {parameters.TaskMessage.OrchestrationInstance.InstanceId}"); + } + + try + { + if (parameters.DedupeStatuses == null) + { + await this.orchestrationServiceClient.CreateTaskOrchestrationAsync(parameters.TaskMessage); + } + else + { + await this.orchestrationServiceClient.CreateTaskOrchestrationAsync(parameters.TaskMessage, parameters.DedupeStatuses); + } + + return Ok(); + } + catch (OrchestrationAlreadyExistsException ex) + { + return Conflict(ex.ToString()); + } + catch (NotSupportedException ex) + { + return BadRequest(ex.ToString()); + } + } + + /// + /// Sends an orchestration message to TaskHubClient. + /// + /// + /// Message to send + /// object. + [HttpPost] + [Route("messages/{messageId}")] + public async Task SendTaskOrchestrationMessage([FromRoute] long messageId, [FromBody] TaskMessage message) + { + if (messageId != message.SequenceNumber) + { + return Conflict(); + } + + message.OrchestrationInstance.InstanceId.EnsureValidInstanceId(); + await this.orchestrationServiceClient.SendTaskOrchestrationMessageAsync(message); + return Ok(); + } + + /// + /// Sends an array of orchestration messages to TaskHubClient. + /// + /// Message to send + /// object. + [HttpPost] + [Route("messages")] + public async Task SendTaskOrchestrationMessageBatch([FromBody] TaskMessage[] messages) + { + await this.orchestrationServiceClient.SendTaskOrchestrationMessageBatchAsync(messages); + return Ok(); + } + + /// + /// Gets the state of orchestration. + /// + /// Instance id of the orchestration + /// Execution id of the orchestration + /// object. + [HttpGet] + [Route("orchestrations/{orchestrationId}")] + public async Task GetOrchestrationState([FromRoute] string orchestrationId, string executionId) + { + orchestrationId.EnsureValidInstanceId(); + var state = await this.orchestrationServiceClient.GetOrchestrationStateAsync(orchestrationId, executionId); + return state; + } + + /// + /// Gets the state of orchestration. + /// + /// Instance id of the orchestration + /// True if method should fetch all executions of the instance, false if the method should only fetch the most recent execution + /// List of . + [HttpGet] + [Route("orchestrationsAll/{orchestrationId}")] + public async Task> GetOrchestrationState([FromRoute] string orchestrationId, bool allExecutions) + { + orchestrationId.EnsureValidInstanceId(); + var state = await this.orchestrationServiceClient.GetOrchestrationStateAsync(orchestrationId, allExecutions); + return state; + } + + /// + /// Terminates an orchestration. + /// + /// Instance id of the orchestration + /// Execution id of the orchestration + /// object. + [HttpDelete] + [Route("orchestrations/{orchestrationId}")] + public async Task ForceTerminateTaskOrchestration([FromRoute] string orchestrationId, string reason) + { + orchestrationId.EnsureValidInstanceId(); + await this.orchestrationServiceClient.ForceTerminateTaskOrchestrationAsync(orchestrationId, reason); + return Ok(); + } + + /// + /// Gets the history of orchestration. + /// + /// Instance id of the orchestration + /// Execution id of the orchestration + /// Orchestration history + [HttpGet] + [Route("history/{orchestrationId}")] + public async Task GetOrchestrationHistory([FromRoute] string orchestrationId, string executionId) + { + orchestrationId.EnsureValidInstanceId(); + var result = await this.orchestrationServiceClient.GetOrchestrationHistoryAsync(orchestrationId, executionId); + return result; + } + + /// + /// Purges orchestration instance state and history for orchestrations older than the specified threshold time. + /// + /// Purge history parameters + /// object. + [HttpPost] + [Route("history")] + public async Task PurgeOrchestrationHistory([FromBody] PurgeOrchestrationHistoryParameters purgeParameters) + { + await this.orchestrationServiceClient.PurgeOrchestrationHistoryAsync(purgeParameters.ThresholdDateTimeUtc, purgeParameters.TimeRangeFilterType); + return Ok(); + } + } +} diff --git a/src/DurableTask.AzureServiceFabric/Service/netstd/Startup.cs b/src/DurableTask.AzureServiceFabric/Service/netstd/Startup.cs new file mode 100644 index 000000000..cd16a6b67 --- /dev/null +++ b/src/DurableTask.AzureServiceFabric/Service/netstd/Startup.cs @@ -0,0 +1,61 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.AzureServiceFabric.Service +{ + using System; + using DurableTask.AzureServiceFabric; + using DurableTask.Core; + using Microsoft.AspNetCore.Builder; + using Microsoft.AspNetCore.Hosting; + using Microsoft.Extensions.DependencyInjection; + using Newtonsoft.Json; + class Startup + { + readonly FabricOrchestrationProvider fabricOrchestrationProvider; + + public Startup(FabricOrchestrationProvider fabricOrchestrationProvider) + { + this.fabricOrchestrationProvider = fabricOrchestrationProvider ?? throw new ArgumentNullException(nameof(fabricOrchestrationProvider)); + } + + /// + /// this method gets called by the runtime. Use this method to add services to the container. + /// + /// Services to be configured. + public void ConfigureServices(IServiceCollection services) + { + services.AddSingleton(this.fabricOrchestrationProvider.OrchestrationServiceClient); + services.AddTransient(); + + services.AddControllers().AddNewtonsoftJson(options => + { + options.SerializerSettings.TypeNameHandling = TypeNameHandling.All; + }); + } + + + public void Configure(IApplicationBuilder appBuilder, IWebHostEnvironment env) + { + appBuilder.UseRouting(); + appBuilder.UseAuthorization(); + appBuilder.UseEndpoints(endpoints => + { + + endpoints.MapControllers(); + + }); + } + + } +} diff --git a/src/DurableTask.AzureServiceFabric/Service/netstd/TaskHubProxyListener.cs b/src/DurableTask.AzureServiceFabric/Service/netstd/TaskHubProxyListener.cs new file mode 100644 index 000000000..b862e0e7d --- /dev/null +++ b/src/DurableTask.AzureServiceFabric/Service/netstd/TaskHubProxyListener.cs @@ -0,0 +1,101 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.AzureServiceFabric.Service +{ + using System; + using System.Globalization; + using System.IO; + using System.Linq; + using System.Net; + using System.Net.Sockets; + using DurableTask.AzureServiceFabric; + using DurableTask.Core; + using Microsoft.AspNetCore.Hosting; + using Microsoft.AspNetCore.Server.Kestrel.Core; + using Microsoft.ServiceFabric.Services.Communication.AspNetCore; + using Microsoft.ServiceFabric.Services.Communication.Runtime; + + /// + public sealed class TaskHubProxyListener : TaskHubProxyListenerBase + { + readonly Action options; + + /// + /// Creates instance of + /// + /// instance of + /// Delegate invoked before starting the worker. + /// Kestrel Server Options. + /// Whether to enable https or http + public TaskHubProxyListener(FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings, + RegisterOrchestrations registerOrchestrations, Action options, + bool enableHttps = true) : base(fabricOrchestrationProviderSettings, registerOrchestrations, enableHttps) + { + this.options = options ?? throw new ArgumentNullException(nameof(options)); + } + + /// + /// Creates instance of + /// + /// + /// Use this constructor when there is a need to access + /// when registering orchestration artifacts with + /// + /// instance of + /// Delegate invoked before starting the worker. + /// + /// Whether to enable https or http + public TaskHubProxyListener(FabricOrchestrationProviderSettings fabricOrchestrationProviderSettings, + RegisterOrchestrations2 registerOrchestrations2, Action options, + bool enableHttps = true) : base(fabricOrchestrationProviderSettings, registerOrchestrations2, enableHttps) + { + this.options = options ?? throw new ArgumentNullException(nameof(options)); + } + + /// + public override ServiceReplicaListener CreateServiceReplicaListener() + { + return new ServiceReplicaListener(context => + new KestrelCommunicationListener(context, (url, listener) => + { + string ipAddress = context.NodeContext.IPAddressOrFQDN; + var serviceEndpoint = context.CodePackageActivationContext.GetEndpoint(Constants.TaskHubProxyListenerEndpointName); +#if DEBUG + IPHostEntry entry = Dns.GetHostEntry(ipAddress); + IPAddress ipv4Address = entry.AddressList.FirstOrDefault( + address => (address.AddressFamily == AddressFamily.InterNetwork) && (!IPAddress.IsLoopback(address))); + ipAddress = ipv4Address.ToString(); +#endif + + EnsureFabricOrchestrationProviderIsInitialized(); + string protocol = this.enableHttps ? "https" : "http"; + // Port 0 is used for requesting system allocated dynamic port + string listeningAddress = string.Format(CultureInfo.InvariantCulture, "{0}://{1}:{2}/", protocol, ipAddress, serviceEndpoint.Port); + + return new WebHostBuilder().UseKestrel(options) + .UseContentRoot(Directory.GetCurrentDirectory()) + // The UseUniqueServiceUrl option injects "/partitionId/instanceId" + // to the end of the service fabric service endpoint as a unique identifier + // Example Endpoint: https://{MachineName}:{port}/{partitionId}/{instanceId} + .UseServiceFabricIntegration(listener, ServiceFabricIntegrationOptions.None) + .UseStartup(x => new Startup(this.fabricOrchestrationProvider)) + .UseUrls(listeningAddress) + .Build(); + }), + Constants.TaskHubProxyServiceName); + } + + + } +}