Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/Agent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ public class Agent

readonly IPlanner planner;
readonly IPlanRunner planRunner;
readonly ISuspendManager suspendManager;
readonly IReporter reporter;
readonly IConfigSource configSource;
readonly IModuleIdentityLifecycleManager moduleIdentityLifecycleManager;
readonly IEntityStore<string, string> configStore;
readonly IEnvironmentProvider environmentProvider;
readonly AsyncLock reconcileLock = new AsyncLock();
readonly ISerde<DeploymentConfigInfo> deploymentConfigInfoSerde;
readonly IEncryptionProvider encryptionProvider;
readonly IDeploymentMetrics deploymentMetrics;
Expand All @@ -43,6 +43,7 @@ public Agent(
IEnvironmentProvider environmentProvider,
IPlanner planner,
IPlanRunner planRunner,
ISuspendManager suspendManager,
IReporter reporter,
IModuleIdentityLifecycleManager moduleIdentityLifecycleManager,
IEntityStore<string, string> configStore,
Expand All @@ -55,6 +56,7 @@ public Agent(
this.configSource = Preconditions.CheckNotNull(configSource, nameof(configSource));
this.planner = Preconditions.CheckNotNull(planner, nameof(planner));
this.planRunner = Preconditions.CheckNotNull(planRunner, nameof(planRunner));
this.suspendManager = Preconditions.CheckNotNull(suspendManager, nameof(suspendManager));
this.reporter = Preconditions.CheckNotNull(reporter, nameof(reporter));
this.moduleIdentityLifecycleManager = Preconditions.CheckNotNull(moduleIdentityLifecycleManager, nameof(moduleIdentityLifecycleManager));
this.configStore = Preconditions.CheckNotNull(configStore, nameof(configStore));
Expand All @@ -73,6 +75,7 @@ public static async Task<Agent> Create(
IConfigSource configSource,
IPlanner planner,
IPlanRunner planRunner,
ISuspendManager suspendManager,
IReporter reporter,
IModuleIdentityLifecycleManager moduleIdentityLifecycleManager,
IEnvironmentProvider environmentProvider,
Expand Down Expand Up @@ -106,6 +109,7 @@ await deploymentConfigInfoJson.ForEachAsync(
environmentProvider,
planner,
planRunner,
suspendManager,
reporter,
moduleIdentityLifecycleManager,
configStore,
Expand All @@ -120,8 +124,13 @@ await deploymentConfigInfoJson.ForEachAsync(
public async Task ReconcileAsync(CancellationToken token)
{
ModuleSet moduleSetToReport = null;
using (await this.reconcileLock.LockAsync(token))
using (await this.suspendManager.BeginUpdateCycleAsync(token))
{
if (this.suspendManager.IsSuspended())
{
return;
}

try
{
Events.StartingReconcile();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core
{
using System;
using System.Threading;
using System.Threading.Tasks;

public interface ISuspendManager
{
Task<IDisposable> BeginUpdateCycleAsync(CancellationToken token);

bool IsSuspended();

Task SuspendUpdatesAsync(CancellationToken token);

Task ResumeUpdatesAsync(CancellationToken token);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
using Microsoft.Extensions.Logging;

public class SuspendManager : ISuspendManager
{
static readonly TimeSpan maxSuspendPeriod = TimeSpan.FromMinutes(15);
readonly ISystemTime systemTime;
readonly AsyncLock reconcileLock = new AsyncLock();
DateTime suspendTime = DateTime.MaxValue;

public SuspendManager(ISystemTime systemTime)
{
this.systemTime = Preconditions.CheckNotNull(systemTime, nameof(systemTime));
}

public async Task<IDisposable> BeginUpdateCycleAsync(CancellationToken token)
=> await this.reconcileLock.LockAsync(token);

public bool IsSuspended()
{
if (this.suspendTime == DateTime.MaxValue)
{
return false;
}

TimeSpan elapsedTime = this.systemTime.UtcNow - this.suspendTime;
if (elapsedTime >= maxSuspendPeriod)
{
this.suspendTime = DateTime.MaxValue;
return false;
}

Events.UpdatesSuspended(elapsedTime, maxSuspendPeriod - elapsedTime);
return true;
}

public async Task SuspendUpdatesAsync(CancellationToken token)
{
// wait for update cycle to complete before returning
using var cycle = await this.reconcileLock.LockAsync(token);

// allow extending the suspend timeout each call
this.suspendTime = this.systemTime.UtcNow;
}

public Task ResumeUpdatesAsync(CancellationToken token)
{
this.suspendTime = DateTime.MaxValue;

// do not wait for update cycle
return Task.CompletedTask;
}

static class Events
{
const int IdStart = AgentEventIds.RestartManager + 50;
static readonly ILogger Log = Logger.Factory.CreateLogger<SuspendManager>();

enum EventIds
{
UpdatesSuspended = IdStart + 1
}

public static void UpdatesSuspended(TimeSpan elapsedTime, TimeSpan remainingTime)
{
Log.LogInformation(
(int)EventIds.UpdatesSuspended,
$"Updates have been suspended for {elapsedTime.Humanize()} (auto-resume in {remainingTime.Humanize()}).");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Requests
{
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;

public class ResumeUpdatesRequestHandler : RequestHandlerBase<object, object>
{
readonly ISuspendManager suspendManager;

public override string RequestName => "ResumeUpdates";

public ResumeUpdatesRequestHandler(ISuspendManager suspendManager)
{
this.suspendManager = Preconditions.CheckNotNull(suspendManager, nameof(suspendManager));
}

protected override async Task<Option<object>> HandleRequestInternal(Option<object> payload, CancellationToken token)
{
await this.suspendManager.ResumeUpdatesAsync(token);

return Option.None<object>();
}

protected override Option<object> ParsePayload(Option<string> payloadJson)
=> Option.None<object>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Requests
{
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Util;

public class SuspendUpdatesRequestHandler : RequestHandlerBase<object, object>
{
readonly ISuspendManager suspendManager;

public override string RequestName => "SuspendUpdates";

public SuspendUpdatesRequestHandler(ISuspendManager suspendManager)
{
this.suspendManager = Preconditions.CheckNotNull(suspendManager, nameof(suspendManager));
}

protected override async Task<Option<object>> HandleRequestInternal(Option<object> payload, CancellationToken token)
{
await this.suspendManager.SuspendUpdatesAsync(token);

return Option.None<object>();
}

protected override Option<object> ParsePayload(Option<string> payloadJson)
=> Option.None<object>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ protected override void Load(ContainerBuilder builder)
.As<Task<IEntityStore<string, string>>>()
.SingleInstance();

// ISuspendManager
builder.Register(c => new SuspendManager(SystemTime.Instance))
.As<ISuspendManager>()
.SingleInstance();

// IRestartManager
builder.Register(c => new RestartPolicyManager(this.maxRestartCount, this.coolOffTimeUnitInSeconds))
.As<IRestartPolicyManager>()
Expand Down Expand Up @@ -328,6 +333,7 @@ protected override void Load(ContainerBuilder builder)
var environmentProvider = c.Resolve<Task<IEnvironmentProvider>>();
var planner = c.Resolve<Task<IPlanner>>();
var planRunner = c.Resolve<IPlanRunner>();
var suspendManager = c.Resolve<ISuspendManager>();
var reporter = c.Resolve<IReporter>();
var moduleIdentityLifecycleManager = c.Resolve<IModuleIdentityLifecycleManager>();
var deploymentConfigInfoSerde = c.Resolve<ISerde<DeploymentConfigInfo>>();
Expand All @@ -338,6 +344,7 @@ protected override void Load(ContainerBuilder builder)
await configSource,
await planner,
planRunner,
suspendManager,
reporter,
moduleIdentityLifecycleManager,
await environmentProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,26 @@ protected override void Load(ContainerBuilder builder)
.As<Task<IRequestHandler>>()
.SingleInstance();

// Task<IRequestHandler> - SuspendUpdatesRequestHandler
builder.Register(
c =>
{
IRequestHandler handler = new SuspendUpdatesRequestHandler(c.Resolve<ISuspendManager>());
return Task.FromResult(handler);
})
.As<Task<IRequestHandler>>()
.SingleInstance();

// Task<IRequestHandler> - ResumeUpdatesRequestHandler
builder.Register(
c =>
{
IRequestHandler handler = new ResumeUpdatesRequestHandler(c.Resolve<ISuspendManager>());
return Task.FromResult(handler);
})
.As<Task<IRequestHandler>>()
.SingleInstance();

// ISdkModuleClientProvider
builder.Register(c => new SdkModuleClientProvider())
.As<ISdkModuleClientProvider>()
Expand Down
Loading