-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProgram.cs
64 lines (48 loc) · 1.79 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
using AvroSpecific;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
/// <summary>
/// Interactive Avro producer: reads user names from standard input and publishes
/// one <c>User</c> record per line to the Kafka topic until "q" is entered or
/// standard input is closed.
/// </summary>
public void Main()
{
    string bootstrapServers = "localhost:29092";
    string schemaRegistryUrl = "localhost:8081";
    string topicName = "usertopic";

    var producerConfig = new ProducerConfig
    {
        BootstrapServers = bootstrapServers
    };
    var schemaRegistryConfig = new SchemaRegistryConfig
    {
        Url = schemaRegistryUrl
    };
    var avroSerializerConfig = new AvroSerializerConfig
    {
        // optional Avro serializer properties:
        BufferBytes = 100
    };

    using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
    using (var producer = new ProducerBuilder<string, User>(producerConfig)
        .SetValueSerializer(new AvroSerializer<User>(schemaRegistry, avroSerializerConfig))
        .Build())
    {
        Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

        int i = 1;
        string text;

        // Console.ReadLine() returns null once standard input is exhausted (EOF /
        // redirected input). Treat that like "q" instead of looping forever.
        while ((text = Console.ReadLine()) != null && text != "q")
        {
            // Pre-increment: the first record carries favorite_number = 2.
            User user = new User { name = text, favorite_color = "green", favorite_number = ++i };

            // Not awaited so the prompt stays responsive; the continuation reports
            // the delivery result or the failure for this message.
            producer.ProduceAsync(topicName, new Message<string, User> { Key = null, Value = user })
                .ContinueWith(task =>
                {
                    if (!task.IsFaulted)
                    {
                        Console.WriteLine($"produced to: {task.Result.TopicPartitionOffset}");
                        return;
                    }
                    Console.WriteLine($"error producing message: {task.Exception.InnerException}");
                });
        }

        // The ProduceAsync calls above are not awaited, so wait (bounded) for the
        // queued messages to be delivered before the producer is disposed.
        int undelivered = producer.Flush(TimeSpan.FromSeconds(10));
        if (undelivered > 0)
        {
            Console.WriteLine($"warning: {undelivered} message(s) still queued after flush timeout");
        }
    }
}