From 8a724552ae49e9c240cb7eddc73474e95bbda1f7 Mon Sep 17 00:00:00 2001 From: Michael Stonis Date: Fri, 29 Nov 2024 18:37:50 -0600 Subject: [PATCH 1/7] Add support for state in SubscribeAwait methods - Introduced new overloads of `SubscribeAwait` to accept a state parameter - Updated existing async operators to handle state --- src/R3/Operators/SubscribeAwait.cs | 173 +++++++++++++++++++++++++++++ 1 file changed, 173 insertions(+) diff --git a/src/R3/Operators/SubscribeAwait.cs b/src/R3/Operators/SubscribeAwait.cs index 8563573e..d8a63460 100644 --- a/src/R3/Operators/SubscribeAwait.cs +++ b/src/R3/Operators/SubscribeAwait.cs @@ -46,6 +46,47 @@ public static IDisposable SubscribeAwait(this Observable source, FuncThis option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. + public static IDisposable SubscribeAwait(this Observable source, TState state, Func onNextAsync, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxConcurrent = -1) + { + return SubscribeAwait(source, state, onNextAsync, Stubs.HandleException, Stubs.HandleResult, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); + } + + /// This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. + public static IDisposable SubscribeAwait(this Observable source, TState state, Func onNextAsync, Action onCompleted, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxConcurrent = -1) + { + return SubscribeAwait(source, state, onNextAsync, Stubs.HandleException, onCompleted, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); + } + + public static IDisposable SubscribeAwait(this Observable source, TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxConcurrent = -1) + { + switch (awaitOperation) + { + case AwaitOperation.Sequential: + return source.Subscribe(new SubscribeAwaitSequential(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); + case AwaitOperation.Drop: + return source.Subscribe(new SubscribeAwaitDrop(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); + case AwaitOperation.Parallel: + if (maxConcurrent == -1) + { + return source.Subscribe(new SubscribeAwaitParallel(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); + } + else + { + if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1."); + return source.Subscribe(new SubscribeAwaitParallelConcurrentLimit(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted, maxConcurrent)); + } + case AwaitOperation.Switch: + return source.Subscribe(new SubscribeAwaitSwitch(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); + case AwaitOperation.SequentialParallel: + throw new ArgumentException("SubscribeAwait does not support SequentialParallel. Use Sequential for sequential operation, use parallel for parallel operation instead."); + case AwaitOperation.ThrottleFirstLast: + return source.Subscribe(new SubscribeAwaitThrottleFirstLast(state, onNextAsync, onErrorResume, onCompleted, configureAwait, cancelOnCompleted)); + default: + throw new ArgumentException(); + } + } } internal sealed class SubscribeAwaitSequential(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) @@ -67,6 +108,25 @@ protected override void PublishOnCompleted(Result result) } } +internal sealed class SubscribeAwaitSequential(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationSequentialObserver(configureAwait, cancelOnCompleted) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + onErrorResume(error, state); + } + + protected override void PublishOnCompleted(Result result) + { + onCompleted(result, state); + } +} + internal sealed class SubscribeAwaitDrop(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) : AwaitOperationDropObserver(configureAwait, cancelOnCompleted) { @@ -86,6 +146,25 @@ protected override void PublishOnCompleted(Result result) } } +internal sealed class SubscribeAwaitDrop(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationDropObserver(configureAwait, cancelOnCompleted) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + onErrorResume(error, state); + } + + protected override void PublishOnCompleted(Result result) + { + onCompleted(result, state); + } +} + sealed class SubscribeAwaitParallel(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) : AwaitOperationParallelObserver(configureAwait, cancelOnCompleted) { @@ -111,6 +190,31 @@ protected override void PublishOnCompleted(Result result) } } +sealed class SubscribeAwaitParallel(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationParallelObserver(configureAwait, cancelOnCompleted) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + onErrorResume(error, state); + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + onCompleted(result, state); + } + } +} + sealed class SubscribeAwaitSwitch(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) : AwaitOperationSwitchObserver(configureAwait, cancelOnCompleted) { @@ -136,6 +240,31 @@ protected override void PublishOnCompleted(Result result) } } +sealed class SubscribeAwaitSwitch(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationSwitchObserver(configureAwait, cancelOnCompleted) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + onErrorResume(error, state); + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + onCompleted(result, state); + } + } +} + sealed class SubscribeAwaitParallelConcurrentLimit(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : AwaitOperationParallelConcurrentLimitObserver(configureAwait, cancelOnCompleted, maxConcurrent) { @@ -161,6 +290,31 @@ protected override void PublishOnCompleted(Result result) } } +sealed class SubscribeAwaitParallelConcurrentLimit(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) + : AwaitOperationParallelConcurrentLimitObserver(configureAwait, cancelOnCompleted, maxConcurrent) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + onErrorResume(error, state); + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + onCompleted(result, state); + } + } +} + internal sealed class SubscribeAwaitThrottleFirstLast(Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) : AwaitOperationThrottleFirstLastObserver(configureAwait, cancelOnCompleted) { @@ -179,3 +333,22 @@ protected override void PublishOnCompleted(Result result) onCompleted(result); } } + +internal sealed class SubscribeAwaitThrottleFirstLast(TState state, Func onNextAsync, Action onErrorResume, Action onCompleted, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationThrottleFirstLastObserver(configureAwait, cancelOnCompleted) +{ + protected override ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return onNextAsync(value, state, cancellationToken); + } + + protected override void OnErrorResumeCore(Exception error) + { + onErrorResume(error, state); + } + + protected override void PublishOnCompleted(Result result) + { + onCompleted(result, state); + } +} From d165a988c0132d9fd456565fc151df82f99f8fa6 Mon Sep 17 00:00:00 2001 From: Michael Stonis Date: Sat, 30 Nov 2024 01:24:09 -0600 Subject: [PATCH 2/7] Add execution tracking to ReactiveCommand - Introduced `IsExecuting` property to track command execution state. - Added `executionCount` field for managing concurrent executions. - Updated synchronous and asynchronous execution methods to handle the new execution logic. - Ensured proper disposal of resources related to command execution. --- R3.sln | 1 + sandbox/MauiApp1/MainPage.xaml | 7 +++++- sandbox/MauiApp1/MainPage.xaml.cs | 36 ++++++++++++++++++++-------- sandbox/MauiApp1/MauiApp1.csproj | 4 ++-- src/R3/ReactiveCommand.cs | 40 +++++++++++++++++++++++++++++-- 5 files changed, 73 insertions(+), 15 deletions(-) diff --git a/R3.sln b/R3.sln index 6ca7a828..67510c1d 100644 --- a/R3.sln +++ b/R3.sln @@ -320,6 +320,7 @@ Global {2CD257D7-DF21-4D60-AC05-747D83236E5A}.Release|x86.ActiveCfg = Release|Any CPU {2CD257D7-DF21-4D60-AC05-747D83236E5A}.Release|x86.Build.0 = Release|Any CPU {2CD257D7-DF21-4D60-AC05-747D83236E5A}.Release|x86.Deploy.0 = Release|Any CPU + {2CD257D7-DF21-4D60-AC05-747D83236E5A}.Debug|Any CPU.Build.0 = Debug|Any CPU {F1D6609C-AA33-4099-8932-BADBCB935FBD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {F1D6609C-AA33-4099-8932-BADBCB935FBD}.Debug|Any CPU.Build.0 = Debug|Any CPU {F1D6609C-AA33-4099-8932-BADBCB935FBD}.Debug|ARM64.ActiveCfg = Debug|Any CPU diff --git a/sandbox/MauiApp1/MainPage.xaml b/sandbox/MauiApp1/MainPage.xaml index 13462f1b..06eba702 100644 --- a/sandbox/MauiApp1/MainPage.xaml +++ b/sandbox/MauiApp1/MainPage.xaml @@ -1,6 +1,8 @@  @@ -36,8 +38,11 @@ x:Name="CounterBtn" Text="Click me" SemanticProperties.Hint="Counts the number of times you click" - Clicked="OnCounterClicked" + Command="{Binding DoProcessingCommand}" HorizontalOptions="Fill" /> + + diff --git a/sandbox/MauiApp1/MainPage.xaml.cs b/sandbox/MauiApp1/MainPage.xaml.cs index 4e2dac55..e604bec8 100644 --- a/sandbox/MauiApp1/MainPage.xaml.cs +++ b/sandbox/MauiApp1/MainPage.xaml.cs @@ -1,4 +1,5 @@ -using R3; +using System.Diagnostics; +using R3; namespace MauiApp1; @@ -7,15 +8,35 @@ public class BasicUsagesViewModel : IDisposable public BindableReactiveProperty Input { get; } public BindableReactiveProperty Output { get; } + public ReactiveCommand DoProcessingCommand { get; } + + public IReadOnlyBindableReactiveProperty IsProcessing { get; } + public BasicUsagesViewModel() { Input = new BindableReactiveProperty(""); Output = Input.Select(x => x.ToUpper()).ToBindableReactiveProperty(""); + + DoProcessingCommand = new(DoProcessingAsync, AwaitOperation.Parallel); + + IsProcessing = + this.DoProcessingCommand + .IsExecuting + .ToReadOnlyBindableReactiveProperty(false); + } + + private async ValueTask DoProcessingAsync(Unit input, CancellationToken token) + { + var now = DateTime.Now; + + Debug.WriteLine($"Starting processing of input: {now}"); + await Task.Delay(TimeSpan.FromSeconds(10), token); + Debug.WriteLine($"Finished processing of input: {now}"); } public void Dispose() { - Disposable.Dispose(Input, Output); + Disposable.Dispose(Input, Output, DoProcessingCommand, IsProcessing); } } @@ -27,14 +48,6 @@ public MainPage() { InitializeComponent(); - var startDate = DateTime.Now; - Observable.Interval(TimeSpan.FromMilliseconds(500)) - .Subscribe(x => - { - throw new InvalidOperationException("oppeke"); - Label1.Text = $"Timer: {(DateTime.Now - startDate).TotalMilliseconds}"; - }); - var frameCount = 0; Observable.IntervalFrame(1) .Subscribe(x => @@ -45,8 +58,11 @@ public MainPage() this.Loaded += (sender, args) => { var viewModel = new BasicUsagesViewModel(); + BindingContext = viewModel; }; + + } void OnCounterClicked(object sender, EventArgs e) diff --git a/sandbox/MauiApp1/MauiApp1.csproj b/sandbox/MauiApp1/MauiApp1.csproj index 9aa99769..e9916d3b 100644 --- a/sandbox/MauiApp1/MauiApp1.csproj +++ b/sandbox/MauiApp1/MauiApp1.csproj @@ -32,8 +32,8 @@ 1.0 1 - 11.0 - 13.1 + 14.0 + 15.0 21.0 10.0.17763.0 10.0.17763.0 diff --git a/src/R3/ReactiveCommand.cs b/src/R3/ReactiveCommand.cs index 89aeb8a4..fa7e8eb9 100644 --- a/src/R3/ReactiveCommand.cs +++ b/src/R3/ReactiveCommand.cs @@ -9,9 +9,12 @@ public class ReactiveCommand : Observable, ICommand, IDisposable CompleteState completeState; // struct(int, IntPtr) IDisposable subscription; // from canExecuteSource (and onNext). bool canExecute; // set from observable sequence + int executionCount; public event EventHandler? CanExecuteChanged; + public ReactiveProperty IsExecuting { get; } = new(); + public ReactiveCommand() { this.list = new FreeListCore(this); @@ -23,14 +26,14 @@ public ReactiveCommand(Action execute) { this.list = new FreeListCore(this); this.canExecute = true; - this.subscription = this.Subscribe(execute); + this.subscription = this.Subscribe((Command: this, Action: execute), static (value, state) => state.Command.HandleExecution(value, state.Action)); } public ReactiveCommand(Func executeAsync, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxSequential = -1) { this.list = new FreeListCore(this); this.canExecute = true; - this.subscription = this.SubscribeAwait(executeAsync, awaitOperation, configureAwait, cancelOnCompleted, maxSequential); + this.subscription = this.SubscribeAwait((Command: this, Func: executeAsync), static (value, state, cancellationToken) => state.Command.HandleAsyncExecution(value, state.Func, cancellationToken), awaitOperation, configureAwait, cancelOnCompleted, maxSequential); } public ReactiveCommand(Observable canExecuteSource, bool initialCanExecute) @@ -89,6 +92,38 @@ internal void CombineSubscription(IDisposable disposable) this.subscription = Disposable.Combine(this.subscription, disposable); } + private void HandleExecution(T value, Action execute) + { + try + { + IsExecuting.Value = Interlocked.Increment(ref executionCount) > 0; + execute(value); + } + finally + { + if (Interlocked.Decrement(ref executionCount) == 0) + { + IsExecuting.Value = false; + } + } + } + + private async ValueTask HandleAsyncExecution(T value, Func executeAsync, CancellationToken cancellationToken) + { + try + { + IsExecuting.Value = Interlocked.Increment(ref executionCount) > 0; + await executeAsync(value, cancellationToken); + } + finally + { + if (Interlocked.Decrement(ref executionCount) == 0) + { + IsExecuting.Value = false; + } + } + } + protected override IDisposable SubscribeCore(Observer observer) { var result = completeState.TryGetResult(); @@ -130,6 +165,7 @@ public void Dispose(bool callOnCompleted) } } + IsExecuting.Dispose(); list.Dispose(); subscription.Dispose(); } From 26f62f13f0f808043da336ce572436b562c668fc Mon Sep 17 00:00:00 2001 From: Michael Stonis Date: Fri, 6 Dec 2024 11:39:06 -0600 Subject: [PATCH 3/7] Add TState support to SelectAwait functionality - Introduced a new overload for SelectAwait that includes a state parameter. - Added multiple classes to handle different AwaitOperation types with the new TState feature. - Implemented error handling and completion logic in the new classes. - Enhanced concurrency control with maxConcurrent option for parallel operations. --- src/R3/Operators/SelectAwait.cs | 270 ++++++++++++++++++++++++++++++++ 1 file changed, 270 insertions(+) diff --git a/src/R3/Operators/SelectAwait.cs b/src/R3/Operators/SelectAwait.cs index c20ab8cb..48a761ae 100644 --- a/src/R3/Operators/SelectAwait.cs +++ b/src/R3/Operators/SelectAwait.cs @@ -9,6 +9,12 @@ public static Observable SelectAwait(this Observable sou { return new SelectAwait(source, selector, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); } + + /// This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit. + public static Observable SelectAwait(this Observable source, TState state, Func> selector, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxConcurrent = -1) + { + return new SelectAwait(source, state, selector, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent); + } } internal sealed class SelectAwait(Observable source, Func> selector, AwaitOperation awaitOperation, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : Observable @@ -274,3 +280,267 @@ protected override void PublishOnCompleted(Result result) } } } + +internal sealed class SelectAwait(Observable source, TState state, Func> selector, AwaitOperation awaitOperation, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + switch (awaitOperation) + { + case AwaitOperation.Sequential: + return source.Subscribe(new SelectAwaitSequential(observer, state, selector, configureAwait, cancelOnCompleted)); + case AwaitOperation.Drop: + return source.Subscribe(new SelectAwaitDrop(observer, state, selector, configureAwait, cancelOnCompleted)); + case AwaitOperation.Switch: + return source.Subscribe(new SelectAwaitSwitch(observer, state, selector, configureAwait, cancelOnCompleted)); + case AwaitOperation.Parallel: + if (maxConcurrent == -1) + { + return source.Subscribe(new SelectAwaitParallel(observer, state, selector, configureAwait, cancelOnCompleted)); + } + else + { + if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1."); + return source.Subscribe(new SelectAwaitParallelConcurrentLimit(observer, state, selector, configureAwait, cancelOnCompleted, maxConcurrent)); + } + case AwaitOperation.SequentialParallel: + if (maxConcurrent == -1) + { + return source.Subscribe(new SelectAwaitSequentialParallel(observer, state, selector, configureAwait, cancelOnCompleted)); + } + else + { + if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1."); + return source.Subscribe(new SelectAwaitSequentialParallelConcurrentLimit(observer, state, selector, configureAwait, cancelOnCompleted, maxConcurrent)); + } + case AwaitOperation.ThrottleFirstLast: + return source.Subscribe(new SelectAwaitThrottleFirstLast(observer, state, selector, configureAwait, cancelOnCompleted)); + default: + throw new ArgumentException(); + } + } + + sealed class SelectAwaitSequential(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationSequentialObserver(configureAwait, cancelOnCompleted) + { +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + observer.OnNext(v); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } + + sealed class SelectAwaitDrop(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationDropObserver(configureAwait, cancelOnCompleted) + { +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + observer.OnNext(v); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } + + sealed class SelectAwaitParallel(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationParallelObserver(configureAwait, cancelOnCompleted) + { + +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + lock (gate) + { + observer.OnNext(v); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + observer.OnErrorResume(error); + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + observer.OnCompleted(result); + } + } + } + + + sealed class SelectAwaitSwitch(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationSwitchObserver(configureAwait, cancelOnCompleted) + { + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + observer.OnErrorResume(error); + } + } + + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + lock (gate) + { + if (!cancellationToken.IsCancellationRequested) + { + observer.OnNext(v); + } + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + observer.OnCompleted(result); + } + } + } + + sealed class SelectAwaitSequentialParallel(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationSequentialParallelObserver(configureAwait, cancelOnCompleted) + { + +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override ValueTask OnNextTaskAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + return selector(value, state, cancellationToken); + } + + protected override void PublishOnNext(T _, TResult result) + { + observer.OnNext(result); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } + + sealed class SelectAwaitParallelConcurrentLimit(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) + : AwaitOperationParallelConcurrentLimitObserver(configureAwait, cancelOnCompleted, maxConcurrent) + { + +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + lock (gate) + { + observer.OnNext(v); + } + } + + protected override void OnErrorResumeCore(Exception error) + { + lock (gate) + { + observer.OnErrorResume(error); + } + } + + protected override void PublishOnCompleted(Result result) + { + lock (gate) + { + observer.OnCompleted(result); + } + } + } + + sealed class SelectAwaitSequentialParallelConcurrentLimit(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) + : AwaitOperationSequentialParallelConcurrentLimitObserver(configureAwait, cancelOnCompleted, maxConcurrent) + { + +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override ValueTask OnNextTaskAsyncCore(T value, CancellationToken cancellationToken, bool configureAwait) + { + return selector(value, state, cancellationToken); + } + + protected override void PublishOnNext(T _, TResult result) + { + observer.OnNext(result); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } + + sealed class SelectAwaitThrottleFirstLast(Observer observer, TState state, Func> selector, bool configureAwait, bool cancelOnCompleted) + : AwaitOperationThrottleFirstLastObserver(configureAwait, cancelOnCompleted) + { +#if NET6_0_OR_GREATER + [AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))] +#endif + protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait) + { + var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait); + observer.OnNext(v); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void PublishOnCompleted(Result result) + { + observer.OnCompleted(result); + } + } +} From 389a7684e2d60754d7346ba786c6a94889a5e1d6 Mon Sep 17 00:00:00 2001 From: Michael Stonis Date: Fri, 6 Dec 2024 11:39:44 -0600 Subject: [PATCH 4/7] Refactor execution handling for commands - Added support for being notified when IsExecuting has been changed --- src/R3/ReactiveCommand.cs | 89 +++++++++++++++++++++++++++++++++++---- 1 file changed, 81 insertions(+), 8 deletions(-) diff --git a/src/R3/ReactiveCommand.cs b/src/R3/ReactiveCommand.cs index fa7e8eb9..61770198 100644 --- a/src/R3/ReactiveCommand.cs +++ b/src/R3/ReactiveCommand.cs @@ -10,6 +10,7 @@ public class ReactiveCommand : Observable, ICommand, IDisposable IDisposable subscription; // from canExecuteSource (and onNext). bool canExecute; // set from observable sequence int executionCount; + readonly object gate = new(); public event EventHandler? CanExecuteChanged; @@ -96,14 +97,23 @@ private void HandleExecution(T value, Action execute) { try { - IsExecuting.Value = Interlocked.Increment(ref executionCount) > 0; + lock (gate) + { + executionCount++; + IsExecuting.Value = executionCount > 0; + } + execute(value); } finally { - if (Interlocked.Decrement(ref executionCount) == 0) + lock (gate) { - IsExecuting.Value = false; + executionCount--; + if (executionCount == 0) + { + IsExecuting.Value = false; + } } } } @@ -112,14 +122,23 @@ private async ValueTask HandleAsyncExecution(T value, Func 0; + lock (gate) + { + executionCount++; + IsExecuting.Value = executionCount > 0; + } + await executeAsync(value, cancellationToken); } finally { - if (Interlocked.Decrement(ref executionCount) == 0) + lock (gate) { - IsExecuting.Value = false; + executionCount--; + if (executionCount == 0) + { + IsExecuting.Value = false; + } } } } @@ -202,12 +221,16 @@ public class ReactiveCommand : Observable, ICommand, I CompleteState completeState; // struct(int, IntPtr) bool canExecute; // set from observable sequence IDisposable subscription; + int executionCount; + readonly object gate = new(); readonly Func? convert; // for sync SingleAssignmentSubject? asyncInput; // for async public event EventHandler? CanExecuteChanged; + public ReactiveProperty IsExecuting { get; } = new(); + public ReactiveCommand(Func convert) { this.list = new FreeListCore(this); @@ -221,7 +244,7 @@ public ReactiveCommand(Func> conve this.list = new FreeListCore(this); this.canExecute = true; this.asyncInput = new SingleAssignmentSubject(); - this.subscription = asyncInput.SelectAwait(convertAsync, awaitOperation, configureAwait, cancelOnCompleted, maxSequential).Subscribe(this, static (x, state) => + this.subscription = asyncInput.SelectAwait((Command: this, ConvertAsync: convertAsync), static (value, state, cancellationToken) => state.Command.HandleAsyncExecution(value, state.ConvertAsync, cancellationToken), awaitOperation, configureAwait, cancelOnCompleted, maxSequential).Subscribe(this, static (x, state) => { if (state.completeState.IsCompleted) return; @@ -316,6 +339,56 @@ public void Execute(TInput parameter) } } + private void HandleExecution(TInput value, Action execute) + { + try + { + lock (gate) + { + executionCount++; + IsExecuting.Value = executionCount > 0; + } + + execute(value); + } + finally + { + lock (gate) + { + executionCount--; + if (executionCount == 0) + { + IsExecuting.Value = false; + } + } + } + } + + private async ValueTask HandleAsyncExecution(TInput value, Func> executeAsync, CancellationToken cancellationToken) + { + try + { + lock (gate) + { + executionCount++; + IsExecuting.Value = executionCount > 0; + } + + return await executeAsync(value, cancellationToken); + } + finally + { + lock (gate) + { + executionCount--; + if (executionCount == 0) + { + IsExecuting.Value = false; + } + } + } + } + protected override IDisposable SubscribeCore(Observer observer) { var result = completeState.TryGetResult(); @@ -456,7 +529,7 @@ public static ReactiveCommand ToReactiveCommand( { var command = new ReactiveCommand(canExecuteSource, initialCanExecute); - var subscription = command.SubscribeAwait(async (x, ct) => await executeAsync(x, ct), awaitOperation, configureAwait, cancelOnCompleted, maxSequential); + var subscription = command.SubscribeAwait(executeAsync, async (x, func, ct) => await func(x, ct), awaitOperation, configureAwait, cancelOnCompleted, maxSequential); command.CombineSubscription(subscription); return command; From dbf590ace8dba734cbd3725382a682eda3ce475b Mon Sep 17 00:00:00 2001 From: Michael Stonis Date: Fri, 6 Dec 2024 15:54:36 -0600 Subject: [PATCH 5/7] Refactor command execution handling - Updated subscription logic for command execution. - Improved async handling in subscription methods. --- src/R3/ReactiveCommand.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/R3/ReactiveCommand.cs b/src/R3/ReactiveCommand.cs index 61770198..4be2fdfe 100644 --- a/src/R3/ReactiveCommand.cs +++ b/src/R3/ReactiveCommand.cs @@ -27,14 +27,14 @@ public ReactiveCommand(Action execute) { this.list = new FreeListCore(this); this.canExecute = true; - this.subscription = this.Subscribe((Command: this, Action: execute), static (value, state) => state.Command.HandleExecution(value, state.Action)); + this.subscription = this.Subscribe((HandleExecutionAction: (Action>)HandleExecution, Action: execute), static (value, state) => state.HandleExecutionAction(value, state.Action)); } public ReactiveCommand(Func executeAsync, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxSequential = -1) { this.list = new FreeListCore(this); this.canExecute = true; - this.subscription = this.SubscribeAwait((Command: this, Func: executeAsync), static (value, state, cancellationToken) => state.Command.HandleAsyncExecution(value, state.Func, cancellationToken), awaitOperation, configureAwait, cancelOnCompleted, maxSequential); + this.subscription = this.SubscribeAwait((HandleAsyncExecutionFunc: (Func, CancellationToken, ValueTask>)HandleAsyncExecution, Func: executeAsync), static (value, state, cancellationToken) => state.HandleAsyncExecutionFunc(value, state.Func, cancellationToken), awaitOperation, configureAwait, cancelOnCompleted, maxSequential); } public ReactiveCommand(Observable canExecuteSource, bool initialCanExecute) @@ -529,7 +529,7 @@ public static ReactiveCommand ToReactiveCommand( { var command = new ReactiveCommand(canExecuteSource, initialCanExecute); - var subscription = command.SubscribeAwait(executeAsync, async (x, func, ct) => await func(x, ct), awaitOperation, configureAwait, cancelOnCompleted, maxSequential); + var subscription = command.SubscribeAwait(executeAsync, static async (x, func, ct) => await func(x, ct), awaitOperation, configureAwait, cancelOnCompleted, maxSequential); command.CombineSubscription(subscription); return command; From 09ee7d19b36069537fa28a23b90ccbe6acc910ae Mon Sep 17 00:00:00 2001 From: Michael Stonis Date: Fri, 6 Dec 2024 16:12:55 -0600 Subject: [PATCH 6/7] remove solution change --- R3.sln | 1 - 1 file changed, 1 deletion(-) diff --git a/R3.sln b/R3.sln index 67510c1d..6ca7a828 100644 --- a/R3.sln +++ b/R3.sln @@ -320,7 +320,6 @@ Global {2CD257D7-DF21-4D60-AC05-747D83236E5A}.Release|x86.ActiveCfg = Release|Any CPU {2CD257D7-DF21-4D60-AC05-747D83236E5A}.Release|x86.Build.0 = Release|Any CPU {2CD257D7-DF21-4D60-AC05-747D83236E5A}.Release|x86.Deploy.0 = Release|Any CPU - {2CD257D7-DF21-4D60-AC05-747D83236E5A}.Debug|Any CPU.Build.0 = Debug|Any CPU {F1D6609C-AA33-4099-8932-BADBCB935FBD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {F1D6609C-AA33-4099-8932-BADBCB935FBD}.Debug|Any CPU.Build.0 = Debug|Any CPU {F1D6609C-AA33-4099-8932-BADBCB935FBD}.Debug|ARM64.ActiveCfg = Debug|Any CPU From 1a41600d9ffb030c8f2e11d922393ad14c42670a Mon Sep 17 00:00:00 2001 From: Michael Stonis Date: Fri, 6 Dec 2024 16:15:56 -0600 Subject: [PATCH 7/7] remove reimplementation of generic --- src/R3/ReactiveCommand.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/R3/ReactiveCommand.cs b/src/R3/ReactiveCommand.cs index 4be2fdfe..20ca7f59 100644 --- a/src/R3/ReactiveCommand.cs +++ b/src/R3/ReactiveCommand.cs @@ -93,7 +93,7 @@ internal void CombineSubscription(IDisposable disposable) this.subscription = Disposable.Combine(this.subscription, disposable); } - private void HandleExecution(T value, Action execute) + private void HandleExecution(T value, Action execute) { try { @@ -118,7 +118,7 @@ private void HandleExecution(T value, Action execute) } } - private async ValueTask HandleAsyncExecution(T value, Func executeAsync, CancellationToken cancellationToken) + private async ValueTask HandleAsyncExecution(T value, Func executeAsync, CancellationToken cancellationToken) { try {