Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/RxTelegram.Bot/Api/ITrackerSetup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using RxTelegram.Bot.Interface.BaseTypes.Enums;
using System.Collections.Generic;

namespace RxTelegram.Bot.Api;

public interface ITrackerSetup
{
void Set(IEnumerable<UpdateType> types);
}
6 changes: 6 additions & 0 deletions src/RxTelegram.Bot/Api/IUpdateManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ namespace RxTelegram.Bot.Api;

public interface IUpdateManager
{
/// <summary>
/// Allows to set custom updates tracker
/// </summary>
/// <param name="tracker"></param>
void Set(IObservable<Update> tracker);

/// <summary>
/// Updates of all Types.
/// </summary>
Expand Down
148 changes: 148 additions & 0 deletions src/RxTelegram.Bot/Api/LongpollingUpdateTracker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
using RxTelegram.Bot.Interface.BaseTypes.Enums;
using RxTelegram.Bot.Interface.Setup;
using RxTelegram.Bot.Utils.Rx;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace RxTelegram.Bot.Api;
public class LongpollingUpdateTracker(ITelegramBot telegramBot)
: IObservable<Update>, ITrackerSetup
{
private readonly ITelegramBot _telegramBot = telegramBot;
private const int NotRunning = 0;
private const int Running = 1;
private int _isRunning = NotRunning;
private CancellationTokenSource _cancellationTokenSource;
UpdateType[] _trackedUpdateTypes = [];
List<IObserver<Update>> _observers = new List<IObserver<Update>>();

private IEnumerable<UpdateType> GetTrackingUpdateTypes()
=> _trackedUpdateTypes;
public void Set(IEnumerable<UpdateType> types)
{
if (types == null && _trackedUpdateTypes == null)
return;

if (types == null)
{
_trackedUpdateTypes = null;
_cancellationTokenSource?.Cancel();
return;
}

if (_trackedUpdateTypes == null || !types.SequenceEqual(_trackedUpdateTypes))
{
_trackedUpdateTypes = types.ToArray();
_cancellationTokenSource?.Cancel();
}
}

internal async Task RunUpdateSafe()
{
try
{
_cancellationTokenSource = new CancellationTokenSource();
await RunUpdate();
}
catch (Exception)
{
// ignored
}
finally
{
Volatile.Write(ref _isRunning, NotRunning);
_cancellationTokenSource = null;
}
}

internal async Task RunUpdate()
{
int? offset = null;

while (_observers.Count != 0)
{
try
{
// if the token already canceled before the first request reset token
if (_cancellationTokenSource.IsCancellationRequested)
_cancellationTokenSource = new CancellationTokenSource();

var getUpdate = new GetUpdate
{
Offset = offset,
Timeout = 60,

// if there is a null value in the list, it means that all updates are allowed
AllowedUpdates = GetTrackingUpdateTypes() ?? null
};
var result = await _telegramBot.GetUpdate(getUpdate, _cancellationTokenSource.Token);
if (!result.Any())
{
await Task.Delay(1000);
continue;
}

offset = result.Max(x => x.UpdateId) + 1;
NotifyObservers(result);
}
catch (TaskCanceledException)
{
// create new token and check observers
offset = null;
_cancellationTokenSource = new CancellationTokenSource();
}
catch (Exception exception)
{
// unexpected exception report them to the observers and cancel run update
OnException(exception);
throw;
}
}
}
internal void NotifyObservers(Update[] updates)
{
for (int uid = 0; uid != updates.Length; ++uid)
for (int oid = 0; oid != _observers.Count; ++oid)
_observers[oid].OnNext(updates[uid]);
}
internal void OnException(Exception exception)
{
for (int oid = 0; oid != _observers.Count; ++oid)
_observers[oid].OnError(exception);
}
internal void Remove(IObserver<Update> observer)
{
if (!_observers.Contains(observer))
return;

lock (_observers)
{
_observers.Remove(observer);
}

if (!_observers.Any() && Volatile.Read(ref _isRunning) == Running)
_cancellationTokenSource?.Cancel();
}
public IDisposable Subscribe(IObserver<Update> observer)
{
if(observer==null)
throw new ArgumentNullException(nameof(observer));
lock (_observers)
{
if (!_observers.Contains(observer))
{
_observers.Add(observer);
}
}

if (Interlocked.Exchange(ref _isRunning, Running) == NotRunning)
{
Task.Run(RunUpdateSafe);
}

return new DisposableAction(() => Remove(observer));
}
}
165 changes: 165 additions & 0 deletions src/RxTelegram.Bot/Api/UpdateDistributor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
using RxTelegram.Bot.Interface.BaseTypes;
using RxTelegram.Bot.Interface.BaseTypes.Enums;
using RxTelegram.Bot.Interface.InlineMode;
using RxTelegram.Bot.Interface.Payments;
using RxTelegram.Bot.Interface.Setup;
using System;
using System.Collections.Generic;
using System.Linq;

#if NETSTANDARD2_1

using RxTelegram.Bot.Utils;

#endif

namespace RxTelegram.Bot.Api;

public sealed class UpdateDistributor : IUpdateManager, IDisposable
{
private IObservable<Update> _tracker;

#region Observable Update properties
private Dictionary<UpdateType, UpdateTypeInfo> _updateInfos = new Dictionary<UpdateType, UpdateTypeInfo>();
private UpdateTypeInfo _update = new UpdateTypeInfo();
public IObservable<CallbackQuery> CallbackQuery => Selector(UpdateType.CallbackQuery, _update => _update.CallbackQuery);
public IObservable<Message> ChannelPost => Selector(UpdateType.ChannelPost, _update => _update.ChannelPost);
public IObservable<ChatBoostUpdated> ChatBoost => Selector(UpdateType.ChatBoost, _update => _update.ChatBoost);
public IObservable<ChatJoinRequest> ChatJoinRequest => Selector(UpdateType.ChatJoinRequest, _update => _update.ChatJoinRequest);
public IObservable<ChatMemberUpdated> ChatMember => Selector(UpdateType.ChatMember, _update => _update.ChatMember);
public IObservable<ChosenInlineResult> ChosenInlineResult => Selector(UpdateType.ChosenInlineResult, _update => _update.ChosenInlineResult);
public IObservable<Message> EditedChannelPost => Selector(UpdateType.EditedChannelPost, _update => _update.EditedChannelPost);
public IObservable<Message> EditedMessage => Selector(UpdateType.EditedMessage, _update => _update.EditedMessage);
public IObservable<InlineQuery> InlineQuery => Selector(UpdateType.InlineQuery, _update => _update.InlineQuery);
public IObservable<Message> Message => Selector(UpdateType.Message, _update => _update.Message);
public IObservable<ChatMemberUpdated> MyChatMember => Selector(UpdateType.MyChatMember, _update => _update.MyChatMember);
public IObservable<Poll> Poll => Selector(UpdateType.Poll, _update => _update.Poll);
public IObservable<PollAnswer> PollAnswer => Selector(UpdateType.PollAnswer, _update => _update.PollAnswer);
public IObservable<PreCheckoutQuery> PreCheckoutQuery => Selector(UpdateType.PreCheckoutQuery, _update => _update.PreCheckoutQuery);
public IObservable<ChatBoostRemoved> RemovedChatBoost => Selector(UpdateType.RemovedChatBoost, _update => _update.RemovedChatBoost);
public IObservable<ShippingQuery> ShippingQuery => Selector(UpdateType.ShippingQuery, _update => _update.ShippingQuery);
public IObservable<Update> Update => (IObservable<Update>)(_update.Observer ??= new UpdateSubject<Update>(x => x,
onSubscribe: AddGeneralListener,
onDispose: RemoveGeneralListener));
#endregion

#if NETSTANDARD2_1
public IAsyncEnumerable<CallbackQuery> CallbackQueryEnumerable() => CallbackQuery.ToAsyncEnumerable();
public IAsyncEnumerable<Message> ChannelPostEnumerable() => ChannelPost.ToAsyncEnumerable();
public IAsyncEnumerable<ChatBoostUpdated> ChatBoostEnumerable() => ChatBoost.ToAsyncEnumerable();
public IAsyncEnumerable<ChatJoinRequest> ChatJoinRequestEnumerable() => ChatJoinRequest.ToAsyncEnumerable();
public IAsyncEnumerable<ChatMemberUpdated> ChatMemberEnumerable() => ChatMember.ToAsyncEnumerable();
public IAsyncEnumerable<ChosenInlineResult> ChosenInlineResultEnumerable() => ChosenInlineResult.ToAsyncEnumerable();
public IAsyncEnumerable<Message> EditedChannelPostEnumerable() => EditedChannelPost.ToAsyncEnumerable();
public IAsyncEnumerable<Message> EditedMessageEnumerable() => EditedMessage.ToAsyncEnumerable();
public IAsyncEnumerable<InlineQuery> InlineQueryEnumerable() => InlineQuery.ToAsyncEnumerable();
public IAsyncEnumerable<Message> MessageEnumerable() => Message.ToAsyncEnumerable();
public IAsyncEnumerable<ChatMemberUpdated> MyChatMemberEnumerable() => MyChatMember.ToAsyncEnumerable();
public IAsyncEnumerable<ShippingQuery> ShippingQueryEnumerable() => ShippingQuery.ToAsyncEnumerable();
public IAsyncEnumerable<PollAnswer> PollAnswerEnumerable() => PollAnswer.ToAsyncEnumerable();
public IAsyncEnumerable<Poll> PollEnumerable() => Poll.ToAsyncEnumerable();
public IAsyncEnumerable<PreCheckoutQuery> PreCheckoutQueryEnumerable() => PreCheckoutQuery.ToAsyncEnumerable();
public IAsyncEnumerable<ChatBoostRemoved> RemovedChatBoostEnumerable() => RemovedChatBoost.ToAsyncEnumerable();
public IAsyncEnumerable<Update> UpdateEnumerable() => Update.ToAsyncEnumerable();

#endif
public UpdateDistributor(IObservable<Update> updateTracker)
{
_updateInfos = Enum.GetValues(typeof(UpdateType))
.Cast<UpdateType>()
.ToDictionary(x => x, _ => new UpdateTypeInfo());
Set(updateTracker);
}

private void AddGeneralListener()
{
++_update.Listeners;

(_tracker as ITrackerSetup)?.Set(null);

_update.Subscription ??= _tracker.Subscribe(_update.Observer);
}
private void AddListener(UpdateType type)
{
var updateType = _updateInfos[type];
++updateType.Listeners;

UpdateTrackerTypes();

updateType.Subscription ??= _tracker.Subscribe(updateType.Observer);
}
private void RemoveGeneralListener()
{
--_update.Listeners;
_update.Subscription?.Dispose();
_update.Subscription = null;

UpdateTrackerTypes();
}
private void RemoveListener(UpdateType type)
{
var updateType = _updateInfos[type];
--updateType.Listeners;
_update.Subscription?.Dispose();
_update.Subscription = null;

UpdateTrackerTypes();
}
public IObservable<T> Selector<T>(UpdateType updateType, Func<Update, T> propertySelector)
{
var info = _updateInfos[updateType];
if (info.Observer != null)
return (IObservable<T>)info;

var subject = new UpdateSubject<T>(propertySelector,
onSubscribe: () => AddListener(updateType),
onDispose: () => RemoveListener(updateType));
info.Observer = subject;
info.Subscription = _tracker.Subscribe(info.Observer);
return subject;
}
public void Set(IObservable<Update> tracker)
{
//Setup current tracker to listen all messages before change to a new one
(_tracker as ITrackerSetup)?.Set(null);
DisposeTrackerSubcription();
_tracker = tracker;
UpdateTrackerTypes();
SubscribeToTracker();
}
public void DisposeTrackerSubcription()
{
_update.Subscription?.Dispose();
foreach (var info in _updateInfos.Values)
info.Subscription?.Dispose();
}
public void SubscribeToTracker()
{
if(_update.Observer!=null)
_update.Subscription = _tracker.Subscribe(_update.Observer);
foreach (var info in _updateInfos.Values.Where(x=>x.Observer!=null))
info.Subscription = _tracker.Subscribe(info.Observer);
}
private void UpdateTrackerTypes()
{
if (_tracker is not ITrackerSetup) return;

IEnumerable<UpdateType> types = null;
if (_update.Listeners == 0)
{
types = _updateInfos.Where(x => x.Value.Listeners != 0).Select(x => x.Key);
if (!types.Any())
types = null;
}
(_tracker as ITrackerSetup).Set(types);
}

public void Dispose() => DisposeTrackerSubcription();

private class UpdateTypeInfo
{
public int Listeners { get; set; } = 0;
public IObserver<Update> Observer { get; set; } = null;
public IDisposable Subscription { get; set; } = null;
}
}
2 changes: 2 additions & 0 deletions src/RxTelegram.Bot/Api/UpdateManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ namespace RxTelegram.Bot.Api;

public class UpdateManager : IUpdateManager
{
public void Set(IObservable<Update> tracker) => throw new NotImplementedException();

public IObservable<Update> Update => _update;

public IObservable<Message> Message => _message;
Expand Down
8 changes: 8 additions & 0 deletions src/RxTelegram.Bot/Interface/Setup/UpdateSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using RxTelegram.Bot.Utils.Rx;
using System;

namespace RxTelegram.Bot.Interface.Setup;

public class UpdateSubject<T>(Func<Update, T> selector, Action onSubscribe, Action onDispose)
: CustomSubject<Update, T>(selector, onSubscribe, onDispose)
{ }
Loading
Loading