Skip to content

Commit

Permalink
misc refactoring + integration tests (#12)
Browse files Browse the repository at this point in the history
* misc refactoring + integration tests

* review changes

* ProduceAsync method signature changes

* Deserializing Consumer + misc improvements to Consumer

* Removed Consumer.Start method, added Consumer.Poll

* Consumer.Consume method signature change

* Consumer benchmark tests

* Redirect poll queue -> consumer poll queue in Consumer

* get/query watermark API improvements + integration tests

* DeserializingConsumer [bug fix / API changes / examples / integration tests]

* misc tidyup + documentation

* Offset class equality, inequality, hash fn, ToString + unit tests

* OnLog integration tests + Flush method -> CLS compliant.

* string.Join..

* structs -> immutable, equality ops. unit tests. name changes. misc other.

* improvement to .Consume internals

* error rethink

* MessageInfo -> Message

* timeout related API changes

* tidyup

* review changes
  • Loading branch information
mhowlett authored Jan 16, 2017
1 parent b1eaf65 commit bb1438a
Show file tree
Hide file tree
Showing 102 changed files with 4,368 additions and 1,506 deletions.
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,40 @@ confluent-kafka-dotnet - .NET Apache Kafka client

Forked from [rdkafka-dotnet](https://github.com/ah-/rdkafka-dotnet) by Andreas Heider

Copyright (c) 2016 [Confluent Inc.](https://www.confluent.io), 2015-2016, [Andreas Heider](mailto:[email protected])
Copyright (c) 2016-2017 [Confluent Inc.](https://www.confluent.io), 2015-2017, [Andreas Heider](mailto:[email protected])


## Usage

Just reference the [confluent-kafka-dotnet NuGet package](https://www.nuget.org/packages/confluent-kafka-dotnet)

## Build

To build the library or any test or example project, run the following from within the relevant project directory:

```
dotnet restore
dotnet build
```

To run one of the examples, use the following from within the relevant project directory:

```
dotnet run <command line args>
```

To run the integration or unit tests, use the following from within the relevant project directory:

```
dotnet test
```

To create a nuget package, use the following from wihin `src/Confluent.Kafka`:

```
dotnet pack
```

## Examples

## Documentation
Expand Down
240 changes: 199 additions & 41 deletions examples/AdvancedConsumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,90 +2,248 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Confluent.Kafka.Serialization;


namespace Confluent.Kafka.AdvancedConsumer
/// <summary>
/// Demonstrates use of the deserializing Consumer.
/// </summary>
namespace Confluent.Kafka.Examples.AdvancedConsumer
{
public class Program
{
public static void Run(string brokerList, List<string> topics)
{
bool enableAutoCommit = false;

var config = new Dictionary<string, object>
private static Dictionary<string, object> constructConfig(string brokerList, bool enableAutoCommit) =>
new Dictionary<string, object>
{
{ "group.id", "advanced-csharp-consumer" },
{ "enable.auto.commit", enableAutoCommit },
{ "auto.commit.interval.ms", 5000 },
{ "statistics.interval.ms", 60000 },
{ "bootstrap.servers", brokerList }
{ "bootstrap.servers", brokerList },
{ "default.topic.config", new Dictionary<string, object>()
{
{ "auto.offset.reset", "smallest" }
}
}
};

using (var consumer = new EventConsumer(config))
/// <summary>
// In this example:
/// - offsets are auto commited.
/// - OnMessage is used to consume messages.
/// - the poll loop is performed on a background thread started using Consumer.Start().
/// </summary>
public static void Run_Background(string brokerList, List<string> topics)
{
using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, true), null, new StringDeserializer(Encoding.UTF8)))
{
consumer.OnMessage += (obj, msg) => {
string text = Encoding.UTF8.GetString(msg.Value, 0, msg.Value.Length);
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
// Note: All event handlers are executed on the same thread (the one created/started by the Consumer.Start())

if (!enableAutoCommit && msg.Offset % 10 == 0)
consumer.OnMessage += (_, msg)
=> Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");

consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

consumer.OnError += (_, error)
=> Console.WriteLine($"Error: {error}");

consumer.OnOffsetCommit += (_, commit) =>
{
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");

if (commit.Error)
{
Console.WriteLine($"Committing offset");
consumer.Commit(msg).Wait();
Console.WriteLine($"Committed offset");
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
}
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
};

consumer.OnConsumerError += (obj, errorCode) =>
consumer.OnPartitionsAssigned += (_, partitions) =>
{
Console.WriteLine($"Consumer Error: {errorCode}");
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
consumer.Assign(partitions.Select(p => new TopicPartitionOffset(p, Offset.Invalid)));
};

consumer.OnEndReached += (obj, end) => {
Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
consumer.OnPartitionsRevoked += (_, partitions) =>
{
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
consumer.Unassign();
};

consumer.OnStatistics += (_, json)
=> Console.WriteLine($"Statistics: {json}");

consumer.Subscribe(topics);

Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");

consumer.Start();

Console.WriteLine($"Started consumer, press enter to stop consuming");
Console.ReadLine();

consumer.Stop();
}
}

/// <summary>
// In this example:
/// - offsets are auto commited.
/// - consumer.Poll / OnMessage is used to consume messages.
/// - the poll loop is performed on a separate thread.
/// </summary>
public static void Run_Poll(string brokerList, List<string> topics)
{
using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, true), null, new StringDeserializer(Encoding.UTF8)))
{
// Note: All event handlers are called on the main thread.

consumer.OnMessage += (_, msg)
=> Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");

consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

consumer.OnError += (_, error)
=> Console.WriteLine($"Error: {error}");

consumer.OnOffsetCommit += (_, commit) =>
{
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");

if (commit.Error)
{
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
}
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
};

consumer.OnError += (obj, error) => {
Console.WriteLine($"Error: {error.ErrorCode} {error.Reason}");
consumer.OnPartitionsAssigned += (_, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
consumer.Assign(partitions.Select(p => new TopicPartitionOffset(p, Offset.Invalid)));
};

if (enableAutoCommit)
consumer.OnPartitionsRevoked += (_, partitions) =>
{
consumer.OnOffsetCommit += (obj, commit) => {
if (commit.Error != ErrorCode.NO_ERROR)
{
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
}
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
};
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
consumer.Unassign();
};

consumer.OnStatistics += (_, json)
=> Console.WriteLine($"Statistics: {json}");

consumer.Subscribe(topics);

Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");

var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};

Console.WriteLine("Ctrl-C to exit.");
while(!cancelled)
{
consumer.Poll(TimeSpan.FromMilliseconds(100));
}
}
}

/// <summary>
/// In this example
/// - offsets are manually committed.
/// - consumer.Consume is used to consume messages.
/// (all other events are still handled by event handlers)
/// - no extra thread is created for the Poll (Consume) loop.
/// </summary>
public static void Run_Consume(string brokerList, List<string> topics)
{
using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
// Note: All event handlers are called on the main thread.

consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

consumer.OnError += (_, error)
=> Console.WriteLine($"Error: {error}");

consumer.OnOffsetCommit += (_, commit) =>
{
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");

if (commit.Error)
{
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
}
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
};

consumer.OnPartitionsAssigned += (obj, partitions) => {
consumer.OnPartitionsAssigned += (_, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
consumer.Assign(partitions);
consumer.Assign(partitions.Select(p => new TopicPartitionOffset(p, Offset.Invalid)));
};

consumer.OnPartitionsRevoked += (obj, partitions) => {
consumer.OnPartitionsRevoked += (_, partitions) =>
{
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
consumer.Unassign();
};

consumer.OnStatistics += (obj, json) => {
Console.WriteLine($"Statistics: {json}");
};
consumer.OnStatistics += (_, json)
=> Console.WriteLine($"Statistics: {json}");

consumer.Subscribe(topics);
consumer.Start();

Console.WriteLine($"Assigned to: [{string.Join(", ", consumer.Assignment)}]");
Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");

Console.WriteLine($"Started consumer, press enter to stop consuming");
Console.ReadLine();
var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};

while (!cancelled)
{
Message<Null, string> msg;
if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
{
continue;
}

Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");

if (msg.Offset % 5 == 0)
{
Console.WriteLine($"Committing offset");
consumer.Commit(msg);
Console.WriteLine($"Committed offset");
}
}
}
}

public static void Main(string[] args)
{
Run(args[0], args.Skip(1).ToList());
var mode = args[1];
var topics = args.Skip(2).ToList();

switch (args[0])
{
case "poll":
Run_Poll(mode, topics);
break;
case "consume":
Run_Consume(mode, topics);
break;
case "background":
Run_Background(mode, topics);
break;
}
}
}
}
6 changes: 3 additions & 3 deletions examples/AdvancedProducer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Threading.Tasks;
using Confluent.Kafka.Serialization;

namespace Confluent.Kafka.AdvancedProducer
namespace Confluent.Kafka.Examples.AdvancedProducer
{
public class Program
{
Expand Down Expand Up @@ -49,7 +49,7 @@ public static void Main(string[] args)
Console.WriteLine("Ctrl-C to quit.\n");

var cancelled = false;
Console.CancelKeyPress += (object sender, ConsoleCancelEventArgs e) => {
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Expand Down Expand Up @@ -80,7 +80,7 @@ public static void Main(string[] args)
val = text.Substring(index + 1);
}

Task<DeliveryReport> deliveryReport = producer.ProduceAsync(topicName, key, val);
var deliveryReport = producer.ProduceAsync(topicName, key, val);
var result = deliveryReport.Result; // synchronously waits for message to be produced.
Console.WriteLine($"Partition: {result.Partition}, Offset: {result.Offset}");
}
Expand Down
18 changes: 0 additions & 18 deletions examples/Benchmark/Benchmark.xproj

This file was deleted.

Loading

0 comments on commit bb1438a

Please sign in to comment.