Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for IsExecuting to Reactive Commands #280

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sandbox/MauiApp1/MainPage.xaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
<?xml version="1.0" encoding="utf-8" ?>
<ContentPage xmlns="http://schemas.microsoft.com/dotnet/2021/maui"
xmlns:x="http://schemas.microsoft.com/winfx/2009/xaml"
xmlns:mauiApp1="clr-namespace:MauiApp1"
x:DataType="mauiApp1:BasicUsagesViewModel"
x:Class="MauiApp1.MainPage">

<ScrollView>
Expand Down Expand Up @@ -36,8 +38,11 @@
x:Name="CounterBtn"
Text="Click me"
SemanticProperties.Hint="Counts the number of times you click"
Clicked="OnCounterClicked"
Command="{Binding DoProcessingCommand}"
HorizontalOptions="Fill" />

<Label Text="Commands Are Executing?" />
<Entry Text="{Binding IsProcessing.Value, Mode=OneWay, StringFormat='{0}'}" />
</VerticalStackLayout>
</ScrollView>

Expand Down
36 changes: 26 additions & 10 deletions sandbox/MauiApp1/MainPage.xaml.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using R3;
using System.Diagnostics;
using R3;

namespace MauiApp1;

Expand All @@ -7,15 +8,35 @@ public class BasicUsagesViewModel : IDisposable
public BindableReactiveProperty<string> Input { get; }
public BindableReactiveProperty<string> Output { get; }

public ReactiveCommand<Unit> DoProcessingCommand { get; }

public IReadOnlyBindableReactiveProperty<bool> IsProcessing { get; }

public BasicUsagesViewModel()
{
Input = new BindableReactiveProperty<string>("");
Output = Input.Select(x => x.ToUpper()).ToBindableReactiveProperty("");

DoProcessingCommand = new(DoProcessingAsync, AwaitOperation.Parallel);

IsProcessing =
this.DoProcessingCommand
.IsExecuting
.ToReadOnlyBindableReactiveProperty(false);
}

private async ValueTask DoProcessingAsync(Unit input, CancellationToken token)
{
var now = DateTime.Now;

Debug.WriteLine($"Starting processing of input: {now}");
await Task.Delay(TimeSpan.FromSeconds(10), token);
Debug.WriteLine($"Finished processing of input: {now}");
}

public void Dispose()
{
Disposable.Dispose(Input, Output);
Disposable.Dispose(Input, Output, DoProcessingCommand, IsProcessing);
}
}

Expand All @@ -27,14 +48,6 @@ public MainPage()
{
InitializeComponent();

var startDate = DateTime.Now;
Observable.Interval(TimeSpan.FromMilliseconds(500))
.Subscribe(x =>
{
throw new InvalidOperationException("oppeke");
Label1.Text = $"Timer: {(DateTime.Now - startDate).TotalMilliseconds}";
});

var frameCount = 0;
Observable.IntervalFrame(1)
.Subscribe(x =>
Expand All @@ -45,8 +58,11 @@ public MainPage()
this.Loaded += (sender, args) =>
{
var viewModel = new BasicUsagesViewModel();

BindingContext = viewModel;
};


}

void OnCounterClicked(object sender, EventArgs e)
Expand Down
4 changes: 2 additions & 2 deletions sandbox/MauiApp1/MauiApp1.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
<ApplicationDisplayVersion>1.0</ApplicationDisplayVersion>
<ApplicationVersion>1</ApplicationVersion>

<SupportedOSPlatformVersion Condition="$([MSBuild]::GetTargetPlatformIdentifier('$(TargetFramework)')) == 'ios'">11.0</SupportedOSPlatformVersion>
<SupportedOSPlatformVersion Condition="$([MSBuild]::GetTargetPlatformIdentifier('$(TargetFramework)')) == 'maccatalyst'">13.1</SupportedOSPlatformVersion>
<SupportedOSPlatformVersion Condition="$([MSBuild]::GetTargetPlatformIdentifier('$(TargetFramework)')) == 'ios'">14.0</SupportedOSPlatformVersion>
<SupportedOSPlatformVersion Condition="$([MSBuild]::GetTargetPlatformIdentifier('$(TargetFramework)')) == 'maccatalyst'">15.0</SupportedOSPlatformVersion>
<SupportedOSPlatformVersion Condition="$([MSBuild]::GetTargetPlatformIdentifier('$(TargetFramework)')) == 'android'">21.0</SupportedOSPlatformVersion>
<SupportedOSPlatformVersion Condition="$([MSBuild]::GetTargetPlatformIdentifier('$(TargetFramework)')) == 'windows'">10.0.17763.0</SupportedOSPlatformVersion>
<TargetPlatformMinVersion Condition="$([MSBuild]::GetTargetPlatformIdentifier('$(TargetFramework)')) == 'windows'">10.0.17763.0</TargetPlatformMinVersion>
Expand Down
270 changes: 270 additions & 0 deletions src/R3/Operators/SelectAwait.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ public static Observable<TResult> SelectAwait<T, TResult>(this Observable<T> sou
{
return new SelectAwait<T, TResult>(source, selector, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent);
}

/// <param name="maxConcurrent">This option is only valid for AwaitOperation.Parallel and AwaitOperation.SequentialParallel. It sets the number of concurrent executions. If set to -1, there is no limit.</param>
public static Observable<TResult> SelectAwait<T, TResult, TState>(this Observable<T> source, TState state, Func<T, TState, CancellationToken, ValueTask<TResult>> selector, AwaitOperation awaitOperation = AwaitOperation.Sequential, bool configureAwait = true, bool cancelOnCompleted = false, int maxConcurrent = -1)
{
return new SelectAwait<T, TResult, TState>(source, state, selector, awaitOperation, configureAwait, cancelOnCompleted, maxConcurrent);
}
}

internal sealed class SelectAwait<T, TResult>(Observable<T> source, Func<T, CancellationToken, ValueTask<TResult>> selector, AwaitOperation awaitOperation, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : Observable<TResult>
Expand Down Expand Up @@ -274,3 +280,267 @@ protected override void PublishOnCompleted(Result result)
}
}
}

internal sealed class SelectAwait<T, TResult, TState>(Observable<T> source, TState state, Func<T, TState, CancellationToken, ValueTask<TResult>> selector, AwaitOperation awaitOperation, bool configureAwait, bool cancelOnCompleted, int maxConcurrent) : Observable<TResult>
{
protected override IDisposable SubscribeCore(Observer<TResult> observer)
{
switch (awaitOperation)
{
case AwaitOperation.Sequential:
return source.Subscribe(new SelectAwaitSequential(observer, state, selector, configureAwait, cancelOnCompleted));
case AwaitOperation.Drop:
return source.Subscribe(new SelectAwaitDrop(observer, state, selector, configureAwait, cancelOnCompleted));
case AwaitOperation.Switch:
return source.Subscribe(new SelectAwaitSwitch(observer, state, selector, configureAwait, cancelOnCompleted));
case AwaitOperation.Parallel:
if (maxConcurrent == -1)
{
return source.Subscribe(new SelectAwaitParallel(observer, state, selector, configureAwait, cancelOnCompleted));
}
else
{
if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1.");
return source.Subscribe(new SelectAwaitParallelConcurrentLimit(observer, state, selector, configureAwait, cancelOnCompleted, maxConcurrent));
}
case AwaitOperation.SequentialParallel:
if (maxConcurrent == -1)
{
return source.Subscribe(new SelectAwaitSequentialParallel(observer, state, selector, configureAwait, cancelOnCompleted));
}
else
{
if (maxConcurrent == 0 || maxConcurrent < -1) throw new ArgumentException("maxConcurrent must be a -1 or greater than 1.");
return source.Subscribe(new SelectAwaitSequentialParallelConcurrentLimit(observer, state, selector, configureAwait, cancelOnCompleted, maxConcurrent));
}
case AwaitOperation.ThrottleFirstLast:
return source.Subscribe(new SelectAwaitThrottleFirstLast(observer, state, selector, configureAwait, cancelOnCompleted));
default:
throw new ArgumentException();
}
}

sealed class SelectAwaitSequential(Observer<TResult> observer, TState state, Func<T, TState, CancellationToken, ValueTask<TResult>> selector, bool configureAwait, bool cancelOnCompleted)
: AwaitOperationSequentialObserver<T>(configureAwait, cancelOnCompleted)
{
#if NET6_0_OR_GREATER
[AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))]
#endif
protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait)
{
var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait);
observer.OnNext(v);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void PublishOnCompleted(Result result)
{
observer.OnCompleted(result);
}
}

sealed class SelectAwaitDrop(Observer<TResult> observer, TState state, Func<T, TState, CancellationToken, ValueTask<TResult>> selector, bool configureAwait, bool cancelOnCompleted)
: AwaitOperationDropObserver<T>(configureAwait, cancelOnCompleted)
{
#if NET6_0_OR_GREATER
[AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))]
#endif
protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait)
{
var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait);
observer.OnNext(v);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void PublishOnCompleted(Result result)
{
observer.OnCompleted(result);
}
}

sealed class SelectAwaitParallel(Observer<TResult> observer, TState state, Func<T, TState, CancellationToken, ValueTask<TResult>> selector, bool configureAwait, bool cancelOnCompleted)
: AwaitOperationParallelObserver<T>(configureAwait, cancelOnCompleted)
{

#if NET6_0_OR_GREATER
[AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))]
#endif
protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait)
{
var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait);
lock (gate)
{
observer.OnNext(v);
}
}

protected override void OnErrorResumeCore(Exception error)
{
lock (gate)
{
observer.OnErrorResume(error);
}
}

protected override void PublishOnCompleted(Result result)
{
lock (gate)
{
observer.OnCompleted(result);
}
}
}


sealed class SelectAwaitSwitch(Observer<TResult> observer, TState state, Func<T, TState, CancellationToken, ValueTask<TResult>> selector, bool configureAwait, bool cancelOnCompleted)
: AwaitOperationSwitchObserver<T>(configureAwait, cancelOnCompleted)
{
protected override void OnErrorResumeCore(Exception error)
{
lock (gate)
{
observer.OnErrorResume(error);
}
}

protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait)
{
var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait);
lock (gate)
{
if (!cancellationToken.IsCancellationRequested)
{
observer.OnNext(v);
}
}
}

protected override void PublishOnCompleted(Result result)
{
lock (gate)
{
observer.OnCompleted(result);
}
}
}

sealed class SelectAwaitSequentialParallel(Observer<TResult> observer, TState state, Func<T, TState, CancellationToken, ValueTask<TResult>> selector, bool configureAwait, bool cancelOnCompleted)
: AwaitOperationSequentialParallelObserver<T, TResult>(configureAwait, cancelOnCompleted)
{

#if NET6_0_OR_GREATER
[AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))]
#endif
protected override ValueTask<TResult> OnNextTaskAsync(T value, CancellationToken cancellationToken, bool configureAwait)
{
return selector(value, state, cancellationToken);
}

protected override void PublishOnNext(T _, TResult result)
{
observer.OnNext(result);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void PublishOnCompleted(Result result)
{
observer.OnCompleted(result);
}
}

sealed class SelectAwaitParallelConcurrentLimit(Observer<TResult> observer, TState state, Func<T, TState, CancellationToken, ValueTask<TResult>> selector, bool configureAwait, bool cancelOnCompleted, int maxConcurrent)
: AwaitOperationParallelConcurrentLimitObserver<T>(configureAwait, cancelOnCompleted, maxConcurrent)
{

#if NET6_0_OR_GREATER
[AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))]
#endif
protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait)
{
var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait);
lock (gate)
{
observer.OnNext(v);
}
}

protected override void OnErrorResumeCore(Exception error)
{
lock (gate)
{
observer.OnErrorResume(error);
}
}

protected override void PublishOnCompleted(Result result)
{
lock (gate)
{
observer.OnCompleted(result);
}
}
}

sealed class SelectAwaitSequentialParallelConcurrentLimit(Observer<TResult> observer, TState state, Func<T, TState, CancellationToken, ValueTask<TResult>> selector, bool configureAwait, bool cancelOnCompleted, int maxConcurrent)
: AwaitOperationSequentialParallelConcurrentLimitObserver<T, TResult>(configureAwait, cancelOnCompleted, maxConcurrent)
{

#if NET6_0_OR_GREATER
[AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))]
#endif
protected override ValueTask<TResult> OnNextTaskAsyncCore(T value, CancellationToken cancellationToken, bool configureAwait)
{
return selector(value, state, cancellationToken);
}

protected override void PublishOnNext(T _, TResult result)
{
observer.OnNext(result);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void PublishOnCompleted(Result result)
{
observer.OnCompleted(result);
}
}

sealed class SelectAwaitThrottleFirstLast(Observer<TResult> observer, TState state, Func<T, TState, CancellationToken, ValueTask<TResult>> selector, bool configureAwait, bool cancelOnCompleted)
: AwaitOperationThrottleFirstLastObserver<T>(configureAwait, cancelOnCompleted)
{
#if NET6_0_OR_GREATER
[AsyncMethodBuilderAttribute(typeof(PoolingAsyncValueTaskMethodBuilder))]
#endif
protected override async ValueTask OnNextAsync(T value, CancellationToken cancellationToken, bool configureAwait)
{
var v = await selector(value, state, cancellationToken).ConfigureAwait(configureAwait);
observer.OnNext(v);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void PublishOnCompleted(Result result)
{
observer.OnCompleted(result);
}
}
}
Loading
Loading