diff --git a/sandbox/MauiApp1/MainPage.xaml b/sandbox/MauiApp1/MainPage.xaml index 13462f1b..06eba702 100644 --- a/sandbox/MauiApp1/MainPage.xaml +++ b/sandbox/MauiApp1/MainPage.xaml @@ -1,6 +1,8 @@  @@ -36,8 +38,11 @@ x:Name="CounterBtn" Text="Click me" SemanticProperties.Hint="Counts the number of times you click" - Clicked="OnCounterClicked" + Command="{Binding DoProcessingCommand}" HorizontalOptions="Fill" /> + + diff --git a/sandbox/MauiApp1/MainPage.xaml.cs b/sandbox/MauiApp1/MainPage.xaml.cs index 4e2dac55..e604bec8 100644 --- a/sandbox/MauiApp1/MainPage.xaml.cs +++ b/sandbox/MauiApp1/MainPage.xaml.cs @@ -1,4 +1,5 @@ -using R3; +using System.Diagnostics; +using R3; namespace MauiApp1; @@ -7,15 +8,35 @@ public class BasicUsagesViewModel : IDisposable public BindableReactiveProperty Input { get; } public BindableReactiveProperty Output { get; } + public ReactiveCommand DoProcessingCommand { get; } + + public IReadOnlyBindableReactiveProperty IsProcessing { get; } + public BasicUsagesViewModel() { Input = new BindableReactiveProperty(""); Output = Input.Select(x => x.ToUpper()).ToBindableReactiveProperty(""); + + DoProcessingCommand = new(DoProcessingAsync, AwaitOperation.Parallel); + + IsProcessing = + this.DoProcessingCommand + .IsExecuting + .ToReadOnlyBindableReactiveProperty(false); + } + + private async ValueTask DoProcessingAsync(Unit input, CancellationToken token) + { + var now = DateTime.Now; + + Debug.WriteLine($"Starting processing of input: {now}"); + await Task.Delay(TimeSpan.FromSeconds(10), token); + Debug.WriteLine($"Finished processing of input: {now}"); } public void Dispose() { - Disposable.Dispose(Input, Output); + Disposable.Dispose(Input, Output, DoProcessingCommand, IsProcessing); } } @@ -27,14 +48,6 @@ public MainPage() { InitializeComponent(); - var startDate = DateTime.Now; - Observable.Interval(TimeSpan.FromMilliseconds(500)) - .Subscribe(x => - { - throw new InvalidOperationException("oppeke"); - Label1.Text = $"Timer: {(DateTime.Now - startDate).TotalMilliseconds}"; - }); - var frameCount = 0; Observable.IntervalFrame(1) .Subscribe(x => @@ -45,8 +58,11 @@ public MainPage() this.Loaded += (sender, args) => { var viewModel = new BasicUsagesViewModel(); + BindingContext = viewModel; }; + + } void OnCounterClicked(object sender, EventArgs e) diff --git a/sandbox/MauiApp1/MauiApp1.csproj b/sandbox/MauiApp1/MauiApp1.csproj index 9aa99769..e9916d3b 100644 --- a/sandbox/MauiApp1/MauiApp1.csproj +++ b/sandbox/MauiApp1/MauiApp1.csproj @@ -32,8 +32,8 @@ 1.0 1 - 11.0 - 13.1 + 14.0 + 15.0 21.0 10.0.17763.0 10.0.17763.0 diff --git a/src/R3/Operators/SelectAwait.cs b/src/R3/Operators/SelectAwait.cs index c20ab8cb..48a761ae 100644 --- a/src/R3/Operators/SelectAwait.cs +++ b/src/R3/Operators/SelectAwait.cs @@ -9,6 +9,12 @@ public static Observable SelectAwait(this Observable sou { return new SelectAwait(source, selector, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); } + + /// This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. + public static Observable SelectAwait(this Observable source, TState state, Func> selector, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxConcurrent = -1) + { + return new SelectAwait(source, state, selector, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); + } } internal sealed class SelectAwait(Observable source, Func> selector, AwaitOperation awaitOperation, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : Observable @@ -274,3 +280,267 @@ protected override void PublishOnCompleted(Result result) } } } + +internal sealed class SelectAwait(Observable source, TState state, Func> selector, AwaitOperation awaitOperation, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + switch (awaitOperation) + { + case AwaitOperation.Sequential: + return source.Subscribe(new SelectAwaitSequential(observer, state, selector, configureAwait, cancelOnCompleted)); + case AwaitOperation.Drop: + return source.Subscribe(new SelectAwaitDrop(observer, state, selector, configureAwait, cancelOnCompleted)); + case AwaitOperation.Switch: + return source.Subscribe(new SelectAwaitSwitch(observer, state, selector, configureAwait, cancelOnCompleted)); + case AwaitOperation.Parallel: + if (maxConcurrent == -1) + { + return source.Subscribe(new SelectAwaitParallel(observer, state, selector, configureAwait, cancelOnCompleted)); + } + else + { + if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1."); + return source.Subscribe(new SelectAwaitParallelConcurrentLimit(observer, state, selector, configureAwait, cancelOnCompleted, maxConcurrent)); + } + case AwaitOperation.SequentialParallel: + if (maxConcurrent == -1) + { + return source.Subscribe(new SelectAwaitSequentialParallel(observer, state, selector, configureAwait, cancelOnCompleted)); + } + else + { + if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1."); + return source.Subscribe(new SelectAwaitSequentialParallelConcurrentLimit(observer, state, selector, configureAwait, cancelOnCompleted, maxConcurrent)); + } + case AwaitOperation.ThrottleFirstLast: + return source.Subscribe(new SelectAwaitThrottleFirstLast(observer, state, selector, configureAwait, cancelOnCompleted)); + default: + throw new ArgumentException(); + } + } + + sealed class SelectAwaitSequential(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationSequentialObserver(configureAwait, cancelOnCompleted) + { +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + observer.OnNext(v); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } + + sealed class SelectAwaitDrop(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationDropObserver(configureAwait, cancelOnCompleted) + { +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + observer.OnNext(v); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } + + sealed class SelectAwaitParallel(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationParallelObserver(configureAwait, cancelOnCompleted) + { + +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + lock (gate) + { + observer.OnNext(v); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + observer.OnErrorResume(error); + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + observer.OnCompleted(result); + } + } + } + + + sealed class SelectAwaitSwitch(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationSwitchObserver(configureAwait, cancelOnCompleted) + { + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + observer.OnErrorResume(error); + } + } + + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + lock (gate) + { + if (!cancellationToken.IsCancellationRequested) + { + observer.OnNext(v); + } + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + observer.OnCompleted(result); + } + } + } + + sealed class SelectAwaitSequentialParallel(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationSequentialParallelObserver(configureAwait, cancelOnCompleted) + { + +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override ValueTask OnNextTaskAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return selector(value, state, cancellationToken); + } + + protected override void PublishOnNext(T _, TResult result) + { + observer.OnNext(result); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } + + sealed class SelectAwaitParallelConcurrentLimit(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) + : AwaitOperationParallelConcurrentLimitObserver(configureAwait, cancelOnCompleted, maxConcurrent) + { + +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + lock (gate) + { + observer.OnNext(v); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + observer.OnErrorResume(error); + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + observer.OnCompleted(result); + } + } + } + + sealed class SelectAwaitSequentialParallelConcurrentLimit(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) + : AwaitOperationSequentialParallelConcurrentLimitObserver(configureAwait, cancelOnCompleted, maxConcurrent) + { + +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override ValueTask OnNextTaskAsyncCore(T value, CancellationToken cancellationToken, bool configureAwait) + { + return selector(value, state, cancellationToken); + } + + protected override void PublishOnNext(T _, TResult result) + { + observer.OnNext(result); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } + + sealed class SelectAwaitThrottleFirstLast(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationThrottleFirstLastObserver(configureAwait, cancelOnCompleted) + { +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + observer.OnNext(v); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } +} diff --git a/src/R3/Operators/SubscribeAwait.cs b/src/R3/Operators/SubscribeAwait.cs index 8563573e..d8a63460 100644 --- a/src/R3/Operators/SubscribeAwait.cs +++ b/src/R3/Operators/SubscribeAwait.cs @@ -46,6 +46,47 @@ public static IDisposable SubscribeAwait(this Observable source, FuncThis option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. + public static IDisposable SubscribeAwait(this Observable source, TState state, Func onNextAsync, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxConcurrent = -1) + { + return SubscribeAwait(source, state, onNextAsync, Stubs.HandleException, Stubs.HandleResult, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); + } + + /// This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. + public static IDisposable SubscribeAwait(this Observable source, TState state, Func onNextAsync, Action onCompleted, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxConcurrent = -1) + { + return SubscribeAwait(source, state, onNextAsync, Stubs.HandleException, onCompleted, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); + } + + public static IDisposable SubscribeAwait(this Observable source, TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxConcurrent = -1) + { + switch (awaitOperation) + { + case AwaitOperation.Sequential: + return source.Subscribe(new SubscribeAwaitSequential(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); + case AwaitOperation.Drop: + return source.Subscribe(new SubscribeAwaitDrop(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); + case AwaitOperation.Parallel: + if (maxConcurrent == -1) + { + return source.Subscribe(new SubscribeAwaitParallel(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); + } + else + { + if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1."); + return source.Subscribe(new SubscribeAwaitParallelConcurrentLimit(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted, maxConcurrent)); + } + case AwaitOperation.Switch: + return source.Subscribe(new SubscribeAwaitSwitch(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); + case AwaitOperation.SequentialParallel: + throw new ArgumentException("SubscribeAwait does not support SequentialParallel. Use Sequential for sequential operation, use parallel for parallel operation instead."); + case AwaitOperation.ThrottleFirstLast: + return source.Subscribe(new SubscribeAwaitThrottleFirstLast(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); + default: + throw new ArgumentException(); + } + } } internal sealed class SubscribeAwaitSequential(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) @@ -67,6 +108,25 @@ protected override void PublishOnCompleted(Result result) } } +internal sealed class SubscribeAwaitSequential(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationSequentialObserver(configureAwait, cancelOnCompleted) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + onErrorResume(error, state); + } + + protected override void PublishOnCompleted(Result result) + { + onCompleted(result, state); + } +} + internal sealed class SubscribeAwaitDrop(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) : AwaitOperationDropObserver(configureAwait, cancelOnCompleted) { @@ -86,6 +146,25 @@ protected override void PublishOnCompleted(Result result) } } +internal sealed class SubscribeAwaitDrop(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationDropObserver(configureAwait, cancelOnCompleted) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + onErrorResume(error, state); + } + + protected override void PublishOnCompleted(Result result) + { + onCompleted(result, state); + } +} + sealed class SubscribeAwaitParallel(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) : AwaitOperationParallelObserver(configureAwait, cancelOnCompleted) { @@ -111,6 +190,31 @@ protected override void PublishOnCompleted(Result result) } } +sealed class SubscribeAwaitParallel(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationParallelObserver(configureAwait, cancelOnCompleted) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + onErrorResume(error, state); + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + onCompleted(result, state); + } + } +} + sealed class SubscribeAwaitSwitch(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) : AwaitOperationSwitchObserver(configureAwait, cancelOnCompleted) { @@ -136,6 +240,31 @@ protected override void PublishOnCompleted(Result result) } } +sealed class SubscribeAwaitSwitch(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationSwitchObserver(configureAwait, cancelOnCompleted) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + onErrorResume(error, state); + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + onCompleted(result, state); + } + } +} + sealed class SubscribeAwaitParallelConcurrentLimit(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : AwaitOperationParallelConcurrentLimitObserver(configureAwait, cancelOnCompleted, maxConcurrent) { @@ -161,6 +290,31 @@ protected override void PublishOnCompleted(Result result) } } +sealed class SubscribeAwaitParallelConcurrentLimit(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) + : AwaitOperationParallelConcurrentLimitObserver(configureAwait, cancelOnCompleted, maxConcurrent) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + onErrorResume(error, state); + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + onCompleted(result, state); + } + } +} + internal sealed class SubscribeAwaitThrottleFirstLast(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) : AwaitOperationThrottleFirstLastObserver(configureAwait, cancelOnCompleted) { @@ -179,3 +333,22 @@ protected override void PublishOnCompleted(Result result) onCompleted(result); } } + +internal sealed class SubscribeAwaitThrottleFirstLast(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationThrottleFirstLastObserver(configureAwait, cancelOnCompleted) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + onErrorResume(error, state); + } + + protected override void PublishOnCompleted(Result result) + { + onCompleted(result, state); + } +} diff --git a/src/R3/ReactiveCommand.cs b/src/R3/ReactiveCommand.cs index 89aeb8a4..20ca7f59 100644 --- a/src/R3/ReactiveCommand.cs +++ b/src/R3/ReactiveCommand.cs @@ -9,9 +9,13 @@ public class ReactiveCommand : Observable, ICommand, IDisposable CompleteState completeState; // struct(int, IntPtr) IDisposable subscription; // from canExecuteSource (and onNext). bool canExecute; // set from observable sequence + int executionCount; + readonly object gate = new(); public event EventHandler? CanExecuteChanged; + public ReactiveProperty IsExecuting { get; } = new(); + public ReactiveCommand() { this.list = new FreeListCore(this); @@ -23,14 +27,14 @@ public ReactiveCommand(Action execute) { this.list = new FreeListCore(this); this.canExecute = true; - this.subscription = this.Subscribe(execute); + this.subscription = this.Subscribe((HandleExecutionAction: (Action>)HandleExecution, Action: execute), static (value, state) => state.HandleExecutionAction(value, state.Action)); } public ReactiveCommand(Func executeAsync, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxSequential = -1) { this.list = new FreeListCore(this); this.canExecute = true; - this.subscription = this.SubscribeAwait(executeAsync, awaitOperation, configureAwait, cancelOnCompleted, maxSequential); + this.subscription = this.SubscribeAwait((HandleAsyncExecutionFunc: (Func, CancellationToken, ValueTask>)HandleAsyncExecution, Func: executeAsync), static (value, state, cancellationToken) => state.HandleAsyncExecutionFunc(value, state.Func, cancellationToken), awaitOperation, configureAwait, cancelOnCompleted, maxSequential); } public ReactiveCommand(Observable canExecuteSource, bool initialCanExecute) @@ -89,6 +93,56 @@ internal void CombineSubscription(IDisposable disposable) this.subscription = Disposable.Combine(this.subscription, disposable); } + private void HandleExecution(T value, Action execute) + { + try + { + lock (gate) + { + executionCount++; + IsExecuting.Value = executionCount > 0; + } + + execute(value); + } + finally + { + lock (gate) + { + executionCount--; + if (executionCount == 0) + { + IsExecuting.Value = false; + } + } + } + } + + private async ValueTask HandleAsyncExecution(T value, Func executeAsync, CancellationToken cancellationToken) + { + try + { + lock (gate) + { + executionCount++; + IsExecuting.Value = executionCount > 0; + } + + await executeAsync(value, cancellationToken); + } + finally + { + lock (gate) + { + executionCount--; + if (executionCount == 0) + { + IsExecuting.Value = false; + } + } + } + } + protected override IDisposable SubscribeCore(Observer observer) { var result = completeState.TryGetResult(); @@ -130,6 +184,7 @@ public void Dispose(bool callOnCompleted) } } + IsExecuting.Dispose(); list.Dispose(); subscription.Dispose(); } @@ -166,12 +221,16 @@ public class ReactiveCommand : Observable, ICommand, I CompleteState completeState; // struct(int, IntPtr) bool canExecute; // set from observable sequence IDisposable subscription; + int executionCount; + readonly object gate = new(); readonly Func? convert; // for sync SingleAssignmentSubject? asyncInput; // for async public event EventHandler? CanExecuteChanged; + public ReactiveProperty IsExecuting { get; } = new(); + public ReactiveCommand(Func convert) { this.list = new FreeListCore(this); @@ -185,7 +244,7 @@ public ReactiveCommand(Func> conve this.list = new FreeListCore(this); this.canExecute = true; this.asyncInput = new SingleAssignmentSubject(); - this.subscription = asyncInput.SelectAwait(convertAsync, awaitOperation, configureAwait, cancelOnCompleted, maxSequential).Subscribe(this, static (x, state) => + this.subscription = asyncInput.SelectAwait((Command: this, ConvertAsync: convertAsync), static (value, state, cancellationToken) => state.Command.HandleAsyncExecution(value, state.ConvertAsync, cancellationToken), awaitOperation, configureAwait, cancelOnCompleted, maxSequential).Subscribe(this, static (x, state) => { if (state.completeState.IsCompleted) return; @@ -280,6 +339,56 @@ public void Execute(TInput parameter) } } + private void HandleExecution(TInput value, Action execute) + { + try + { + lock (gate) + { + executionCount++; + IsExecuting.Value = executionCount > 0; + } + + execute(value); + } + finally + { + lock (gate) + { + executionCount--; + if (executionCount == 0) + { + IsExecuting.Value = false; + } + } + } + } + + private async ValueTask HandleAsyncExecution(TInput value, Func> executeAsync, CancellationToken cancellationToken) + { + try + { + lock (gate) + { + executionCount++; + IsExecuting.Value = executionCount > 0; + } + + return await executeAsync(value, cancellationToken); + } + finally + { + lock (gate) + { + executionCount--; + if (executionCount == 0) + { + IsExecuting.Value = false; + } + } + } + } + protected override IDisposable SubscribeCore(Observer observer) { var result = completeState.TryGetResult(); @@ -420,7 +529,7 @@ public static ReactiveCommand ToReactiveCommand( { var command = new ReactiveCommand(canExecuteSource, initialCanExecute); - var subscription = command.SubscribeAwait(async (x, ct) => await executeAsync(x, ct), awaitOperation, configureAwait, cancelOnCompleted, maxSequential); + var subscription = command.SubscribeAwait(executeAsync, static async (x, func, ct) => await func(x, ct), awaitOperation, configureAwait, cancelOnCompleted, maxSequential); command.CombineSubscription(subscription); return command;