diff --git a/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs b/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs index c356e71..51d6c10 100644 --- a/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs +++ b/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs @@ -17,19 +17,17 @@ public class LongpollingUpdateTracker(ITelegramBot telegramBot) private int _isRunning = NotRunning; private CancellationTokenSource _cancellationTokenSource; UpdateType[] _trackedUpdateTypes = []; - List> _observers = new List>(); + readonly List> _observers = new List>(); - private IEnumerable GetTrackingUpdateTypes() - => _trackedUpdateTypes; public void Set(IEnumerable types) { - if (types == null && _trackedUpdateTypes == null) - return; - if (types == null) { - _trackedUpdateTypes = null; - _cancellationTokenSource?.Cancel(); + if (_trackedUpdateTypes != null) + { + _trackedUpdateTypes = null; + _cancellationTokenSource?.Cancel(); + } return; } @@ -47,20 +45,28 @@ internal async Task RunUpdateSafe() _cancellationTokenSource = new CancellationTokenSource(); await RunUpdate(); } - catch (Exception) + catch (Exception ex) { // ignored + ExceptionHelpers.ThrowIfFatal(ex); } finally { - Volatile.Write(ref _isRunning, NotRunning); + _cancellationTokenSource.Dispose(); _cancellationTokenSource = null; + + if (!_observers.Any()) + Volatile.Write(ref _isRunning, NotRunning); + else + RunUpdateTask(); } + void RunUpdateTask() => Task.Run(RunUpdateSafe); } - + + int? offset = null; // Offset must be preserved for all errors except TaskCanceledException. + // Using a local variable may cause duplicates if an exception occurs. internal async Task RunUpdate() { - int? offset = null; while (_observers.Count != 0) { @@ -76,8 +82,9 @@ internal async Task RunUpdate() Timeout = 60, // if there is a null value in the list, it means that all updates are allowed - AllowedUpdates = GetTrackingUpdateTypes() ?? null + AllowedUpdates = _trackedUpdateTypes ?? null }; + var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token); if (!result.Any()) { @@ -85,18 +92,16 @@ internal async Task RunUpdate() continue; } - offset = result.Max(x => x.UpdateId) + 1; NotifyObservers(result); + offset = result.Last().UpdateId + 1; } catch (TaskCanceledException) { - // create new token and check observers offset = null; _cancellationTokenSource = new CancellationTokenSource(); } catch (Exception exception) { - // unexpected exception report them to the observers and cancel run update OnException(exception); throw; } @@ -110,8 +115,27 @@ internal void NotifyObservers(Update[] updates) } internal void OnException(Exception exception) { - for (int oid = 0; oid != _observers.Count; ++oid) - _observers[oid].OnError(exception); + IObserver[] current; + lock (_observers) + { + current = _observers.ToArray(); // Caching current observers to prevent + // notifying those who subscribed after an error occurred. + _observers.Clear(); + } + + for (int oid = 0; oid != current.Length; ++oid) + { + try + { + current[oid].OnError(exception); + } + catch (Exception ex) + { + // Ignore exceptions from observers without an error handler, + // as it would break the process and propagate the exception to the outer scope. + ExceptionHelpers.ThrowIfFatal(ex); + } + } } internal void Remove(IObserver observer) { @@ -128,14 +152,12 @@ internal void Remove(IObserver observer) } public IDisposable Subscribe(IObserver observer) { - if(observer==null) + if (observer == null) throw new ArgumentNullException(nameof(observer)); + lock (_observers) { - if (!_observers.Contains(observer)) - { - _observers.Add(observer); - } + _observers.Add(observer); } if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning) diff --git a/src/RxTelegram.Bot/Api/UpdateDistributor.cs b/src/RxTelegram.Bot/Api/UpdateDistributor.cs index c324d9e..adc9399 100644 --- a/src/RxTelegram.Bot/Api/UpdateDistributor.cs +++ b/src/RxTelegram.Bot/Api/UpdateDistributor.cs @@ -3,6 +3,7 @@ using RxTelegram.Bot.Interface.InlineMode; using RxTelegram.Bot.Interface.Payments; using RxTelegram.Bot.Interface.Setup; +using RxTelegram.Bot.Utils.Rx; using System; using System.Collections.Generic; using System.Linq; @@ -17,11 +18,14 @@ namespace RxTelegram.Bot.Api; public sealed class UpdateDistributor : IUpdateManager, IDisposable { - private IObservable _tracker; #region Observable Update properties - private Dictionary _updateInfos = new Dictionary(); - private UpdateTypeInfo _update = new UpdateTypeInfo(); + private readonly Dictionary _updateInfos; + private readonly UpdateTypeInfo _updateInfo; + private readonly IEnumerable _trackedTypes; + private readonly ReactiveProperty> _tracker; + private bool _isDisposed = false; + private readonly object _lock; public IObservable CallbackQuery => Selector(UpdateType.CallbackQuery, _update => _update.CallbackQuery); public IObservable ChannelPost => Selector(UpdateType.ChannelPost, _update => _update.ChannelPost); public IObservable ChatBoost => Selector(UpdateType.ChatBoost, _update => _update.ChatBoost); @@ -38,9 +42,7 @@ public sealed class UpdateDistributor : IUpdateManager, IDisposable public IObservable PreCheckoutQuery => Selector(UpdateType.PreCheckoutQuery, _update => _update.PreCheckoutQuery); public IObservable RemovedChatBoost => Selector(UpdateType.RemovedChatBoost, _update => _update.RemovedChatBoost); public IObservable ShippingQuery => Selector(UpdateType.ShippingQuery, _update => _update.ShippingQuery); - public IObservable Update => (IObservable)(_update.Observer ??= new UpdateSubject(x => x, - onSubscribe: AddGeneralListener, - onDispose: RemoveGeneralListener)); + public IObservable Update => Selector(null, _update => _update); #endregion #if NETSTANDARD2_1 @@ -65,101 +67,85 @@ public sealed class UpdateDistributor : IUpdateManager, IDisposable #endif public UpdateDistributor(IObservable updateTracker) { + _lock = new(); _updateInfos = Enum.GetValues(typeof(UpdateType)) .Cast() .ToDictionary(x => x, _ => new UpdateTypeInfo()); - Set(updateTracker); - } - private void AddGeneralListener() - { - ++_update.Listeners; + _updateInfo = new UpdateTypeInfo(); - (_tracker as ITrackerSetup)?.Set(null); + _trackedTypes = _updateInfos.Where(x => x.Value.Listeners != 0) + .Select(x => x.Key); - _update.Subscription ??= _tracker.Subscribe(_update.Observer); + _tracker = new ReactiveProperty>(updateTracker); + Set(updateTracker); } - private void AddListener(UpdateType type) + private void AddListener(UpdateType? type) { - var updateType = _updateInfos[type]; - ++updateType.Listeners; - - UpdateTrackerTypes(); + lock (_lock) + { + var info = GetInfo(type); + ++info.Listeners; - updateType.Subscription ??= _tracker.Subscribe(updateType.Observer); + if (info.Listeners != 1) return; + UpdateTrackerTypes(); + } } - private void RemoveGeneralListener() - { - --_update.Listeners; - _update.Subscription?.Dispose(); - _update.Subscription = null; + private UpdateTypeInfo GetInfo(UpdateType? type) + => type == null ? _updateInfo : _updateInfos[(UpdateType)type]; - UpdateTrackerTypes(); - } - private void RemoveListener(UpdateType type) + private void RemoveListener(UpdateType? type) { - var updateType = _updateInfos[type]; - --updateType.Listeners; - _update.Subscription?.Dispose(); - _update.Subscription = null; + lock (_lock) + { + var info = GetInfo(type); + --info.Listeners; + if (info.Listeners != 0) return; - UpdateTrackerTypes(); + UpdateTrackerTypes(); + } } - public IObservable Selector(UpdateType updateType, Func propertySelector) + public IObservable Selector(UpdateType? type, Func propertySelector) + where T : class { - var info = _updateInfos[updateType]; - if (info.Observer != null) - return (IObservable)info; - - var subject = new UpdateSubject(propertySelector, - onSubscribe: () => AddListener(updateType), - onDispose: () => RemoveListener(updateType)); - info.Observer = subject; - info.Subscription = _tracker.Subscribe(info.Observer); - return subject; + return _tracker.Switch().Select(propertySelector) + .Where(x => x != null) + .DoOnSubscribe(() => AddListener(type)) + .Finally(() => RemoveListener(type)); } + public void Set(IObservable tracker) { - //Setup current tracker to listen all messages before change to a new one - (_tracker as ITrackerSetup)?.Set(null); - DisposeTrackerSubcription(); - _tracker = tracker; + // Configure the current tracker to listen for all types of updates + // before switching to a new one + (_tracker.Current as ITrackerSetup)?.Set(null); + + _tracker.OnNext(tracker); UpdateTrackerTypes(); - SubscribeToTracker(); } - public void DisposeTrackerSubcription() - { - _update.Subscription?.Dispose(); - foreach (var info in _updateInfos.Values) - info.Subscription?.Dispose(); - } - public void SubscribeToTracker() + private void UpdateTrackerTypes() { - if(_update.Observer!=null) - _update.Subscription = _tracker.Subscribe(_update.Observer); - foreach (var info in _updateInfos.Values.Where(x=>x.Observer!=null)) - info.Subscription = _tracker.Subscribe(info.Observer); + if (_tracker.Current is not ITrackerSetup setup) return; + + setup.Set(_updateInfo.Listeners != 0 || !_trackedTypes.Any() ? + null : _trackedTypes); } - private void UpdateTrackerTypes() + + public void Dispose() => Dispose(true); + void Dispose(bool explicitDisposing) { - if (_tracker is not ITrackerSetup) return; + if (_isDisposed) return; - IEnumerable types = null; - if (_update.Listeners == 0) - { - types = _updateInfos.Where(x => x.Value.Listeners != 0).Select(x => x.Key); - if (!types.Any()) - types = null; - } - (_tracker as ITrackerSetup).Set(types); + if (explicitDisposing) + _tracker.Dispose(); + + _isDisposed = true; } - public void Dispose() => DisposeTrackerSubcription(); + ~UpdateDistributor() => Dispose(false); - private class UpdateTypeInfo + sealed private class UpdateTypeInfo { public int Listeners { get; set; } = 0; - public IObserver Observer { get; set; } = null; - public IDisposable Subscription { get; set; } = null; } } \ No newline at end of file diff --git a/src/RxTelegram.Bot/Interface/Setup/UpdateSubject.cs b/src/RxTelegram.Bot/Interface/Setup/UpdateSubject.cs deleted file mode 100644 index 4bdc2fe..0000000 --- a/src/RxTelegram.Bot/Interface/Setup/UpdateSubject.cs +++ /dev/null @@ -1,8 +0,0 @@ -using RxTelegram.Bot.Utils.Rx; -using System; - -namespace RxTelegram.Bot.Interface.Setup; - -public class UpdateSubject(Func selector, Action onSubscribe, Action onDispose) - : CustomSubject(selector, onSubscribe, onDispose) -{ } diff --git a/src/RxTelegram.Bot/Utils/Rx/CustomSubject.cs b/src/RxTelegram.Bot/Utils/Rx/CustomSubject.cs deleted file mode 100644 index 3c29cca..0000000 --- a/src/RxTelegram.Bot/Utils/Rx/CustomSubject.cs +++ /dev/null @@ -1,55 +0,0 @@ -using System; -using System.Collections.Generic; - -namespace RxTelegram.Bot.Utils.Rx; - -/// -/// Simple subject that multicasts observable values -/// It can use on subscribe and on dispose action -/// -/// Incoming type that the subject listens to -/// Result type that the subject emits -public class CustomSubject : IObserver, IObservable -{ - private readonly Func _selector; - private readonly Action _onSubscribe; - private readonly Action _onDispose; - private readonly List> _observers = new List>(); - public CustomSubject(Func selector, Action onSubscribe = null, Action onDispose = null) - { - if (selector == null) - throw new ArgumentNullException(nameof(selector)); - - this._selector = selector; - this._onSubscribe = onSubscribe; - this._onDispose = onDispose; - } - - public void OnCompleted() - { - for (int oid = 0; oid != _observers.Count; ++oid) - _observers[oid].OnCompleted(); - } - public void OnError(Exception error) - { - for (int oid = 0; oid != _observers.Count; ++oid) - _observers[oid].OnError(error); - } - public void OnNext(TInput value) - { - var result = _selector(value); - if (result == null) return; - for (int oid = 0; oid != _observers.Count; ++oid) - _observers[oid].OnNext(result); - } - public IDisposable Subscribe(IObserver observer) - { - _observers.Add(observer); - _onSubscribe?.Invoke(); - return new DisposableAction(() => - { - _onDispose?.Invoke(); - _observers.Remove(observer); - }); - } -} diff --git a/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs b/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs index 283864a..ec11ec8 100644 --- a/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs +++ b/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs @@ -1,8 +1,10 @@ using System; + namespace RxTelegram.Bot.Utils.Rx; -public class DisposableAction : IDisposable +sealed public class DisposableAction : IDisposable { + static public DisposableAction Empty { get; } = new DisposableAction(() => { }); private readonly Action action; public DisposableAction(Action action) @@ -13,4 +15,4 @@ public DisposableAction(Action action) } public void Dispose() => action(); -} +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs b/src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs new file mode 100644 index 0000000..4b7094c --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/DoOnDisposeObservable.cs @@ -0,0 +1,26 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +internal class DoOnDisposeObservable : IObservable +{ + private readonly IObservable _source; + private readonly Action _onDispose; + + public DoOnDisposeObservable(IObservable source, Action onDispose) + { + this._source = source ?? throw new ArgumentNullException(nameof(source)); + this._onDispose = onDispose ?? throw new ArgumentNullException(nameof(onDispose)); + } + + public IDisposable Subscribe(IObserver observer) + { + + var subscription = _source.Subscribe(observer); + return new DisposableAction(() => + { + subscription.Dispose(); + _onDispose(); + }); + } +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/DoOnSubscribeObservable.cs b/src/RxTelegram.Bot/Utils/Rx/DoOnSubscribeObservable.cs new file mode 100644 index 0000000..82f2c10 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/DoOnSubscribeObservable.cs @@ -0,0 +1,23 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +internal class DoOnSubscribeObservable : IObservable +{ + private readonly IObservable _source; + private readonly Action _onSbuscribe; + + public DoOnSubscribeObservable(IObservable source, Action onSubscribe) + { + this._source = source ?? throw new ArgumentNullException(nameof(source)); + this._onSbuscribe = onSubscribe ?? throw new ArgumentNullException(nameof(onSubscribe)); + } + + public IDisposable Subscribe(IObserver observer) + { + + var subscription = _source.Subscribe(observer); + _onSbuscribe(); + return subscription; + } +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/ExceptionHelpers.cs b/src/RxTelegram.Bot/Utils/Rx/ExceptionHelpers.cs new file mode 100644 index 0000000..4c6c1db --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/ExceptionHelpers.cs @@ -0,0 +1,14 @@ +using System; +using System.Threading; + +namespace RxTelegram.Bot.Utils.Rx; + +internal static class ExceptionHelpers +{ + internal static void ThrowIfFatal(Exception ex) + { + if (ex is OutOfMemoryException || ex is StackOverflowException + || ex is ThreadAbortException) + throw ex; + } +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/FinallyObservable.cs b/src/RxTelegram.Bot/Utils/Rx/FinallyObservable.cs new file mode 100644 index 0000000..f4e1444 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/FinallyObservable.cs @@ -0,0 +1,77 @@ +using System; +using System.Threading; + +namespace RxTelegram.Bot.Utils.Rx; + +internal class FinallyObservable : IObservable +{ + private readonly IObservable _source; + private readonly Action _onTerminate; + + public FinallyObservable(IObservable source, Action onTerminate) + { + _source = source ?? throw new ArgumentNullException(nameof(source)); + _onTerminate = onTerminate ?? throw new ArgumentNullException(nameof(onTerminate)); + } + + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + throw new ArgumentNullException(nameof(observer)); + + var finallyObserver = new FinallyObserver(observer, _onTerminate); + var subscription = _source.Subscribe(finallyObserver); + + return new DisposableAction(() => finallyObserver.Dispose(subscription)); + } + + private class FinallyObserver : IObserver + { + private readonly IObserver _observer; + private readonly Action _onTerminate; + private int _terminated; + + public FinallyObserver(IObserver observer, Action onTerminate) + { + _observer = observer; + _onTerminate = onTerminate; + } + public void Dispose(IDisposable subscription) + { + subscription.Dispose(); + Terminate(); + } + + public void OnCompleted() + { + _observer.OnCompleted(); + Terminate(); + } + + public void OnError(Exception error) + { + try + { + _observer.OnError(error); + } + catch (Exception ex) + { + ExceptionHelpers.ThrowIfFatal(ex); + } + finally + { + Terminate(); + } + } + + public void OnNext(T value) => _observer.OnNext(value); + + private void Terminate() + { + if (Interlocked.Exchange(ref _terminated, 1) == 0) + { + _onTerminate(); + } + } + } +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/Interfaces/ISubject.cs b/src/RxTelegram.Bot/Utils/Rx/Interfaces/ISubject.cs new file mode 100644 index 0000000..24db1d7 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/Interfaces/ISubject.cs @@ -0,0 +1,6 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx.Interfaces; + +public interface ISubject : IObserver, IObservable { } +public interface ISubject : ISubject { } \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs b/src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs new file mode 100644 index 0000000..efa9fbf --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/ObservableOperators.cs @@ -0,0 +1,19 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +static public class ObservableOperators +{ + static public IObservable DoOnDispose(this IObservable source, Action onDispose) + => new DoOnDisposeObservable(source, onDispose); + static public IObservable DoOnSubscribe(this IObservable source, Action onSubscribe) + => new DoOnSubscribeObservable(source, onSubscribe); + static public IObservable Finally(this IObservable source, Action onTerminate) + => new FinallyObservable(source,onTerminate); + static public IObservable Select(this IObservable source, Func selector) + => new SelectObservable(source, selector); + static public IObservable Switch(this IObservable> source) + => new SwitchObservable(source); + static public IObservable Where(this IObservable source, Func predicate) + => new WhereObservable(source, predicate); +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/ReactiveProperty.cs b/src/RxTelegram.Bot/Utils/Rx/ReactiveProperty.cs new file mode 100644 index 0000000..c98d9ad --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/ReactiveProperty.cs @@ -0,0 +1,131 @@ +using RxTelegram.Bot.Utils.Rx.Interfaces; +using System; +using System.Collections.Generic; +using System.Threading; + +namespace RxTelegram.Bot.Utils.Rx; + +public class ReactiveProperty : ISubject, IDisposable +{ + static private readonly List> Terminated = []; + private readonly object _lock = new object(); + protected Exception _error; + private T _current; + public T Current + { + get => _current; + protected set { _current = value; HasValue = true; } + } + public bool HasValue { get; private set; } + private List> _observers; + public bool IsDisposed { get; protected set; } + public bool IsError => _error != null; + public Exception Error => _error; + public ReactiveProperty() + { + _error = null; + _observers = new List>(); + } + public ReactiveProperty(T initValue) : this() + { + Current = initValue; + } + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + void Dispose(bool explicitDisposing) + { + if (IsDisposed) return; + + if (explicitDisposing) + DisposeManaged(); + + IsDisposed = true; + } + protected virtual void DisposeManaged() + { + OnCompleted(); + Interlocked.Exchange(ref _observers, Terminated); + } + public void OnCompleted() + { + lock (_lock) + { + ThrowIfDisposed(); + foreach (var observer in _observers) + observer.OnCompleted(); + + _observers = Terminated; + } + } + public void OnError(Exception error) + { + lock (_lock) + { + ThrowIfDisposed(); + _error = error; + Current = default; + foreach (var observer in _observers) + { + try + { + observer.OnError(error); + } + catch (Exception ex) + { + //ignore + // if observer doesn't have exception handler block + // it will lead to breaking process and throwing exception to outer scope + ExceptionHelpers.ThrowIfFatal(ex); + } + } + + _observers = Terminated; + } + } + public void OnNext(T value) + { + lock (_lock) + { + ThrowIfDisposed(); + Current = value; + + foreach (var observer in _observers) + observer.OnNext(value); + } + } + public IDisposable Subscribe(IObserver observer) + { + lock (_lock) + { + ThrowIfDisposed(); + if (_observers == Terminated) + { + observer.OnCompleted(); + return DisposableAction.Empty; + } + + _observers.Add(observer); + if (HasValue) + observer.OnNext(Current); + + return new DisposableAction(() => + { + lock (_lock) + { + _observers.Remove(observer); + } + }); + } + } + + protected void ThrowIfDisposed() + { + if (IsDisposed) + throw new ObjectDisposedException(nameof(ReactiveProperty)); + } + ~ReactiveProperty() => Dispose(false); +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/SelectObservable.cs b/src/RxTelegram.Bot/Utils/Rx/SelectObservable.cs new file mode 100644 index 0000000..8638d25 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/SelectObservable.cs @@ -0,0 +1,51 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; +internal class SelectObservable : IObservable +{ + private readonly IObservable _source; + internal readonly Func _selector; + + public SelectObservable(IObservable source, Func selector) + { + this._source = source ?? throw new ArgumentNullException(nameof(source)); + this._selector = selector ?? throw new ArgumentNullException(nameof(selector)); + } + + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + throw new ArgumentNullException(nameof(observer)); + var selectObserver = new SelectObserver(observer, _selector); + return _source.Subscribe(selectObserver); + } + private class SelectObserver : IObserver + { + private readonly IObserver _observer; + private readonly Func _selector; + + public SelectObserver(IObserver observer, Func selector) + { + this._observer = observer; + this._selector = selector; + } + + public void OnCompleted() => _observer.OnCompleted(); + public void OnError(Exception error) => _observer.OnError(error); + public void OnNext(T value) + { + K result; + try + { + result = _selector(value); + } + catch (Exception ex) + { + _observer.OnError(ex); + return; + } + + _observer.OnNext(result); + } + } +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/SwitchObservable.cs b/src/RxTelegram.Bot/Utils/Rx/SwitchObservable.cs new file mode 100644 index 0000000..0bbbee9 --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/SwitchObservable.cs @@ -0,0 +1,89 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +internal class SwitchObservable : IObservable +{ + private readonly IObservable> _source; + + public SwitchObservable(IObservable> source) + { + _source = source ?? throw new ArgumentNullException(nameof(source)); + } + public IDisposable Subscribe(IObserver observer) + { + var switchObserver = new SwitchObserver(observer); + var stream = _source.Subscribe(switchObserver); + return new DisposableAction(() => + { + stream.Dispose(); + switchObserver.Dispose(); + }); + } + + private class SwitchObserver : IObserver>, IDisposable + { + private readonly object _lock = new(); + private IObserver _observer; + private IDisposable _subscription; + private bool _isDisposed = false; + public SwitchObserver(IObserver observer) + { + this._observer = observer ?? throw new ArgumentNullException(nameof(observer)); + } + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + public void OnCompleted() + { + lock (_lock) + { + if (_isDisposed) return; + _observer?.OnCompleted(); + Dispose(); + } + } + + public void OnError(Exception error) + { + lock (_lock) + { + if (_isDisposed) return; + _observer?.OnError(error); + Dispose(); + } + } + + public void OnNext(IObservable stream) + { + if (stream == null) throw new ArgumentNullException(nameof(stream)); + + lock (_lock) + { + if (_isDisposed) + throw new ObjectDisposedException(nameof(SwitchObserver)); + + _subscription?.Dispose(); + _subscription = stream.Subscribe(_observer); + } + } + void Dispose(bool explicitDisposing) + { + if (_isDisposed) return; + + if (explicitDisposing) + { + _subscription?.Dispose(); + _subscription = null; + _observer = null; + } + + _isDisposed = true; + } + + ~SwitchObserver() => Dispose(false); + } +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Utils/Rx/WhereObservable.cs b/src/RxTelegram.Bot/Utils/Rx/WhereObservable.cs new file mode 100644 index 0000000..ff1802c --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/WhereObservable.cs @@ -0,0 +1,85 @@ +using System; + +namespace RxTelegram.Bot.Utils.Rx; + +internal class WhereObservable : IObservable +{ + private readonly IObservable _source; + internal readonly Func _predicate; + + public WhereObservable(IObservable source, Func predicate) + { + _source = source ?? throw new ArgumentNullException(nameof(source)); + _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); + } + public IDisposable Subscribe(IObserver observer) + { + var where = new WhereObserver(observer, _predicate); + var subscription = _source.Subscribe(where); + return new DisposableAction(() => + { + subscription.Dispose(); + where.Dispose(); + }); + } + private class WhereObserver : IObserver, IDisposable + { + private readonly object _lock = new(); + private readonly Func _predicate; + private IObserver _observer; + private bool _isCompleted; + private bool _isDisposed; + + public WhereObserver(IObserver observer, Func predicate) + { + _observer = observer ?? throw new ArgumentNullException(nameof(observer)); + _predicate = predicate ?? throw new ArgumentNullException(nameof(predicate)); + } + + public void OnCompleted() + { + if (_isCompleted || _isDisposed) return; + _isCompleted = true; + _observer?.OnCompleted(); + } + + public void OnError(Exception error) + { + lock (_lock) + { + if (_isCompleted || _isDisposed) return; + _isCompleted = true; + _observer?.OnError(error); + } + } + + public void OnNext(T value) + { + lock (_lock) + { + if (_isCompleted || _isDisposed) return; + + bool isPassed = false; + try + { + isPassed = _predicate(value); + } + catch (Exception error) + { + OnError(error); + } + if (isPassed) + _observer.OnNext(value); + } + } + + public void Dispose() + { + lock (_lock) + { + _isDisposed = true; + _observer = null; + } + } + } +} \ No newline at end of file