Skip to content

Commit

Permalink
Fix #3 add readmes that are included in the nuget package (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz authored Jan 25, 2023
1 parent 66b4872 commit 379a7ea
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 17 deletions.
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# `Elastic.Ingest`
# `Elastic.Ingest.*`

This repository houses various `Elastic.Ingest.*` packages that utilize `Elastic.Channels` to send bulk data to various (Elastic) endpoints.

## Usage

### Projects

```c#
```
* [Elastic.Channels](src/Elastic.Channels/README.md) - core library that implements a batching `System.Threading.Channels.ChannelWriter`
* [Elastic.Ingest.Transport](src/Elastic.Ingest.Transport/README.md) - core library that ships common setup for pushing data utilizing [Elastic.Transport](https://github.com/elastic/elastic-transport-net)
* [Elastic.Ingest.Elasticsearch](src/Elastic.Ingest.Elasticsearch/README.md) - exposes `DataStreamChannel` and `IndexChannel` to push data to Elasticsearch with great ease.

#### in development
* [Elastic.Ingest.APM](src/Elastic.Ingest.Apm/README.md) - Pushes APM data to apm-server over the V2 intake API. Still under development.

### Projects
#### No plans of releasing
* [Elastic.Ingest.OpenTelemetry](src/Elastic.Ingest.OpenTelemetry/README.md) - a toy implementation of `Elastic.Channels` that pushes `Activities` over `OTLP`

7 changes: 7 additions & 0 deletions elastic-ingest-dotnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "examples", "examples", "{B6
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Channels.Example", "examples\Elastic.Channels.Example\Elastic.Channels.Example.csproj", "{D584C7ED-A2F5-472F-9DEE-5F36B88B558E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Ingest.OpenTelemetry", "src\Elastic.Ingest.OpenTelemetry\Elastic.Ingest.OpenTelemetry.csproj", "{92F87F85-3028-4E98-A1C7-6CCEC4392AB4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -64,6 +66,7 @@ Global
{8AFDD165-F5B1-4555-97E3-A376B30236D3} = {B284B3C8-2592-4B5D-B287-207285E4B7F9}
{DF02EDE5-1DBD-487B-BFA3-006407B6392D} = {8A402CB0-CB84-4F7D-9F97-EFEC23F36814}
{D584C7ED-A2F5-472F-9DEE-5F36B88B558E} = {B67CBB46-74C1-47EB-9E41-D55C5E0E0D85}
{92F87F85-3028-4E98-A1C7-6CCEC4392AB4} = {A60DDBBB-4BF4-4B3B-A13A-E0B409917433}
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{9F529525-E8E3-463D-A920-4D6E34150FC5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -102,5 +105,9 @@ Global
{D584C7ED-A2F5-472F-9DEE-5F36B88B558E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D584C7ED-A2F5-472F-9DEE-5F36B88B558E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D584C7ED-A2F5-472F-9DEE-5F36B88B558E}.Release|Any CPU.Build.0 = Release|Any CPU
{92F87F85-3028-4E98-A1C7-6CCEC4392AB4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{92F87F85-3028-4E98-A1C7-6CCEC4392AB4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{92F87F85-3028-4E98-A1C7-6CCEC4392AB4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{92F87F85-3028-4E98-A1C7-6CCEC4392AB4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
3 changes: 2 additions & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
<PackageIcon>nuget-icon.png</PackageIcon>

<WarningsAsErrors>True</WarningsAsErrors>

<PackageReadmeFile>README.md</PackageReadmeFile>
</PropertyGroup>

<ItemGroup>
<Content Include="README.md" Pack="true" PackagePath="README.md" CopyToOutputDirectory="PreserveNewest"/>
<Content Include="../../nuget-icon.png" CopyToOutputDirectory="PreserveNewest">
<Link>nuget-icon.png</Link>
<Pack>True</Pack>
Expand Down
84 changes: 84 additions & 0 deletions src/Elastic.Channels/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Elastic.Channels

Provides an specialized `System.Threading.Channels.ChannelWriter` implementation that makes it easy
to consume data pushed to that thread in batches.

The batches will emit either when a certain maximum is hit or when a batch's lifecycle exceeds a certain age.

This allows data of various rates to pushed in the same manner while different implementations to send the batched data to receivers can be implemented.

This package serves mainly as a core library with abstract classes
and does not ship any useful implementations.

It ships with a `NoopBufferedChannel` implementation that does nothing in its `Send` implementation for unit test and benchmark purposes.


## BufferedChannelBase<>

An abstract class that requires implementers to implement:

```csharp
protected abstract Task<TResponse> Send(IReadOnlyCollection<TEvent> buffer);
```

Any implementation allows data to pushed to it through:

```csharp
var e = new TEvent();
if (await channel.WaitToWriteAsync(e))
written++;
```

## ChannelOptionsBase<>

Implementers of `BufferedChannelBase<>` must also create their own implementation of `ChannelOptionsBase<>`. This to ensure each channel implementation creates an appropriately named options class.


## Quick minimal implementation

```chsarp
public class Event { }
public class Response { }
public class NoopChannelOptions
: ChannelOptionsBase<Event, Response> { }
public class NoopBufferedChannel
: BufferedChannelBase<NoopChannelOptions, Event, Response>
{
public NoopBufferedChannel(NoopChannelOptions options)
: base(options) { }
protected override Task<Response> Send(IReadOnlyCollection<NoopEvent> buffer)
{
return Task.FromResult(new Response());
}
}
```

Now once we instantiate an `NoopBufferedChannel` we can use it push data to it.

```csharp
var e = new Event();
if (await noopChannel.WaitToWriteAsync(e))
written++;
```


## BufferOptions

Each `ChannelOptionsBase<>` implementation takes and exposes a `BufferOptions` instance. This controls the buffering behavior of `BufferedChannelBase<>`.


| Option | Description |
|-----------------------------|------------------------------------------------------------------------------------------------------------------------------|
| `MaxInFlightMessages` | The maximum number of in flight instances that can be queued in memory. If this threshold is reached, events will be dropped |
| `MaxConsumerBufferSize` | The number of events a local buffer should reach before sending the events in a single call to Elasticsearch. |
| `MaxRetries` | The maximum number of retries over `Send` |
| `MaxConsumerBufferLifetime` | The maximum age of buffer before its flushed |
| `ConcurrentConsumers` | Controls how many concurrent `Send` operations may occur |
| `BackOfPeriod` | Func that calculates an appropriate backoff time for a retry |
| `BufferFlushCallback` | Called `once` whenever a buffer is flushed, excluding retries |
| `WaitHandle` | Inject a waithandle that will be signalled after each flush, excluding retries. |
11 changes: 11 additions & 0 deletions src/Elastic.Ingest.Apm/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Elastic.Ingest.APM

A `Elastic.Channel` implementation of `BufferedChannelBase` that allows APM data to be written to `apm-server` over the V2 intake API.


Utilizes `Elastic.Transport` through `Elastic.Ingest.Transport`.


This project is currently still under development and not pushed to Nuget.

We are still working on finishing this implementation as a possible replacement for the PayloadSender that's currently part of `Elastic.Apm`
97 changes: 97 additions & 0 deletions src/Elastic.Ingest.Elasticsearch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Elastic.Ingest.Elasticsearch

`Elastic.Channels` implementations of `BufferedChannelBase` that allows data to pushed to either indices or data streams


## `DataStreamChannel<TEvent>`

A channel that specializes to writing data with a timestamp to Elasticsearch data streams. E.g given the following document.

```csharp
public class TimeSeriesDocument
{
[JsonPropertyName("@timestamp")]
public DateTimeOffset Timestamp { get; set; }

[JsonPropertyName("message")]
public string Message { get; set; }
}

```

A channel can be created to push data to the `logs-dotnet-default` data stream.

```csharp
var dataStream = new DataStreamName("logs", "dotnet");
var bufferOptions = new BufferOptions { }
var options = new DataStreamChannelOptions<TimeSeriesDocument>(transport)
{
DataStream = dataStream,
BufferOptions = bufferOptions
};
var channel = new DataStreamChannel<TimeSeriesDocument>(options);
```

NOTE: read more about Elastic's data stream naming convention here:
https://www.elastic.co/blog/an-introduction-to-the-elastic-data-stream-naming-scheme

we can now push data to Elasticsearch using the `DataStreamChannel`
```csharp
var doc = new TimeSeriesDocument
{
Timestamp = DateTimeOffset.Now,
Message = "Hello World!",
}
channel.TryWrite(doc);
```

# `IndexChannel<TEvent>`

A channel that specializes in writing catalog data to Elastic indices.
Catalog data is typically data that has an id of sorts.

Given the following minimal document

```csharp
public class CatalogDocument
{
[JsonPropertyName("id")]
public string Id { get; set; }

[JsonPropertyName("title")]
public string Title { get; set; }

[JsonPropertyName("created")]
public DateTimeOffset Created { get; set; }
}
```

We can create an `IndexChannel<>` to push `CatalogDocument` instances.

```csharp
var options = new IndexChannelOptions<CatalogDocument>(transport)
{
IndexFormat = "catalog-data-{0:yyyy.MM.dd}",
BulkOperationIdLookup = c => c.Id,
TimestampLookup = c => c.Created,
};
var channel = new IndexChannel<CatalogDocument>(options);
```

now we can push data using:

```csharp
var doc = new CatalogDocument
{
Created = date,
Title = "Hello World!",
Id = "hello-world"
}
channel.TryWrite(doc);
```

This will push data to `catalog-data-2023.01.1` because `TimestampLookup` yields `Created` to `IndexFormat`.

`IndexFormat` can also simply be a fixed string to write to an Elasticsearch alias/index.

`BulkOperationIdLookup` determines if the document should be pushed to Elasticsearch using a `create` or `index` operation.
9 changes: 3 additions & 6 deletions src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using OpenTelemetry;
using OpenTelemetry.Exporter;
using OpenTelemetry.Resources;
using Elastic.Channels;

namespace Elastic.Ingest.OpenTelemetry
{
Expand All @@ -33,11 +34,7 @@ public CustomOtlpTraceExporter(OtlpExporterOptions options, TraceChannelOptions
}


public class TraceBufferOptions : BufferOptions<Activity>
{
}

public class TraceChannelOptions : ChannelOptionsBase<Activity, TraceBufferOptions, TraceExportResult>
public class TraceChannelOptions : ChannelOptionsBase<Activity, TraceExportResult>
{
public string? ServiceName { get; set; }
public Uri? Endpoint { get; set; }
Expand All @@ -49,7 +46,7 @@ public class TraceExportResult
public ExportResult Result { get; internal set; }
}

public class TraceChannel : ChannelBase<TraceChannelOptions, TraceBufferOptions, Activity, TraceExportResult>
public class TraceChannel : BufferedChannelBase<TraceChannelOptions, Activity, TraceExportResult>
{
public TraceChannel(TraceChannelOptions options) : base(options) {
var o = new OtlpExporterOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<Title>Elasticsearch Buffer backed data shipper</Title>
<Description>TODO</Description>
<PackageTags>TODO</PackageTags>
<Description>Offers an easy to use ChannelWriter implementation to push activities over OTLP to OpenTelemetry endpoints</Description>
<PackageTags>elastic, channels, apm, ingest, opentelemetry</PackageTags>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Elastic.Ingest\Elastic.Ingest.csproj" />
<ProjectReference Include="..\Elastic.Channels\Elastic.Channels.csproj" />
<PackageReference Include="Google.Protobuf" Version="3.17.3" />
<PackageReference Include="Grpc.Net.Client" Version="2.38.0" />
<PackageReference Include="Grpc.Tools" Version="2.39.0-pre1">
Expand Down
7 changes: 7 additions & 0 deletions src/Elastic.Ingest.OpenTelemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Elastic.Ingest.OpenTelemetry

A `Elastic.Channel` implementation of `BufferedChannelBase` that allows OpenTelemetry data to be written over OTLP.

This is not currently published to NuGet with no current plans to ever do so.

This project currently only exists as a proof of concept to validate the concepts of `Elastic.Channels`
7 changes: 7 additions & 0 deletions src/Elastic.Ingest.Transport/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Elastic.Ingest.Transport

An abstract `Elastic.Channels` implementation of `BufferedChannelBase` that allows implementes to quickly utilize `Elastic.Transport` to send data over HTTP(S) to one or many receiving endpoints.

This is a core library that does not ship any useful implementation.

See e.g `Elastic.Ingest.Elasticsearch` for a concrete implementation to push data to Elasticsearch

0 comments on commit 379a7ea

Please sign in to comment.