Skip to content

Commit

Permalink
Make the example plugin useful (#3)
Browse files Browse the repository at this point in the history
This changes the example plugin from doing nothing to trimming strings,
which is a much more useful example.
  • Loading branch information
dominiklohmann authored May 25, 2024
1 parent 33ca197 commit 926c719
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 35 deletions.
6 changes: 3 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
cmake_minimum_required(VERSION 3.19...3.28 FATAL_ERROR)

project(example
DESCRIPTION "Example plugin for Tenzir"
project(trim
DESCRIPTION "Trim operator plugin for Tenzir"
LANGUAGES CXX)

find_package(Tenzir REQUIRED PATHS "/opt/tenzir")

TenzirRegisterPlugin(
TARGET example
TARGET trim
ENTRYPOINT "src/plugin.cpp"
SOURCES GLOB "src/*.cpp"
INCLUDE_DIRECTORIES "include")
18 changes: 8 additions & 10 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
ARG TENZIR_VERSION=main
FROM ghcr.io/tenzir/tenzir-dev:${TENZIR_VERSION} AS example-builder
FROM ghcr.io/tenzir/tenzir-dev:${TENZIR_VERSION} AS builder

COPY . /plugins/example/
COPY . /tmp/trim/

RUN cmake -S /plugins/example -B build-example -G Ninja -D CMAKE_INSTALL_PREFIX:STRING="$PREFIX"
RUN cmake --build build-example --parallel
RUN cmake --install build-example --strip --component Runtime --prefix /plugin/example
RUN cmake -S /tmp/trim -B /tmp/trim/build -G Ninja -D CMAKE_INSTALL_PREFIX:STRING="$PREFIX"
RUN cmake --build /tmp/trim/build --parallel
RUN cmake --install /tmp/trim/build --strip --component Runtime --prefix /opt/tenzir/plugin/trim

FROM example-builder AS example-test
FROM builder AS test

ENV BATS_LIB_PATH=/tmp/tenzir/tenzir/integration/lib
# TODO: Use the update-integration target instead
ENV UPDATE=1

ENTRYPOINT cmake --build build-example --target integration
ENTRYPOINT cmake --build /tmp/trim/build --target update-integration

FROM ghcr.io/tenzir/tenzir:${TENZIR_VERSION}

COPY --from=example-builder --chown=tenzir:tenzir /plugin/example /opt/tenzir
COPY --from=builder --chown=tenzir:tenzir /opt/tenzir/plugin/trim /opt/tenzir/plugin/trim
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Tenzir Example Plugin

This is an example plugin for Tenzir.
This is an example plugin for Tenzir, adding a `trim` operaotr that removes
whitespace from string fields.

## Build and run

Expand Down Expand Up @@ -36,7 +37,10 @@ reference files automatically.

If you want to upstream your plugin so that it is bundled with every Tenzir
installation, open a PR that adds it to the [`plugins/` directory in the
`tenzir/tenzir` repository][plugins-source].
`tenzir/tenzir` repository][plugins-source]. If your plugin has no
dependencies, consider contributing it as a builtin instead. Builtins are
located in the [`libtenzir/builtins/` directory in the `tenzir/tenzir`
repositorsbuiltins-source].

[plugins-source]: https://github.com/tenzir/tenzir/tree/main/plugins
[builtins-source]: https://github.com/tenzir/tenzir/tree/main/libtenzir/builtins
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ services:
tests:
build:
context: .
target: example-test
target: test
args:
- TENZIR_VERSION=main
profiles:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"foo": "foo"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"foo": "foo", "bar": " bar"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"foo": "foo", "bar": "bar"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
error: NotImplemented: Function 'utf8_trim' has no kernel matching input types (uint64)
--> <input>:1:16
|
1 | version | trim :uint64
| ^^^^^^^
|
12 changes: 9 additions & 3 deletions integration/tests/tests.bats
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ setup() {
bats_load_library bats-tenzir

export_default_node_config
export TENZIR_PLUGINS="example"
export TENZIR_PLUGINS="trim"
setup_node
}

teardown() {
teardown_node
}

@test "Check plugin availability" {
check tenzir 'show plugins | where name == "example" | drop version, kind'
@test "trim strings" {
check tenzir 'version | put foo=" foo " | trim foo'
check tenzir 'version | put foo=" foo ", bar=" bar" | trim foo'
check tenzir 'version | put foo=" foo ", bar=" bar" | trim :string'
}

@test "trimming non-string fields fails" {
check ! tenzir 'version | trim :uint64'
}
123 changes: 108 additions & 15 deletions src/plugin.cpp
Original file line number Diff line number Diff line change
@@ -1,35 +1,123 @@
#include <tenzir/argument_parser.hpp>
#include <tenzir/arrow_table_slice.hpp>
#include <tenzir/collect.hpp>
#include <tenzir/plugin.hpp>

namespace tenzir::plugins::example {
#include <arrow/api.h>
#include <arrow/compute/api.h>

#include <unordered_set>

namespace tenzir::plugins::trim {

namespace {

class example_operator final : public crtp_operator<example_operator> {
class trim_operator final : public crtp_operator<trim_operator> {
public:
auto operator()(table_slice events) const -> table_slice {
// Does nothing with the input.
return events;
trim_operator() = default;

explicit trim_operator(located<std::string> field)
: field_{std::move(field)} {
}

// This function contains the logic of the trim operator when
// instantiated. It maps from a generator :53
// of table slices to a generator of
// table slices. A table slice is simply a batch of events.
auto operator()(generator<table_slice> input,
operator_control_plane& ctrl) const
-> generator<table_slice> {
// All code up to the first yield is run synchronously in an operator and
// is considered the start up phase. This operator doesn't do anything
// special in this case, so we can signal a successful startup immediately.
co_yield {};
// The main loop of the operator exits once the previous operator has
// finished. Utilize this for control flow. For example, we keep a list
// of schemas outside of the loop that we already warned for.
auto warned_for_schemas = std::unordered_set<std::string>{};
for (auto events : input) {
// There's one important contract that an operator must always adhere to:
// if an input batch is empty, the operator must yield. In all other
// situations, the operator may continue without yielding.
if (events.rows() == 0) {
co_yield {};
continue;
}
// We can now start processing the events in the batch. First, we resolve
// the field for the batch's schema to a set of indices pointing to the
// field(s) within the schema. This transparently supports resolving
// concepts and field extractors.
const auto indices = collect(events.schema().resolve(field_.inner));
// If the field didn't resolve, we can't do anything with the batch. We
// warn the user about it and return the input unchanged. We warn only
// once per schema, and dwe only warn when the specified field was not a
// type extractor.
if (indices.empty()) {
const auto [_, inserted] = warned_for_schemas.insert(
std::string{events.schema().name()});
if (inserted and not field_.inner.starts_with(':')) {
diagnostic::warning("field did not resolve for schema `{}`",
events.schema())
.primary(field_.source)
.emit(ctrl.diagnostics());
}
co_yield std::move(events);
continue;
}
// Now we can transform the events in the batch. We're utilizing Apache
// Arrow's compute function 'utf8_trim' for this, confuring it to trim
// whitespace.
const auto trim = [&](struct record_type::field field,
std::shared_ptr<arrow::Array> array)
-> indexed_transformation::result_type {
static const auto options = arrow::compute::TrimOptions{" \t\n\v\f\r"};
auto trimmed_array = arrow::compute::CallFunction(
"utf8_trim", {array}, &options);
if (not trimmed_array.ok()) {
diagnostic::error("{}", trimmed_array.status().ToString())
.primary(field_.source)
.throw_();
}
return {
{field, trimmed_array.MoveValueUnsafe().make_array()},
};
};
// Lastly, we apply the transformation to all indices and then return the
// transformed batch.
auto transformations = std::vector<indexed_transformation>{};
for (auto index : indices) {
transformations.push_back({index, trim});
}
co_yield transform_columns(std::move(events), std::move(transformations));
}
}

// Return the user-facing name of the operator. Must be a valid identifier.
auto name() const -> std::string override {
return "example";
return "trim";
}

auto optimize(expression const& filter, event_order order) const
// Specify how optimizations affect the operator.
auto optimize(const expression& filter, event_order order) const
-> optimize_result override {
(void)order;
(void)filter;
return do_not_optimize(*this);
}

friend auto inspect(auto& f, example_operator& x) -> bool {
return f.object(x).fields();
// List all fields so that the operator can successfully be transmitted
// between nodes.
friend auto inspect(auto& f, trim_operator& x) -> bool {
return f.object(x).fields(f.field("field", x.field_));
}

private:
located<std::string> field_ = {};
};

class plugin final : public virtual operator_plugin<example_operator> {
class plugin final : public virtual operator_plugin<trim_operator> {
public:
// Provide the signature of the operator for `show operators`.
auto signature() const -> operator_signature override {
return {
.source = false,
Expand All @@ -38,16 +126,21 @@ class plugin final : public virtual operator_plugin<example_operator> {
};
}

// Parse the operator from the parser interface.
auto parse_operator(parser_interface& p) const -> operator_ptr override {
auto parser = argument_parser{"example",
"https://docs.tenzir.com/operators/example"};
auto parser = argument_parser{
"trim",
"https://github.com/tenzir/example-plugin",
};
auto field = located<std::string>{};
parser.add(field, "<field>");
parser.parse(p);
return std::make_unique<example_operator>();
return std::make_unique<trim_operator>(field);
}
};

} // namespace

} // namespace tenzir::plugins::example
} // namespace tenzir::plugins::trim

TENZIR_REGISTER_PLUGIN(tenzir::plugins::example::plugin)
TENZIR_REGISTER_PLUGIN(tenzir::plugins::trim::plugin)

0 comments on commit 926c719

Please sign in to comment.