diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 62183b622..df66d13c6 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -22,8 +22,6 @@ namespace DurableTask.AzureStorage using System.Threading; using System.Threading.Tasks; using Azure; - using Azure.Data.Tables; - using Azure.Storage.Blobs.Models; using Azure.Storage.Queues.Models; using DurableTask.AzureStorage.Messaging; using DurableTask.AzureStorage.Monitoring; @@ -35,7 +33,6 @@ namespace DurableTask.AzureStorage using DurableTask.Core.Exceptions; using DurableTask.Core.History; using DurableTask.Core.Query; - using Newtonsoft.Json; /// /// Orchestration service provider for the Durable Task Framework which uses Azure Storage as the durable store. diff --git a/src/DurableTask.AzureStorage/Partitioning/AppLeaseManager.cs b/src/DurableTask.AzureStorage/Partitioning/AppLeaseManager.cs index 9be4dc366..a5639d8b0 100644 --- a/src/DurableTask.AzureStorage/Partitioning/AppLeaseManager.cs +++ b/src/DurableTask.AzureStorage/Partitioning/AppLeaseManager.cs @@ -10,7 +10,7 @@ // See the License for the specific language governing permissions and // limitations under the License. // ---------------------------------------------------------------------------------- - +#nullable enable namespace DurableTask.AzureStorage.Partitioning { using System; @@ -18,7 +18,6 @@ namespace DurableTask.AzureStorage.Partitioning using System.Threading; using System.Threading.Tasks; using DurableTask.AzureStorage.Storage; - using Newtonsoft.Json; /// /// Class responsible for starting and stopping the partition manager. Also implements the app lease feature to ensure a single app's partition manager is started at a time. @@ -45,10 +44,10 @@ sealed class AppLeaseManager bool isLeaseOwner; int appLeaseIsStarted; - Task renewTask; - Task acquireTask; - CancellationTokenSource starterTokenSource; - CancellationTokenSource leaseRenewerCancellationTokenSource; + Task? renewTask; + Task? acquireTask; + CancellationTokenSource? starterTokenSource; + CancellationTokenSource? leaseRenewerCancellationTokenSource; public AppLeaseManager( AzureStorageClient azureStorageClient, @@ -271,7 +270,7 @@ async Task StopAppLeaseAsync() if (this.renewTask != null) { - this.leaseRenewerCancellationTokenSource.Cancel(); + this.leaseRenewerCancellationTokenSource!.Cancel(); await this.renewTask; } @@ -289,6 +288,10 @@ async Task TryAcquireAppLeaseAsync() bool leaseAcquired; if (appLeaseInfo.DesiredSwapId == this.appLeaseId) { + if (appLeaseInfo.OwnerId == null) + { + throw new Exception("App lease info is in an invalid state. DesiredSwapId is set but OwnerId is null."); + } leaseAcquired = await this.ChangeLeaseAsync(appLeaseInfo.OwnerId); } else @@ -428,7 +431,7 @@ async Task LeaseRenewer(CancellationToken cancellationToken) break; } - await Task.Delay(this.options.RenewInterval, this.leaseRenewerCancellationTokenSource.Token); + await Task.Delay(this.options.RenewInterval, this.leaseRenewerCancellationTokenSource!.Token); } catch (OperationCanceledException) { @@ -564,19 +567,24 @@ async Task UpdateAppLeaseInfoBlob(AppLeaseInfo appLeaseInfo) async Task GetAppLeaseInfoAsync() { + AppLeaseInfo? appLeaseInfo = null; if (await this.appLeaseInfoBlob.ExistsAsync()) { string serializedEventHubInfo = await this.appLeaseInfoBlob.DownloadTextAsync(); - return Utils.DeserializeFromJson(serializedEventHubInfo); + appLeaseInfo = Utils.DeserializeFromJson(serializedEventHubInfo); } - return null; + if (appLeaseInfo == null) + { + throw new Exception("App lease info blob does not exist."); + } + return appLeaseInfo; } private class AppLeaseInfo { - public string OwnerId { get; set; } - public string DesiredSwapId { get; set; } + public string? OwnerId { get; set; } + public string? DesiredSwapId { get; set; } } } } diff --git a/src/DurableTask.AzureStorage/Partitioning/BlobPartitionLeaseManager.cs b/src/DurableTask.AzureStorage/Partitioning/BlobPartitionLeaseManager.cs index 932f5bd7f..85b52f316 100644 --- a/src/DurableTask.AzureStorage/Partitioning/BlobPartitionLeaseManager.cs +++ b/src/DurableTask.AzureStorage/Partitioning/BlobPartitionLeaseManager.cs @@ -10,7 +10,7 @@ // See the License for the specific language governing permissions and // limitations under the License. // ---------------------------------------------------------------------------------- - +#nullable enable namespace DurableTask.AzureStorage.Partitioning { using System; @@ -55,7 +55,8 @@ public BlobPartitionLeaseManager( this.blobDirectoryName = leaseType; this.leaseInterval = this.settings.LeaseInterval; - this.Initialize(); + this.taskHubContainer = this.azureStorageClient.GetBlobContainerReference(this.leaseContainerName); + this.taskHubInfoBlob = this.taskHubContainer.GetBlobReference(TaskHubInfoBlobName); } public Task LeaseStoreExistsAsync(CancellationToken cancellationToken = default) @@ -129,7 +130,7 @@ public async Task CreateLeaseIfNotExistAsync(string partitionId, CancellationTok } } - public async Task GetLeaseAsync(string partitionId, CancellationToken cancellationToken = default) + public async Task GetLeaseAsync(string partitionId, CancellationToken cancellationToken = default) { Blob leaseBlob = this.taskHubContainer.GetBlobReference(partitionId, this.blobDirectoryName); if (await leaseBlob.ExistsAsync(cancellationToken)) @@ -260,7 +261,7 @@ public async Task CreateTaskHubInfoIfNotExistAsync(TaskHubInfo taskHubInfo, Canc internal async Task GetOrCreateTaskHubInfoAsync(TaskHubInfo newTaskHubInfo, bool checkIfStale, CancellationToken cancellationToken = default) { - TaskHubInfo currentTaskHubInfo = await this.GetTaskHubInfoAsync(cancellationToken); + TaskHubInfo? currentTaskHubInfo = await this.GetTaskHubInfoAsync(cancellationToken); if (currentTaskHubInfo != null) { if (checkIfStale && IsStale(currentTaskHubInfo, newTaskHubInfo)) @@ -297,14 +298,7 @@ private bool IsStale(TaskHubInfo currentTaskHubInfo, TaskHubInfo newTaskHubInfo) || !currentTaskHubInfo.PartitionCount.Equals(newTaskHubInfo.PartitionCount); } - void Initialize() - { - this.taskHubContainer = this.azureStorageClient.GetBlobContainerReference(this.leaseContainerName); - - this.taskHubInfoBlob = this.taskHubContainer.GetBlobReference(TaskHubInfoBlobName); - } - - async Task GetTaskHubInfoAsync(CancellationToken cancellationToken) + async Task GetTaskHubInfoAsync(CancellationToken cancellationToken) { if (await this.taskHubInfoBlob.ExistsAsync(cancellationToken)) { @@ -318,10 +312,17 @@ async Task GetTaskHubInfoAsync(CancellationToken cancellationToken) async Task DownloadLeaseBlob(Blob blob, CancellationToken cancellationToken) { using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken); - BlobPartitionLease deserializedLease = Utils.DeserializeFromJson(result.Content); - deserializedLease.Blob = blob; + BlobPartitionLease? deserializedLease = Utils.DeserializeFromJson(result.Content); - return deserializedLease; + if (deserializedLease == null) + { + throw new Exception($"Failed to deserialize lease blob: {blob.Name}"); + } + else + { + deserializedLease.Blob = blob; + return deserializedLease; + } } static Exception HandleStorageException(Lease lease, DurableTaskStorageException storageException, bool ignoreLeaseLost = false) diff --git a/src/DurableTask.AzureStorage/Partitioning/ILeaseManager.cs b/src/DurableTask.AzureStorage/Partitioning/ILeaseManager.cs index cf24b7291..285ce7c69 100644 --- a/src/DurableTask.AzureStorage/Partitioning/ILeaseManager.cs +++ b/src/DurableTask.AzureStorage/Partitioning/ILeaseManager.cs @@ -10,7 +10,7 @@ // See the License for the specific language governing permissions and // limitations under the License. // ---------------------------------------------------------------------------------- - +#nullable enable namespace DurableTask.AzureStorage.Partitioning { using System.Collections.Generic; @@ -27,7 +27,7 @@ interface ILeaseManager where T : Lease Task CreateLeaseIfNotExistAsync(string partitionId, CancellationToken cancellationToken = default); - Task GetLeaseAsync(string partitionId, CancellationToken cancellationToken = default); + Task GetLeaseAsync(string partitionId, CancellationToken cancellationToken = default); Task RenewAsync(T lease, CancellationToken cancellationToken = default); diff --git a/src/DurableTask.AzureStorage/Utils.cs b/src/DurableTask.AzureStorage/Utils.cs index 3e9ca4524..d6916013d 100644 --- a/src/DurableTask.AzureStorage/Utils.cs +++ b/src/DurableTask.AzureStorage/Utils.cs @@ -10,7 +10,7 @@ // See the License for the specific language governing permissions and // limitations under the License. // ---------------------------------------------------------------------------------- - +#nullable enable namespace DurableTask.AzureStorage { using System; @@ -143,7 +143,7 @@ public static bool TryGetTaskScheduledId(HistoryEvent historyEvent, out int task /// should be "SayHelloActivity" /// /// - public static string GetTargetClassName(this string s) + public static string? GetTargetClassName(this string s) { if (s == null) { @@ -191,9 +191,9 @@ public static string SerializeToJson(JsonSerializer serializer, object payload) /// The serializer whose config will guide the deserialization. /// The JSON-string to deserialize. /// - public static T DeserializeFromJson(JsonSerializer serializer, string jsonString) + public static T? DeserializeFromJson(JsonSerializer serializer, string jsonString) { - T obj; + T? obj; using (var reader = new StringReader(jsonString)) using (var jsonReader = new JsonTextReader(reader)) { @@ -209,7 +209,7 @@ public static T DeserializeFromJson(JsonSerializer serializer, string jsonStr /// The type to deserialize the JSON string into. /// A stream of UTF-8 JSON. /// The deserialized value. - public static T DeserializeFromJson(Stream stream) + public static T? DeserializeFromJson(Stream stream) { return DeserializeFromJson(DefaultJsonSerializer, stream); } @@ -222,7 +222,7 @@ public static T DeserializeFromJson(Stream stream) /// The serializer whose config will guide the deserialization. /// A stream of UTF-8 JSON. /// The deserialized value. - public static T DeserializeFromJson(JsonSerializer serializer, Stream stream) + public static T? DeserializeFromJson(JsonSerializer serializer, Stream stream) { using var reader = new StreamReader(stream, Encoding.UTF8); using var jsonReader = new JsonTextReader(reader); @@ -237,7 +237,7 @@ public static T DeserializeFromJson(JsonSerializer serializer, Stream stream) /// The type to deserialize the JSON string into. /// The JSON-string to deserialize. /// - public static T DeserializeFromJson(string jsonString) + public static T? DeserializeFromJson(string jsonString) { return DeserializeFromJson(DefaultJsonSerializer, jsonString); } @@ -249,7 +249,7 @@ public static T DeserializeFromJson(string jsonString) /// The JSON-string to deserialize. /// The expected de-serialization type. /// - public static object DeserializeFromJson(string jsonString, Type type) + public static object? DeserializeFromJson(string jsonString, Type type) { return DeserializeFromJson(DefaultJsonSerializer, jsonString, type); } @@ -262,9 +262,9 @@ public static object DeserializeFromJson(string jsonString, Type type) /// The JSON-string to deserialize. /// The expected de-serialization type. /// - public static object DeserializeFromJson(JsonSerializer serializer, string jsonString, Type type) + public static object? DeserializeFromJson(JsonSerializer serializer, string jsonString, Type type) { - object obj; + object? obj; using (var reader = new StringReader(jsonString)) using (var jsonReader = new JsonTextReader(reader)) {