Skip to content

Conversation

@mauguignard
Copy link
Contributor

My team and I are considering moving into Druid. As part of our tests, we were benchmarking some big queries and we found out that the fetch of the rows was taking longer than expected. After some profiling, we traced back the bottleneck to the loop in rows_from_chunks.

The new version introduced in this PR reduced the wall time of this function from 28.70s to only 4.93s (providing an effective speedup of ~5.8x) when running the following benchmark on my laptop (Ubuntu 20.04 - CPython 3.7.10 - Intel i5-8250U):

import argparse
import datetime
import json
import logging
import timeit
from collections import OrderedDict
from random import Random

from pydruid.db.api import rows_from_chunks


STRING_CHOICES = [
    "alice",
    "bob",
    r"ali\"ce",
    "ali{ce",
    r"b\ob",
    "{bob}",
    "{1: 2}",
    r"{\"id\": 1}",
]


def generate_rows():
    seed = 123
    rnd = Random(seed)

    rows = []
    base_timestamp = datetime.datetime(2021, 1, 1).timestamp()

    for i in range(500_000):
        row = OrderedDict(
            [
                (
                    "__time",
                    datetime.datetime.fromtimestamp(
                        rnd.random() * base_timestamp
                    ).isoformat(),
                ),
                ("dimension0", rnd.choice(STRING_CHOICES)),
                ("dimension1", rnd.choice(STRING_CHOICES)),
                ("dimension2", rnd.choice(STRING_CHOICES)),
                ("dimension3", rnd.choice(STRING_CHOICES)),
                ("dimension4", rnd.choice(STRING_CHOICES)),
                ("counter0", rnd.randrange(10_000_000)),
                ("counter1", rnd.randrange(10_000_000)),
                ("counter2", rnd.randrange(10_000_000)),
                ("counter3", rnd.randrange(10_000_000)),
                ("counter4", rnd.randrange(10_000_000)),
                ("counter5", rnd.randrange(10_000_000)),
                ("counter6", rnd.randrange(10_000_000)),
                ("counter7", rnd.randrange(10_000_000)),
                ("counter8", rnd.randrange(10_000_000)),
                ("counter9", rnd.randrange(10_000_000)),
                ("counter0f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter1f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter2f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter3f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter4f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter5f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter6f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter7f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter8f", rnd.randrange(10_000_000) / 10_000.0),
                ("counter9f", rnd.randrange(10_000_000) / 10_000.0),
            ]
        )
        rows.append(row)

    return rows


def generate_chunks(rows, chunk_size=8192):
    body = json.dumps(rows, separators=(",", ":"))
    return [body[i: i + chunk_size] for i in range(0, len(body), chunk_size)]


def verify():
    logging.info("Generating data...")
    rows = generate_rows()

    logging.info("Generating chunks...")
    chunks = generate_chunks(rows)

    logging.info("Parsing chunks...")
    parsed_rows = list(rows_from_chunks(chunks))

    logging.info("Verifying results...")

    assert len(rows) == len(parsed_rows), "The number of rows is not the expected"

    for row, parsed_row in zip(rows, parsed_rows):
        assert tuple(row.items()) == tuple(
            parsed_row.items()
        ), "Rows differ. %r != %r" % (row, parsed_row)


def main(options):
    if options.verify:
        verify()

    logging.info("Starting benchmark...")
    timer = timeit.Timer(
        setup="chunks = generate_chunks(generate_rows())",
        stmt="list(rows_from_chunks(chunks))",
        globals=globals(),
    )

    timings = timer.repeat(repeat=options.repeat, number=1)
    logging.info("Best timing = %f", min(timings))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Benchmark rows_from_chunks.")
    parser.add_argument("--verify", dest="verify", action="store_true")
    parser.add_argument("--no-verify", dest="verify", action="store_false")
    parser.set_defaults(verify=True)

    parser.add_argument("--repeat", "-r", type=int, default=20)

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - [%(levelname)s] - %(name)s:%(module)s:%(funcName)s: %(message)s",
    )
    main(parser.parse_args())

In our tests, however, we had an actual speedup closer to 9x when fetching the rows from one of our test queries.

In Python 3.6 and lower, we cannot avoid the overhead of calling OrderedDict. In our tests, though, the speedup is still close to 4x.

As a side effect, this PR should also solve #242 , since the parsing is now completely delegated to the official json module.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant