forked from ch-robinson/dotnet-avro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSchemaResolutionOptions.cs
88 lines (79 loc) · 3.7 KB
/
SchemaResolutionOptions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
using System;
using System.Net.Http;
using System.Threading.Tasks;
using SchemaRegistryClient = Confluent.SchemaRegistry.CachedSchemaRegistryClient;
using SchemaRegistryConfiguration = Confluent.SchemaRegistry.SchemaRegistryConfig;
using SchemaRegistryException = Confluent.SchemaRegistry.SchemaRegistryException;
namespace Chr.Avro.Cli
{
/// <summary>
/// Options that identify which schema to fetch from a Schema Registry. These are read by
/// <c>ResolveSchema</c>, which accepts either a schema ID on its own or a subject with an
/// optional version.
/// </summary>
public interface ISchemaResolutionOptions
{
/// <summary>
/// URL handed to the registry client as its <c>SchemaRegistryUrl</c> setting.
/// </summary>
string RegistryUrl { get; }
/// <summary>
/// ID of the schema to fetch (referred to as <c>--id</c> in error messages). When this is
/// set, <c>ResolveSchema</c> rejects a non-empty subject or a version.
/// </summary>
int? SchemaId { get; }
/// <summary>
/// Subject of the schema to fetch (referred to as <c>--schema</c> in error messages).
/// <c>ResolveSchema</c> requires it to be non-empty whenever no ID is given.
/// </summary>
string SchemaSubject { get; }
/// <summary>
/// Version of the subject to fetch (referred to as <c>--version</c> in error messages).
/// When no ID is given and this is null, <c>ResolveSchema</c> requests the latest schema
/// for the subject instead.
/// </summary>
int? SchemaVersion { get; }
}
/// <summary>
/// Extension methods that turn <see cref="ISchemaResolutionOptions" /> into a schema string
/// by querying a Schema Registry.
/// </summary>
internal static class SchemaResolutionOptionExtensions
{
    /// <summary>
    /// Fetches the schema described by <paramref name="options" /> from the Schema Registry
    /// at <see cref="ISchemaResolutionOptions.RegistryUrl" />.
    /// </summary>
    /// <param name="options">
    /// Either a schema ID on its own, or a subject with an optional version. When only a
    /// subject is given, the latest schema for that subject is requested.
    /// </param>
    /// <returns>
    /// The schema as returned by the registry client.
    /// </returns>
    /// <exception cref="ProgramException">
    /// Thrown when the options are contradictory or incomplete, when the registry reports
    /// an error for the request, or when the registry cannot be reached.
    /// </exception>
    public static async Task<string> ResolveSchema(this ISchemaResolutionOptions options)
    {
        var configuration = new SchemaRegistryConfiguration
        {
            SchemaRegistryUrl = options.RegistryUrl
        };

        using (var client = new SchemaRegistryClient(configuration))
        {
            try
            {
                if (options.SchemaId is int id)
                {
                    if (!string.IsNullOrEmpty(options.SchemaSubject) || options.SchemaVersion.HasValue)
                    {
                        throw new ProgramException(message: "When using --id, don’t use --schema or --version.");
                    }

                    return await RetrieveSchema(
                        async () => await client.GetSchemaAsync(id),
                        $"schema with ID {id}");
                }

                var subject = options.SchemaSubject;

                if (string.IsNullOrEmpty(subject))
                {
                    throw new ProgramException(message: "Either --id or --schema (and optionally --version) must be provided.");
                }

                if (options.SchemaVersion is int version)
                {
                    return await RetrieveSchema(
                        async () => await client.GetSchemaAsync(subject, version),
                        $"schema with subject {subject} and version {version}");
                }

                return await RetrieveSchema(
                    async () => (await client.GetLatestSchemaAsync(subject)).SchemaString,
                    $"latest schema with subject {subject}");
            }
            catch (AggregateException aggregate) when (aggregate.InnerException is Exception inner)
            {
                // Kept for code paths that still surface a wrapped failure (e.g. a client
                // that blocks on a task internally).
                throw new ProgramException(message: $"Failed to connect to the Schema Registry ({inner.Message}).", inner: inner);
            }
            catch (HttpRequestException exception)
            {
                // An awaited task rethrows its original exception rather than an
                // AggregateException, so transport failures arrive here unwrapped. Registry
                // errors never reach this handler: RetrieveSchema has already converted
                // them to ProgramException.
                throw new ProgramException(message: $"Failed to connect to the Schema Registry ({exception.Message}).", inner: exception);
            }
        }
    }

    /// <summary>
    /// Runs a single registry request and converts a registry error into a
    /// <see cref="ProgramException" /> that names what was being retrieved.
    /// </summary>
    /// <param name="request">
    /// The registry call to execute.
    /// </param>
    /// <param name="description">
    /// Text inserted after "Failed to retrieve" in the error message.
    /// </param>
    /// <returns>
    /// The result of <paramref name="request" />.
    /// </returns>
    private static async Task<string> RetrieveSchema(Func<Task<string>> request, string description)
    {
        try
        {
            return await request();
        }
        catch (SchemaRegistryException exception)
        {
            // The usual path: await surfaces the registry error directly.
            throw new ProgramException(message: $"Failed to retrieve {description} ({exception.Message}).", inner: exception);
        }
        catch (AggregateException aggregate) when (aggregate.InnerException is SchemaRegistryException inner)
        {
            // Backward-compatible path for a registry error wrapped by the client.
            throw new ProgramException(message: $"Failed to retrieve {description} ({inner.Message}).", inner: inner);
        }
    }
}
}