diff --git a/src/RxTelegram.Bot/Api/ITrackerSetup.cs b/src/RxTelegram.Bot/Api/ITrackerSetup.cs new file mode 100644 index 0000000..0943df7 --- /dev/null +++ b/src/RxTelegram.Bot/Api/ITrackerSetup.cs @@ -0,0 +1,9 @@ +using RxTelegram.Bot.Interface.BaseTypes.Enums; +using System.Collections.Generic; + +namespace RxTelegram.Bot.Api; + +public interface ITrackerSetup +{ + void Set(IEnumerable types); +} diff --git a/src/RxTelegram.Bot/Api/IUpdateManager.cs b/src/RxTelegram.Bot/Api/IUpdateManager.cs index 56a611d..7020685 100644 --- a/src/RxTelegram.Bot/Api/IUpdateManager.cs +++ b/src/RxTelegram.Bot/Api/IUpdateManager.cs @@ -12,6 +12,12 @@ namespace RxTelegram.Bot.Api; public interface IUpdateManager { + /// + /// Allows to set custom updates tracker + /// + /// + void Set(IObservable tracker); + /// /// Updates of all Types. /// diff --git a/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs b/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs new file mode 100644 index 0000000..c356e71 --- /dev/null +++ b/src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs @@ -0,0 +1,148 @@ +using RxTelegram.Bot.Interface.BaseTypes.Enums; +using RxTelegram.Bot.Interface.Setup; +using RxTelegram.Bot.Utils.Rx; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace RxTelegram.Bot.Api; +public class LongpollingUpdateTracker(ITelegramBot telegramBot) + : IObservable, ITrackerSetup +{ + private readonly ITelegramBot _telegramBot = telegramBot; + private const int NotRunning = 0; + private const int Running = 1; + private int _isRunning = NotRunning; + private CancellationTokenSource _cancellationTokenSource; + UpdateType[] _trackedUpdateTypes = []; + List> _observers = new List>(); + + private IEnumerable GetTrackingUpdateTypes() + => _trackedUpdateTypes; + public void Set(IEnumerable types) + { + if (types == null && _trackedUpdateTypes == null) + return; + + if (types == null) + { + _trackedUpdateTypes = null; + _cancellationTokenSource?.Cancel(); + return; + } + + if (_trackedUpdateTypes == null || !types.SequenceEqual(_trackedUpdateTypes)) + { + _trackedUpdateTypes = types.ToArray(); + _cancellationTokenSource?.Cancel(); + } + } + + internal async Task RunUpdateSafe() + { + try + { + _cancellationTokenSource = new CancellationTokenSource(); + await RunUpdate(); + } + catch (Exception) + { + // ignored + } + finally + { + Volatile.Write(ref _isRunning, NotRunning); + _cancellationTokenSource = null; + } + } + + internal async Task RunUpdate() + { + int? offset = null; + + while (_observers.Count != 0) + { + try + { + // if the token already canceled before the first request reset token + if (_cancellationTokenSource.IsCancellationRequested) + _cancellationTokenSource = new CancellationTokenSource(); + + var getUpdate = new GetUpdate + { + Offset = offset, + Timeout = 60, + + // if there is a null value in the list, it means that all updates are allowed + AllowedUpdates = GetTrackingUpdateTypes() ?? null + }; + var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token); + if (!result.Any()) + { + await Task.Delay(1000); + continue; + } + + offset = result.Max(x => x.UpdateId) + 1; + NotifyObservers(result); + } + catch (TaskCanceledException) + { + // create new token and check observers + offset = null; + _cancellationTokenSource = new CancellationTokenSource(); + } + catch (Exception exception) + { + // unexpected exception report them to the observers and cancel run update + OnException(exception); + throw; + } + } + } + internal void NotifyObservers(Update[] updates) + { + for (int uid = 0; uid != updates.Length; ++uid) + for (int oid = 0; oid != _observers.Count; ++oid) + _observers[oid].OnNext(updates[uid]); + } + internal void OnException(Exception exception) + { + for (int oid = 0; oid != _observers.Count; ++oid) + _observers[oid].OnError(exception); + } + internal void Remove(IObserver observer) + { + if (!_observers.Contains(observer)) + return; + + lock (_observers) + { + _observers.Remove(observer); + } + + if (!_observers.Any() && Volatile.Read(ref _isRunning) == Running) + _cancellationTokenSource?.Cancel(); + } + public IDisposable Subscribe(IObserver observer) + { + if(observer==null) + throw new ArgumentNullException(nameof(observer)); + lock (_observers) + { + if (!_observers.Contains(observer)) + { + _observers.Add(observer); + } + } + + if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning) + { + Task.Run(RunUpdateSafe); + } + + return new DisposableAction(() => Remove(observer)); + } +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Api/UpdateDistributor.cs b/src/RxTelegram.Bot/Api/UpdateDistributor.cs new file mode 100644 index 0000000..c324d9e --- /dev/null +++ b/src/RxTelegram.Bot/Api/UpdateDistributor.cs @@ -0,0 +1,165 @@ +using RxTelegram.Bot.Interface.BaseTypes; +using RxTelegram.Bot.Interface.BaseTypes.Enums; +using RxTelegram.Bot.Interface.InlineMode; +using RxTelegram.Bot.Interface.Payments; +using RxTelegram.Bot.Interface.Setup; +using System; +using System.Collections.Generic; +using System.Linq; + +#if NETSTANDARD2_1 + +using RxTelegram.Bot.Utils; + +#endif + +namespace RxTelegram.Bot.Api; + +public sealed class UpdateDistributor : IUpdateManager, IDisposable +{ + private IObservable _tracker; + + #region Observable Update properties + private Dictionary _updateInfos = new Dictionary(); + private UpdateTypeInfo _update = new UpdateTypeInfo(); + public IObservable CallbackQuery => Selector(UpdateType.CallbackQuery, _update => _update.CallbackQuery); + public IObservable ChannelPost => Selector(UpdateType.ChannelPost, _update => _update.ChannelPost); + public IObservable ChatBoost => Selector(UpdateType.ChatBoost, _update => _update.ChatBoost); + public IObservable ChatJoinRequest => Selector(UpdateType.ChatJoinRequest, _update => _update.ChatJoinRequest); + public IObservable ChatMember => Selector(UpdateType.ChatMember, _update => _update.ChatMember); + public IObservable ChosenInlineResult => Selector(UpdateType.ChosenInlineResult, _update => _update.ChosenInlineResult); + public IObservable EditedChannelPost => Selector(UpdateType.EditedChannelPost, _update => _update.EditedChannelPost); + public IObservable EditedMessage => Selector(UpdateType.EditedMessage, _update => _update.EditedMessage); + public IObservable InlineQuery => Selector(UpdateType.InlineQuery, _update => _update.InlineQuery); + public IObservable Message => Selector(UpdateType.Message, _update => _update.Message); + public IObservable MyChatMember => Selector(UpdateType.MyChatMember, _update => _update.MyChatMember); + public IObservable Poll => Selector(UpdateType.Poll, _update => _update.Poll); + public IObservable PollAnswer => Selector(UpdateType.PollAnswer, _update => _update.PollAnswer); + public IObservable PreCheckoutQuery => Selector(UpdateType.PreCheckoutQuery, _update => _update.PreCheckoutQuery); + public IObservable RemovedChatBoost => Selector(UpdateType.RemovedChatBoost, _update => _update.RemovedChatBoost); + public IObservable ShippingQuery => Selector(UpdateType.ShippingQuery, _update => _update.ShippingQuery); + public IObservable Update => (IObservable)(_update.Observer ??= new UpdateSubject(x => x, + onSubscribe: AddGeneralListener, + onDispose: RemoveGeneralListener)); + #endregion + +#if NETSTANDARD2_1 + public IAsyncEnumerable CallbackQueryEnumerable() => CallbackQuery.ToAsyncEnumerable(); + public IAsyncEnumerable ChannelPostEnumerable() => ChannelPost.ToAsyncEnumerable(); + public IAsyncEnumerable ChatBoostEnumerable() => ChatBoost.ToAsyncEnumerable(); + public IAsyncEnumerable ChatJoinRequestEnumerable() => ChatJoinRequest.ToAsyncEnumerable(); + public IAsyncEnumerable ChatMemberEnumerable() => ChatMember.ToAsyncEnumerable(); + public IAsyncEnumerable ChosenInlineResultEnumerable() => ChosenInlineResult.ToAsyncEnumerable(); + public IAsyncEnumerable EditedChannelPostEnumerable() => EditedChannelPost.ToAsyncEnumerable(); + public IAsyncEnumerable EditedMessageEnumerable() => EditedMessage.ToAsyncEnumerable(); + public IAsyncEnumerable InlineQueryEnumerable() => InlineQuery.ToAsyncEnumerable(); + public IAsyncEnumerable MessageEnumerable() => Message.ToAsyncEnumerable(); + public IAsyncEnumerable MyChatMemberEnumerable() => MyChatMember.ToAsyncEnumerable(); + public IAsyncEnumerable ShippingQueryEnumerable() => ShippingQuery.ToAsyncEnumerable(); + public IAsyncEnumerable PollAnswerEnumerable() => PollAnswer.ToAsyncEnumerable(); + public IAsyncEnumerable PollEnumerable() => Poll.ToAsyncEnumerable(); + public IAsyncEnumerable PreCheckoutQueryEnumerable() => PreCheckoutQuery.ToAsyncEnumerable(); + public IAsyncEnumerable RemovedChatBoostEnumerable() => RemovedChatBoost.ToAsyncEnumerable(); + public IAsyncEnumerable UpdateEnumerable() => Update.ToAsyncEnumerable(); + +#endif + public UpdateDistributor(IObservable updateTracker) + { + _updateInfos = Enum.GetValues(typeof(UpdateType)) + .Cast() + .ToDictionary(x => x, _ => new UpdateTypeInfo()); + Set(updateTracker); + } + + private void AddGeneralListener() + { + ++_update.Listeners; + + (_tracker as ITrackerSetup)?.Set(null); + + _update.Subscription ??= _tracker.Subscribe(_update.Observer); + } + private void AddListener(UpdateType type) + { + var updateType = _updateInfos[type]; + ++updateType.Listeners; + + UpdateTrackerTypes(); + + updateType.Subscription ??= _tracker.Subscribe(updateType.Observer); + } + private void RemoveGeneralListener() + { + --_update.Listeners; + _update.Subscription?.Dispose(); + _update.Subscription = null; + + UpdateTrackerTypes(); + } + private void RemoveListener(UpdateType type) + { + var updateType = _updateInfos[type]; + --updateType.Listeners; + _update.Subscription?.Dispose(); + _update.Subscription = null; + + UpdateTrackerTypes(); + } + public IObservable Selector(UpdateType updateType, Func propertySelector) + { + var info = _updateInfos[updateType]; + if (info.Observer != null) + return (IObservable)info; + + var subject = new UpdateSubject(propertySelector, + onSubscribe: () => AddListener(updateType), + onDispose: () => RemoveListener(updateType)); + info.Observer = subject; + info.Subscription = _tracker.Subscribe(info.Observer); + return subject; + } + public void Set(IObservable tracker) + { + //Setup current tracker to listen all messages before change to a new one + (_tracker as ITrackerSetup)?.Set(null); + DisposeTrackerSubcription(); + _tracker = tracker; + UpdateTrackerTypes(); + SubscribeToTracker(); + } + public void DisposeTrackerSubcription() + { + _update.Subscription?.Dispose(); + foreach (var info in _updateInfos.Values) + info.Subscription?.Dispose(); + } + public void SubscribeToTracker() + { + if(_update.Observer!=null) + _update.Subscription = _tracker.Subscribe(_update.Observer); + foreach (var info in _updateInfos.Values.Where(x=>x.Observer!=null)) + info.Subscription = _tracker.Subscribe(info.Observer); + } + private void UpdateTrackerTypes() + { + if (_tracker is not ITrackerSetup) return; + + IEnumerable types = null; + if (_update.Listeners == 0) + { + types = _updateInfos.Where(x => x.Value.Listeners != 0).Select(x => x.Key); + if (!types.Any()) + types = null; + } + (_tracker as ITrackerSetup).Set(types); + } + + public void Dispose() => DisposeTrackerSubcription(); + + private class UpdateTypeInfo + { + public int Listeners { get; set; } = 0; + public IObserver Observer { get; set; } = null; + public IDisposable Subscription { get; set; } = null; + } +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/Api/UpdateManager.cs b/src/RxTelegram.Bot/Api/UpdateManager.cs index cdc386d..51a6740 100644 --- a/src/RxTelegram.Bot/Api/UpdateManager.cs +++ b/src/RxTelegram.Bot/Api/UpdateManager.cs @@ -16,6 +16,8 @@ namespace RxTelegram.Bot.Api; public class UpdateManager : IUpdateManager { + public void Set(IObservable tracker) => throw new NotImplementedException(); + public IObservable Update => _update; public IObservable Message => _message; diff --git a/src/RxTelegram.Bot/Interface/Setup/UpdateSubject.cs b/src/RxTelegram.Bot/Interface/Setup/UpdateSubject.cs new file mode 100644 index 0000000..4bdc2fe --- /dev/null +++ b/src/RxTelegram.Bot/Interface/Setup/UpdateSubject.cs @@ -0,0 +1,8 @@ +using RxTelegram.Bot.Utils.Rx; +using System; + +namespace RxTelegram.Bot.Interface.Setup; + +public class UpdateSubject(Func selector, Action onSubscribe, Action onDispose) + : CustomSubject(selector, onSubscribe, onDispose) +{ } diff --git a/src/RxTelegram.Bot/TelegramBot.Builder.cs b/src/RxTelegram.Bot/TelegramBot.Builder.cs new file mode 100644 index 0000000..943c4c6 --- /dev/null +++ b/src/RxTelegram.Bot/TelegramBot.Builder.cs @@ -0,0 +1,42 @@ +using System; +using RxTelegram.Bot.Api; +using RxTelegram.Bot.Interface.Setup; + +namespace RxTelegram.Bot; + +public partial class TelegramBot : BaseTelegramBot, ITelegramBot +{ + public class Builder + { + private string _token; + private IObservable _tracker = null; + private IUpdateManager _updateManager = null; + public Builder() { } + public Builder(string token) : this() { _token = token; } + + public Builder SetToken(string token) + { + _token = token; + return this; + } + public Builder SetTracker(IObservable tracker) + { + _tracker = tracker; + return this; + } + public Builder SetManager(IUpdateManager updateManager) + { + _updateManager = updateManager; + return this; + } + public TelegramBot Build() + { + var bot = new TelegramBot(_token); + _tracker ??= new LongpollingUpdateTracker(bot); + bot.Updates = _updateManager ?? new UpdateDistributor(_tracker); + bot.Updates.Set(_tracker); + + return bot; + } + } +} \ No newline at end of file diff --git a/src/RxTelegram.Bot/TelegramBot.cs b/src/RxTelegram.Bot/TelegramBot.cs index 7a8c1e5..c09bfd4 100644 --- a/src/RxTelegram.Bot/TelegramBot.cs +++ b/src/RxTelegram.Bot/TelegramBot.cs @@ -30,15 +30,19 @@ namespace RxTelegram.Bot; -public class TelegramBot : BaseTelegramBot, ITelegramBot +public partial class TelegramBot : BaseTelegramBot, ITelegramBot { public TelegramBot(string token) : this(new BotInfo(token)) { } - public TelegramBot(BotInfo botInfo) : base(botInfo) => Updates = new UpdateManager(this); + public TelegramBot(BotInfo botInfo) : base(botInfo) + { + var tracker = new LongpollingUpdateTracker(this); + Updates = new UpdateDistributor(tracker); + } - public IUpdateManager Updates { get; } + public IUpdateManager Updates { get; private set; } /// /// Use this method to get basic info about a file and prepare it for downloading. diff --git a/src/RxTelegram.Bot/Utils/Rx/CustomSubject.cs b/src/RxTelegram.Bot/Utils/Rx/CustomSubject.cs new file mode 100644 index 0000000..3c29cca --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/CustomSubject.cs @@ -0,0 +1,55 @@ +using System; +using System.Collections.Generic; + +namespace RxTelegram.Bot.Utils.Rx; + +/// +/// Simple subject that multicasts observable values +/// It can use on subscribe and on dispose action +/// +/// Incoming type that the subject listens to +/// Result type that the subject emits +public class CustomSubject : IObserver, IObservable +{ + private readonly Func _selector; + private readonly Action _onSubscribe; + private readonly Action _onDispose; + private readonly List> _observers = new List>(); + public CustomSubject(Func selector, Action onSubscribe = null, Action onDispose = null) + { + if (selector == null) + throw new ArgumentNullException(nameof(selector)); + + this._selector = selector; + this._onSubscribe = onSubscribe; + this._onDispose = onDispose; + } + + public void OnCompleted() + { + for (int oid = 0; oid != _observers.Count; ++oid) + _observers[oid].OnCompleted(); + } + public void OnError(Exception error) + { + for (int oid = 0; oid != _observers.Count; ++oid) + _observers[oid].OnError(error); + } + public void OnNext(TInput value) + { + var result = _selector(value); + if (result == null) return; + for (int oid = 0; oid != _observers.Count; ++oid) + _observers[oid].OnNext(result); + } + public IDisposable Subscribe(IObserver observer) + { + _observers.Add(observer); + _onSubscribe?.Invoke(); + return new DisposableAction(() => + { + _onDispose?.Invoke(); + _observers.Remove(observer); + }); + } +} diff --git a/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs b/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs new file mode 100644 index 0000000..283864a --- /dev/null +++ b/src/RxTelegram.Bot/Utils/Rx/DisposableAction.cs @@ -0,0 +1,16 @@ +using System; +namespace RxTelegram.Bot.Utils.Rx; + +public class DisposableAction : IDisposable +{ + private readonly Action action; + + public DisposableAction(Action action) + { + if (action == null) + throw new ArgumentNullException(nameof(action)); + this.action = action; + } + + public void Dispose() => action(); +}