Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Libs: add polling consumer #1832

Merged
merged 3 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions csharp/Svix/EventsPublic.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// this file is @generated
#nullable enable
using Microsoft.Extensions.Logging;
using Svix.Models;

namespace Svix
{
public class EventsPublicConsumerOptions : SvixOptionsBase
{
public ulong? Limit { get; set; }
public string? Iterator { get; set; }
public string? EventType { get; set; }
public string? Channel { get; set; }

public new Dictionary<string, string> QueryParams()
{
return SerializeParams(
new Dictionary<string, object?>
{
{ "limit", Limit },
{ "iterator", Iterator },
{ "event_type", EventType },
{ "channel", Channel },
}
);
}
}

public class EventsPublicConsumerSeekOptions : SvixOptionsBase
{
public string? IdempotencyKey { get; set; }

public new Dictionary<string, string> HeaderParams()
{
return SerializeParams(
new Dictionary<string, object?> { { "idempotency-key", IdempotencyKey } }
);
}
}

public class EventsPublic(SvixClient client)
{
readonly SvixClient _client = client;

/// <summary>
/// Reads the stream of created messages for an application, filtered on the Sink's event types and
/// Channels, using server-managed iterator tracking.
/// </summary>
public async Task<PollingEndpointOut> ConsumerAsync(
string appId,
string sinkId,
string consumerId,
EventsPublicConsumerOptions? options = null,
CancellationToken cancellationToken = default
)
{
try
{
var response = await _client.SvixHttpClient.SendRequestAsync<PollingEndpointOut>(
method: HttpMethod.Get,
path: "/api/v1/app/{app_id}/poller/{sink_id}/consumer/{consumer_id}",
pathParams: new Dictionary<string, string>
{
{ "app_id", appId },
{ "sink_id", sinkId },
{ "consumer_id", consumerId },
},
queryParams: options?.QueryParams(),
headerParams: options?.HeaderParams(),
cancellationToken: cancellationToken
);
return response.Data;
}
catch (ApiException e)
{
_client.Logger?.LogError(e, $"{nameof(ConsumerAsync)} failed");

throw;
}
}

/// <summary>
/// Reads the stream of created messages for an application, filtered on the Sink's event types and
/// Channels, using server-managed iterator tracking.
/// </summary>
public PollingEndpointOut Consumer(
string appId,
string sinkId,
string consumerId,
EventsPublicConsumerOptions? options = null
)
{
try
{
var response = _client.SvixHttpClient.SendRequest<PollingEndpointOut>(
method: HttpMethod.Get,
path: "/api/v1/app/{app_id}/poller/{sink_id}/consumer/{consumer_id}",
pathParams: new Dictionary<string, string>
{
{ "app_id", appId },
{ "sink_id", sinkId },
{ "consumer_id", consumerId },
},
queryParams: options?.QueryParams(),
headerParams: options?.HeaderParams()
);
return response.Data;
}
catch (ApiException e)
{
_client.Logger?.LogError(e, $"{nameof(Consumer)} failed");

throw;
}
}

/// <summary>
/// Sets the starting offset for the consumer of a polling endpoint.
/// </summary>
public async Task<PollingEndpointConsumerSeekOut> ConsumerSeekAsync(
string appId,
string sinkId,
string consumerId,
PollingEndpointConsumerSeekIn pollingEndpointConsumerSeekIn,
EventsPublicConsumerSeekOptions? options = null,
CancellationToken cancellationToken = default
)
{
pollingEndpointConsumerSeekIn =
pollingEndpointConsumerSeekIn
?? throw new ArgumentNullException(nameof(pollingEndpointConsumerSeekIn));
try
{
var response =
await _client.SvixHttpClient.SendRequestAsync<PollingEndpointConsumerSeekOut>(
method: HttpMethod.Post,
path: "/api/v1/app/{app_id}/poller/{sink_id}/consumer/{consumer_id}/seek",
pathParams: new Dictionary<string, string>
{
{ "app_id", appId },
{ "sink_id", sinkId },
{ "consumer_id", consumerId },
},
queryParams: options?.QueryParams(),
headerParams: options?.HeaderParams(),
content: pollingEndpointConsumerSeekIn,
cancellationToken: cancellationToken
);
return response.Data;
}
catch (ApiException e)
{
_client.Logger?.LogError(e, $"{nameof(ConsumerSeekAsync)} failed");

throw;
}
}

/// <summary>
/// Sets the starting offset for the consumer of a polling endpoint.
/// </summary>
public PollingEndpointConsumerSeekOut ConsumerSeek(
string appId,
string sinkId,
string consumerId,
PollingEndpointConsumerSeekIn pollingEndpointConsumerSeekIn,
EventsPublicConsumerSeekOptions? options = null
)
{
pollingEndpointConsumerSeekIn =
pollingEndpointConsumerSeekIn
?? throw new ArgumentNullException(nameof(pollingEndpointConsumerSeekIn));
try
{
var response = _client.SvixHttpClient.SendRequest<PollingEndpointConsumerSeekOut>(
method: HttpMethod.Post,
path: "/api/v1/app/{app_id}/poller/{sink_id}/consumer/{consumer_id}/seek",
pathParams: new Dictionary<string, string>
{
{ "app_id", appId },
{ "sink_id", sinkId },
{ "consumer_id", consumerId },
},
queryParams: options?.QueryParams(),
headerParams: options?.HeaderParams(),
content: pollingEndpointConsumerSeekIn
);
return response.Data;
}
catch (ApiException e)
{
_client.Logger?.LogError(e, $"{nameof(ConsumerSeek)} failed");

throw;
}
}
}
}
22 changes: 22 additions & 0 deletions csharp/Svix/Models/PollingEndpointConsumerSeekIn.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// this file is @generated
using System.Text;
using Newtonsoft.Json;

namespace Svix.Models
{
public class PollingEndpointConsumerSeekIn
{
[JsonProperty("after", Required = Required.Always)]
public required DateTime After { get; set; }

public override string ToString()
{
StringBuilder sb = new StringBuilder();

sb.Append("class PollingEndpointConsumerSeekIn {\n");
sb.Append(" After: ").Append(After).Append('\n');
sb.Append("}\n");
return sb.ToString();
}
}
}
22 changes: 22 additions & 0 deletions csharp/Svix/Models/PollingEndpointConsumerSeekOut.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// this file is @generated
using System.Text;
using Newtonsoft.Json;

namespace Svix.Models
{
public class PollingEndpointConsumerSeekOut
{
[JsonProperty("iterator", Required = Required.Always)]
public required string Iterator { get; set; }

public override string ToString()
{
StringBuilder sb = new StringBuilder();

sb.Append("class PollingEndpointConsumerSeekOut {\n");
sb.Append(" Iterator: ").Append(Iterator).Append('\n');
sb.Append("}\n");
return sb.ToString();
}
}
}
54 changes: 54 additions & 0 deletions csharp/Svix/Models/PollingEndpointMessageOut.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// this file is @generated
using System.Text;
using Newtonsoft.Json;

namespace Svix.Models
{
/// <summary>
/// The MessageOut equivalent of polling endpoint
/// <summary>

public class PollingEndpointMessageOut
{
[JsonProperty("channels")]
public List<string>? Channels { get; set; } = null;

[JsonProperty("eventId")]
public string? EventId { get; set; } = null;

[JsonProperty("eventType", Required = Required.Always)]
public required string EventType { get; set; }

[JsonProperty("headers")]
public Dictionary<string, string>? Headers { get; set; } = null;

[JsonProperty("id", Required = Required.Always)]
public required string Id { get; set; }

[JsonProperty("payload", Required = Required.Always)]
public required Object Payload { get; set; }

[JsonProperty("tags")]
public List<string>? Tags { get; set; } = null;

[JsonProperty("timestamp", Required = Required.Always)]
public required DateTime Timestamp { get; set; }

public override string ToString()
{
StringBuilder sb = new StringBuilder();

sb.Append("class PollingEndpointMessageOut {\n");
sb.Append(" Channels: ").Append(Channels).Append('\n');
sb.Append(" EventId: ").Append(EventId).Append('\n');
sb.Append(" EventType: ").Append(EventType).Append('\n');
sb.Append(" Headers: ").Append(Headers).Append('\n');
sb.Append(" Id: ").Append(Id).Append('\n');
sb.Append(" Payload: ").Append(Payload).Append('\n');
sb.Append(" Tags: ").Append(Tags).Append('\n');
sb.Append(" Timestamp: ").Append(Timestamp).Append('\n');
sb.Append("}\n");
return sb.ToString();
}
}
}
30 changes: 30 additions & 0 deletions csharp/Svix/Models/PollingEndpointOut.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// this file is @generated
using System.Text;
using Newtonsoft.Json;

namespace Svix.Models
{
public class PollingEndpointOut
{
[JsonProperty("data", Required = Required.Always)]
public required List<PollingEndpointMessageOut> Data { get; set; }

[JsonProperty("done", Required = Required.Always)]
public required bool Done { get; set; }

[JsonProperty("iterator", Required = Required.Always)]
public required string Iterator { get; set; }

public override string ToString()
{
StringBuilder sb = new StringBuilder();

sb.Append("class PollingEndpointOut {\n");
sb.Append(" Data: ").Append(Data).Append('\n');
sb.Append(" Done: ").Append(Done).Append('\n');
sb.Append(" Iterator: ").Append(Iterator).Append('\n');
sb.Append("}\n");
return sb.ToString();
}
}
}
Loading