getlantern/zenodb

ZenoDB

ZenoDB is a Go-based embeddable time series database optimized for performing aggregated analytical SQL queries on dimensional data. It was developed to replace influxdb as a repository for client and server metrics at Lantern.

Dependencies

This project uses Go modules to manage dependencies. If running a Go version prior to 1.13, you can enable Go modules using the environment variable GO111MODULE=on, like:

GO111MODULE=on go install github.com/getlantern/zenodb/cmd/zeno

Current Features

  • No limits on the number of dimensions
  • SQL-based query language including GROUP BY and HAVING support
  • Auto-correlation
  • Reasonably efficient storage model
  • (Mostly) parallel query processing
  • Crosstab queries
  • FROM subqueries
  • Write-ahead Log
  • Seems pretty fast
  • Materialized views (with historical data from write-ahead log)
  • Some unit tests
  • Limit query memory consumption to avoid OOM killer
  • Multi-leader, multi-follower architecture

Future Stuff

  • Auto cleanup of Write-ahead Log
  • Harmonize field vs column language
  • More unit tests and general code cleanup
  • Byte array buffers to avoid allocations for sequences and ByteMaps
  • Smart sorting - e.g. only sort data files if a substantial number of new keys have been added
  • More validations/error checking
  • TLS in HTTP
  • Stored statistics (database-level, table-level, size, throughput, dimensions, etc.)
  • Optimized queries using expression references (avoid recomputing same expression when referenced multiple times in same row)
  • Completely parallel query processing
  • Interruptible queries using Context
  • User-level authentication/authorization
  • Multi-dimensional crosstab queries
  • Read-only query server replication using rsync?

Standalone Quick Start

In this tutorial, you will:

  • Run Zeno as a standalone database
  • Insert data into zeno using the RESTful HTTP API
  • Query zeno using the zeno-cli

You will learn how to use zeno to:

  • Aggregate multi-dimensional data
  • Correlate different kinds of data by inserting into a single table
  • Correlate data using the IF function
  • Derive data by performing calculations on existing data
  • Sort data
  • Filter data based on the results of your aggregations

Install Go if you haven't already.

GO111MODULE=on go install github.com/getlantern/zenodb/cmd/zeno
GO111MODULE=on go install github.com/getlantern/zenodb/cmd/zeno-cli

Make a working directory (e.g. '~/zeno-quickstart'). In it, create a schema.yaml like the one below to configure your database:

combined:
  retentionperiod: 1h
  sql: >
    SELECT
      requests,
      AVG(load_avg) AS load_avg
    FROM inbound
    GROUP BY *, period(5m)

This schema creates a table called combined which is filled by selecting data from the stream inbound. combined keeps track of the requests and load_avg values and includes all dimensions from the inbound stream. It groups data into 5 minute buckets. The requests column is grouped using the SUM aggregation operator, which is the default if no operator is specified. load_avg on the other hand is aggregated as an average.

Core Concept - Zeno does not store individual points; everything is stored in aggregated form.
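To make the roll-up concrete, here is a minimal Python sketch of what the schema above describes: points are truncated to 5-minute buckets, requests is summed, and load_avg is averaged. The function names and the (sum, count) bookkeeping for the average are assumptions made for illustration, not ZenoDB's internal representation.

```python
from collections import defaultdict
from datetime import datetime, timezone

PERIOD_SECONDS = 5 * 60  # mirrors period(5m) in the schema


def bucket_start(ts: datetime) -> datetime:
    """Truncate a timestamp to the start of its 5-minute bucket."""
    epoch = int(ts.timestamp())
    return datetime.fromtimestamp(epoch - epoch % PERIOD_SECONDS, tz=timezone.utc)


def aggregate(points):
    """Roll points up by (bucket, dimensions): SUM requests, AVG load_avg."""
    # Hypothetical accumulator: the average is tracked as a running sum and count.
    rows = defaultdict(lambda: {"requests": 0.0, "load_sum": 0.0, "load_n": 0})
    for ts, dims, vals in points:
        key = (bucket_start(ts), tuple(sorted(dims.items())))
        row = rows[key]
        row["requests"] += vals.get("requests", 0.0)
        if "load_avg" in vals:
            row["load_sum"] += vals["load_avg"]
            row["load_n"] += 1
    return {
        key: {
            "requests": row["requests"],
            "load_avg": row["load_sum"] / row["load_n"] if row["load_n"] else 0.0,
        }
        for key, row in rows.items()
    }
```

Feeding the same point twice within one bucket doubles requests but leaves load_avg unchanged, which is the behaviour the quick start demonstrates later.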

Open three terminals

Terminal 1

> # Start the database
> cd ~/zeno-quickstart
> zeno
DEBUG zenodb: schema.go:77 Creating table 'combined' as
SELECT
  requests,
  AVG(load_avg) AS load_avg
FROM inbound GROUP BY *, period(5m)
DEBUG zenodb: schema.go:78 MaxMemStoreBytes: 1 B    MaxFlushLatency: 0s    MinFlushLatency: 0s
DEBUG zenodb: table.go:118 MinFlushLatency disabled
DEBUG zenodb: table.go:122 MaxFlushLatency disabled
DEBUG zenodb: schema.go:83 Created table combined
DEBUG zenodb: zenodb.go:63 Enabling geolocation functions
DEBUG zenodb.combined: row_store.go:111 Will flush after 2562047h47m16.854775807s
DEBUG zenodb: zenodb.go:75 Dir: /Users/ox.to.a.cart//zeno-quickstart    SchemaFile: /Users/ox.to.a.cart//zeno-quickstart/schema.yaml
Opened database at /Users/ox.to.a.cart//zeno-quickstart
Listening for gRPC connections at 127.0.0.1:17712
Listening for HTTP connections at 127.0.0.1:17713

Terminal 2

> # Submit some data via the REST API. Omit the ts parameter to use current time.
> curl -i -H "Content-Type: application/json" -X POST -d '{"dims": {"server": "56.234.163.23", "path": "/index.html", "status": 200}, "vals": {"requests": 56}}
{"dims": {"server": "56.234.163.23", "path": "/login", "status": 200}, "vals": {"requests": 34}}
{"dims": {"server": "56.234.163.23", "path": "/login", "status": 500}, "vals": {"requests": 12}}
{"dims": {"server": "56.234.163.23"}, "vals": {"load_avg": 1.7}}
{"dims": {"server": "56.234.163.24", "path": "/index.html", "status": 200}, "vals": {"requests": 523}}
{"dims": {"server": "56.234.163.24", "path": "/login", "status": 200}, "vals": {"requests": 411}}
{"dims": {"server": "56.234.163.24", "path": "/login", "status": 500}, "vals": {"requests": 28}}
{"dims": {"server": "56.234.163.24"}, "vals": {"load_avg": 0.3}}' -k https://localhost:17713/insert/inbound
HTTP/1.1 201 Created
Date: Mon, 29 Aug 2016 03:00:38 GMT
Content-Length: 0
Content-Type: text/plain; charset=utf-8

Notice that:

  • You're inserting into the stream inbound, not the table combined
  • You can batch insert multiple points in a single HTTP request
  • You can insert heterogeneous data like HTTP response statuses and load averages into a single stream, thereby automatically correlating the data on any shared dimensions (bye bye JOINs!).
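The same batch insert can be sketched from a program. The Python below assumes the request body is one JSON object per line, exactly as in the curl command above, and that certificate verification is skipped the way curl's -k does. The helper names (build_batch, insecure_context, insert) are made up for this example.

```python
import json
import ssl
import urllib.request


def build_batch(points):
    """Serialize points as one JSON object per line, as in the curl example."""
    return "\n".join(json.dumps(p) for p in points).encode("utf-8")


def insecure_context():
    """TLS context that skips certificate checks (the equivalent of curl -k)."""
    ctx = ssl.create_default_context()
    ctx.check_hostname = False  # must be disabled before setting CERT_NONE
    ctx.verify_mode = ssl.CERT_NONE
    return ctx


def insert(points, url="https://localhost:17713/insert/inbound"):
    """POST a batch to the insert endpoint and return the HTTP status code."""
    req = urllib.request.Request(
        url,
        data=build_batch(points),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, context=insecure_context()) as resp:
        return resp.status
```

Mixing request counts and load averages in one call to insert works the same way as in the curl example, because each point carries its own dims and vals.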

Terminal 3

SQL

SELECT
  _points,
  requests,
  load_avg
FROM combined
GROUP BY *
ORDER BY requests DESC

zeno-cli

> # Query the data
> zeno-cli -insecure -fresh
Will save history to /Users/ox.to.a.cart/Library/Application Support/zeno-cli/history
zeno-cli > SELECT _points, requests, load_avg FROM combined GROUP BY * ORDER BY requests DESC;
# time                             path           server           status        _points    requests    load_avg
Mon, 29 Aug 2016 03:00:00 UTC      /index.html    56.234.163.24    200            1.0000    523.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.24    200            1.0000    411.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /index.html    56.234.163.23    200            1.0000     56.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.23    200            1.0000     34.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.24    500            1.0000     28.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.23    500            1.0000     12.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      <nil>          56.234.163.23    <nil>          1.0000      0.0000      1.7000
Mon, 29 Aug 2016 03:00:00 UTC      <nil>          56.234.163.24    <nil>          1.0000      0.0000      0.3000

Notice that:

  • Dimensions are included in the result based on the GROUP BY; you don't include them in the SELECT expression.
  • There's a built-in field _points that gives a count of the number of points that were inserted.

Now run the same insert again.

Then run the same query again.

Pro tip - zeno-cli has a history, so try the up-arrow or Ctrl+R.

zeno-cli

zeno-cli > SELECT _points, requests, load_avg FROM combined GROUP BY * ORDER BY requests DESC;
# time                             path           server           status        _points     requests    load_avg
Mon, 29 Aug 2016 03:00:00 UTC      /index.html    56.234.163.24    200            2.0000    1046.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.24    200            2.0000     822.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /index.html    56.234.163.23    200            2.0000     112.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.23    200            2.0000      68.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.24    500            2.0000      56.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.23    500            2.0000      24.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      <nil>          56.234.163.23    <nil>          2.0000       0.0000      1.7000
Mon, 29 Aug 2016 03:00:00 UTC      <nil>          56.234.163.24    <nil>          2.0000       0.0000      0.3000

As long as you submitted the second batch of data soon after the first, you should see that the number of rows hasn't changed; Zeno just aggregated the data on the existing timestamps. The requests figures all doubled, since these are aggregated as a SUM. The load_avg figures remained unchanged since they're being aggregated as an AVG.

Note - if enough time has elapsed that we have a new timestamp, you will see additional rows like this:

# time                             path           server           status        _points    requests    load_avg
Mon, 29 Aug 2016 03:00:00 UTC      /index.html    56.234.163.24    200            1.0000    523.0000      0.0000
Mon, 29 Aug 2016 03:05:00 UTC      /index.html    56.234.163.24    200            1.0000    523.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.24    200            1.0000    411.0000      0.0000
Mon, 29 Aug 2016 03:05:00 UTC      /login         56.234.163.24    200            1.0000    411.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /index.html    56.234.163.23    200            1.0000     56.0000      0.0000
Mon, 29 Aug 2016 03:05:00 UTC      /index.html    56.234.163.23    200            1.0000     56.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.23    200            1.0000     34.0000      0.0000
Mon, 29 Aug 2016 03:05:00 UTC      /login         56.234.163.23    200            1.0000     34.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.24    500            1.0000     28.0000      0.0000
Mon, 29 Aug 2016 03:05:00 UTC      /login         56.234.163.24    500            1.0000     28.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.23    500            1.0000     12.0000      0.0000
Mon, 29 Aug 2016 03:05:00 UTC      /login         56.234.163.23    500            1.0000     12.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      <nil>          56.234.163.23    <nil>          1.0000      0.0000      1.7000
Mon, 29 Aug 2016 03:05:00 UTC      <nil>          56.234.163.23    <nil>          1.0000      0.0000      1.7000
Mon, 29 Aug 2016 03:00:00 UTC      <nil>          56.234.163.24    <nil>          1.0000      0.0000      0.3000
Mon, 29 Aug 2016 03:05:00 UTC      <nil>          56.234.163.24    <nil>          1.0000      0.0000      0.3000

Core Concept - Zeno knows how to aggregate fields based on the schema, so you don't need to include aggregation operators in your query. What happens if we try to query for SUM(load_avg)?

zeno-cli

zeno-cli > SELECT _points, requests, SUM(load_avg) AS load_avg FROM combined GROUP BY * ORDER BY requests DESC;
rpc error: code = 2 desc = No column found for load_avg (SUM(load_avg))

The underlying column is an AVG(load_avg), so taking a SUM is not possible!

Sometimes, it's useful to show a dimension in columns rather than rows. You can do this using the CROSSTAB function.

SQL

SELECT
  requests,
  load_avg
FROM combined
GROUP BY server, CROSSTAB(path)
ORDER BY requests;

zeno-cli

zeno-cli > SELECT requests, load_avg FROM combined GROUP BY server, CROSSTAB(path) ORDER BY requests;
# time                             server                  requests    requests     requests    load_avg    load_avg
#                                                       /index.html      /login      *total*       <nil>     *total*
Mon, 29 Aug 2016 03:05:00 UTC      56.234.163.23           112.0000     92.0000     204.0000      1.7000      1.7000
Mon, 29 Aug 2016 03:05:00 UTC      56.234.163.24          1046.0000    878.0000    1924.0000      0.3000      0.3000

Notice how there's a second header row now that shows the different values of path. Notice also how paths that don't have any data are not shown, and notice that a total column is automatically included for each field.
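As a rough illustration of what CROSSTAB does to a summed field, the following Python sketch pivots one dimension into columns and adds a total per group. It is only a model of the output shape shown above. The crosstab function and its arguments are hypothetical, and it sums values, so it only mirrors fields aggregated with SUM.

```python
from collections import defaultdict


def crosstab(rows, group_dim, pivot_dim, field):
    """Pivot `field` so each value of `pivot_dim` becomes a column per group."""
    table = defaultdict(lambda: defaultdict(float))
    for dims, vals in rows:
        if field not in vals:
            continue  # rows without this field add no column for it
        group = dims[group_dim]
        column = dims.get(pivot_dim, "<nil>")  # missing dimension shown as <nil>
        table[group][column] += vals[field]
        table[group]["*total*"] += vals[field]
    return {group: dict(cols) for group, cols in table.items()}
```

With the doubled quick-start data for server 56.234.163.23, pivoting requests on path yields 112 for /index.html, 92 for /login and 204 for the total, matching the first row of the zeno-cli output.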

Now let's do some correlation using the IF function. IF takes two parameters: a conditional expression that determines whether or not to include a value based on its associated dimensions, and a value expression that selects which column to include.

Let's say that we want to get the error rate, defined as the number of non-200 statuses versus total requests:

SQL

SELECT
  IF(status <> 200, requests) AS errors,
  requests,
  errors / requests AS error_rate,
  load_avg
FROM combined
GROUP BY *
ORDER BY error_rate DESC

zeno-cli

zeno-cli > SELECT IF(status <> 200, requests) AS errors, requests, errors / requests AS error_rate, load_avg FROM combined GROUP BY * ORDER BY error_rate DESC;
# time                             path           server           status         errors     requests    error_rate    load_avg
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.24    500           56.0000      56.0000        1.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.23    500           24.0000      24.0000        1.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.23    200            0.0000      68.0000        0.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      <nil>          56.234.163.23    <nil>          0.0000       0.0000        0.0000      1.7000
Mon, 29 Aug 2016 03:00:00 UTC      <nil>          56.234.163.24    <nil>          0.0000       0.0000        0.0000      0.3000
Mon, 29 Aug 2016 03:00:00 UTC      /index.html    56.234.163.23    200            0.0000     112.0000        0.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /login         56.234.163.24    200            0.0000     822.0000        0.0000      0.0000
Mon, 29 Aug 2016 03:00:00 UTC      /index.html    56.234.163.24    200            0.0000    1046.0000        0.0000      0.0000

Okay, this distinguishes between errors and other requests, but errors and other requests aren't being correlated yet so the error_rate isn't useful. Notice also that load_avg is separate from the requests measurements. That's because we're still implicitly grouping on status and path, so error rows are separate from success rows and load_avg rows (which are only associated with servers, not paths) are separate from everything else.

So instead, let's group only by server:

SQL

SELECT
  IF(status <> 200, requests) AS errors,
  requests,
  errors / requests AS error_rate,
  load_avg
FROM combined
GROUP BY server
ORDER BY error_rate DESC

zeno-cli

zeno-cli > SELECT IF(status <> 200, requests) AS errors, requests, errors / requests AS error_rate, load_avg FROM combined GROUP BY server ORDER BY error_rate DESC;
# time                             server                errors     requests    error_rate    load_avg
Mon, 29 Aug 2016 03:00:00 UTC      56.234.163.23        24.0000     204.0000        0.1176      1.7000
Mon, 29 Aug 2016 03:00:00 UTC      56.234.163.24        56.0000    1924.0000        0.0291      0.3000

That looks better! We're getting a meaningful error rate, and we can even see that there's a correlation between the error_rate and the load_avg.
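The arithmetic behind this result can be checked with a small Python sketch. It groups by server only, counts requests whose status is not 200 as errors (the role IF plays in the query), and divides. The error_rates function is a hypothetical stand-in for illustration, not how ZenoDB evaluates the query.

```python
from collections import defaultdict


def error_rates(rows):
    """Group by server; errors are requests whose status is not 200."""
    acc = defaultdict(lambda: {"errors": 0.0, "requests": 0.0})
    for dims, vals in rows:
        if "requests" not in vals:
            continue  # load_avg-only points carry no requests
        row = acc[dims["server"]]
        row["requests"] += vals["requests"]
        if dims.get("status") != 200:  # IF(status <> 200, requests)
            row["errors"] += vals["requests"]
    return {
        server: {
            **row,
            "error_rate": row["errors"] / row["requests"] if row["requests"] else 0.0,
        }
        for server, row in acc.items()
    }
```

For 56.234.163.23 this gives 24 / 204, which rounds to 0.1176, and for 56.234.163.24 it gives 56 / 1924, which rounds to 0.0291, the same figures as in the output above.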

Challenge - What calculation would yield a meaningful understanding of the relationship between error_rate and load_avg?

Now, if we had a ton of servers, we would really only be interested in the ones with the top error rates. We could handle that either with a LIMIT clause:

SQL

SELECT
  IF(status <> 200, requests) AS errors,
  requests,
  errors / requests AS error_rate,
  load_avg
FROM combined
GROUP BY server
ORDER BY error_rate DESC
LIMIT 1

zeno-cli

zeno-cli > SELECT IF(status <> 200, requests) AS errors, requests, errors / requests AS error_rate, load_avg FROM combined GROUP BY server ORDER BY error_rate DESC LIMIT 1;
# time                             server                errors    requests    error_rate    load_avg
Mon, 29 Aug 2016 03:00:00 UTC      56.234.163.23        24.0000    204.0000        0.1176      1.7000

Or we can use the HAVING clause to filter based on the actual data:

SQL

SELECT
  IF(status <> 200, requests) AS errors,
  requests,
  errors / requests AS error_rate,
  load_avg
FROM combined
GROUP BY server
HAVING error_rate > 0.1
ORDER BY error_rate DESC

zeno-cli

zeno-cli > SELECT IF(status <> 200, requests) AS errors, requests, errors / requests AS error_rate, load_avg FROM combined GROUP BY server HAVING error_rate > 0.1 ORDER BY error_rate DESC;
# time                             server                errors    requests    error_rate    load_avg
Mon, 29 Aug 2016 03:05:00 UTC      56.234.163.23        24.0000    204.0000        0.1176      1.7000
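The difference between the two approaches is easy to see in a short Python sketch that operates on already-computed error rates. Both helper names are invented for this example.

```python
def top_n(rates, n):
    """Model of ORDER BY error_rate DESC LIMIT n."""
    return sorted(rates.items(), key=lambda kv: kv[1], reverse=True)[:n]


def having(rates, threshold):
    """Model of HAVING error_rate > threshold ORDER BY error_rate DESC."""
    kept = {server: rate for server, rate in rates.items() if rate > threshold}
    return sorted(kept.items(), key=lambda kv: kv[1], reverse=True)
```

LIMIT always returns up to n rows, even when every server is healthy, whereas HAVING returns only the rows that cross the threshold and can therefore return nothing at all.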

There! You've just aggregated, correlated and gained valuable insights into your server infrastructure. At Lantern we do this sort of stuff with data from thousands of servers and millions of clients!

Schema

ZenoDB relies on a schema file (by default schema.yaml).

Example: How to add a view

A view is really a language construct for creating a table whose properties are derived from an existing table. A view and the table it derives from read the same data stream, but the view's data is stored separately. Consequently, a view can have a different (and finer) granularity than its parent table.

This is an example of a view defined in the YAML Schema:

emojis_fetched:
  view:             true
  retentionperiod:  168h
  backfill:         6h
  minflushlatency:  1m
  maxflushlatency:  1h
  partitionby:      [client_ip]
  sql: >
    SELECT success_count, error_count, error_rate, emojis_fetched
      FROM core
      WHERE client_ip LIKE '192.-%'
      GROUP BY client_ip, period(1h)

We start with the name of the view:

emojis_fetched

This means that the data for this table comes from another table or view, rather than an input stream:

view: true

This means that we keep 1 week's worth of history:

retentionperiod: 168h

This means that we won’t flush the memstore more frequently than every 1 minute:

minflushlatency: 1m

And flush at least every hour:

maxflushlatency: 1h

Flushing means taking the data held in memory so far and saving it to permanent on-disk storage for later retrieval.
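One way to read the two latency settings together is as a lower and an upper bound on the time between flushes. The Python sketch below encodes that reading. The memory-size trigger between the two bounds is an assumption (suggested by the MaxMemStoreBytes value in the startup log), and should_flush is a hypothetical function, not part of ZenoDB.

```python
def should_flush(since_last_flush_s, memstore_bytes, max_memstore_bytes,
                 min_flush_latency_s=60, max_flush_latency_s=3600):
    """Decide whether to flush, given minflushlatency=1m and maxflushlatency=1h."""
    if since_last_flush_s >= max_flush_latency_s:
        return True   # flush at least this often
    if since_last_flush_s < min_flush_latency_s:
        return False  # never flush more often than this
    # Assumed trigger between the two bounds: the memstore has grown too large.
    return memstore_bytes >= max_memstore_bytes
```

Under this reading, a larger minflushlatency trades memory for fewer, bigger writes, and maxflushlatency caps how much unflushed data can accumulate in memory.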

The physical storage happens on the follower nodes, so Zenodb needs to know how to distribute that data across nodes:

partitionby: [client_ip]

The next part is the definition of the contents of the table/view:

  sql: >
    SELECT success_count, error_count, error_rate, emojis_fetched
      FROM core
      WHERE client_ip LIKE '192.-%'
      GROUP BY client_ip, period(1h)

The SELECT line lists the fields, in this case success_count, error_count, error_rate and emojis_fetched. The rest of the statement consists of the filter and grouping operations, which are optional and can be adapted to your needs. This filter selects only measurements related to a group of IPs:

WHERE client_ip LIKE '192.-%'

For instance, to filter on emojis instead, we would change it to:

WHERE emojis_fetched LIKE 'smile-%'

This selects which dimensions to keep, and what resolution to use. This is usually the hardest part of creating a view, because you need to anticipate what questions will be asked:

GROUP BY client_ip, period(1h)

Functions

TODO - fill out function reference

Subqueries

TODO - explain how subqueries work

Embedding

Check out the zenodbdemo for an example of how to embed zenodb.

Implementation Notes

Sequences

These are the central units of data storage in Zenodb: https://github.com/getlantern/zenodb/blob/master/encoding/seq.go

In short, we group by dimension, and then we summarize/roll up fields. The summarization is done using an aggregation function like SUM, AVG, MIN, MAX, etc. That is a key aspect of how Zenodb manages to discard data and achieve compression. For example, for a new view, let’s say we’re not grouping by anything (that would be GROUP BY _, PERIOD(1h)), and let’s look just at the success_count field. In storage, we would have a single row with no dimensions and an array of aggregated success_counts by time period:

success_count: [4000, 5000]

That would be the result of a series of points that add up to 4000 in the first period and to 5000 in the second period (each period being 1h).

Now let’s say that we did GROUP BY geo_country, PERIOD(1h)

And let’s say that success_counts are evenly split between US and AU, then we would have two rows:

geo_country: US   success_count: [2000, 2500]
geo_country: AU   success_count: [2000, 2500]

One important aspect is that each field is not just an array: the array is preceded by the timestamp high water mark, so it would actually be something like this:

geo_country: US success_count: 20180118T05:00Z[1000, 1500]
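The idea of "a high water mark plus one slot per period" can be modelled in a few lines of Python. This is a conceptual sketch only. The Sequence class below is hypothetical, keeps values oldest-first to match the examples above, and makes no claim about the byte layout or ordering used in encoding/seq.go.

```python
from datetime import datetime, timedelta, timezone


class Sequence:
    """Per-field values, one slot per period, plus the latest period's start."""

    def __init__(self, period: timedelta):
        self.period = period
        self.high_water_mark = None  # start of the most recent period seen
        self.values = []             # oldest first, as in the examples above

    def add(self, ts: datetime, value: float) -> None:
        """SUM `value` into the slot for the period containing `ts`."""
        secs = int(self.period.total_seconds())
        epoch = int(ts.timestamp())
        start = datetime.fromtimestamp(epoch - epoch % secs, tz=timezone.utc)
        if self.high_water_mark is None:
            self.high_water_mark, self.values = start, [0.0]
        elif start > self.high_water_mark:
            # Append empty slots up to the new most recent period.
            gap = int((start - self.high_water_mark) / self.period)
            self.values.extend([0.0] * gap)
            self.high_water_mark = start
        # Slot index is counted back from the high water mark.
        offset = int((self.high_water_mark - start) / self.period)
        if offset < len(self.values):  # points older than the first slot are dropped
            self.values[len(self.values) - 1 - offset] += value
```

Because only the high water mark and the period are stored, the timestamp of every slot can be derived from its position, so no per-value timestamps are needed.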

Views

Since everything in zenodb comes in through input streams, views cannot actually be constructed from the underlying tables, so they need to be stored independently. This allows views to have different granularities than the tables/views they refer to. In other words, at runtime, views are just like tables that pull from the same input stream; the only difference is that when you define a view, it can take into account knowledge from the definition of the underlying table.

Clustering

Performance Tips

  • Partition on high cardinality fields/combinations that you frequently query
  • Don't partition on low-cardinality fields, as these will tend to hotspot on one or another partition and slow down synchronization from the leader.
  • Don't partition on too many different fields/combinations, as this will increase the amount of data that each follower has to synchronize.
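The hotspot effect described in these tips can be demonstrated with a toy hash-partitioning sketch in Python. How ZenoDB actually assigns rows to partitions is not described here, so the SHA-256 based scheme and both function names are assumptions chosen only to show the effect of cardinality.

```python
import hashlib
from collections import Counter


def partition_for(dims, partition_by, num_partitions):
    """Pick a partition from a stable hash of the partition-by dimension values."""
    key = "|".join(str(dims.get(d)) for d in partition_by)
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def partition_counts(rows, partition_by, num_partitions):
    """Count how many rows land on each partition."""
    return Counter(partition_for(r, partition_by, num_partitions) for r in rows)
```

Partitioning many rows by a field with a single value sends all of them to one partition, while partitioning the same rows by a high-cardinality field such as client_ip spreads them across all partitions.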

Acknowledgements
