Commit

fix: Change default .arrow compression to "uncompressed" (#656)
dangotbanned authored Dec 31, 2024
1 parent 369b462 commit 7c2e67f
Showing 4 changed files with 59 additions and 19 deletions.
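
What changed, in miniature: `Spec.write` previously hard-coded `compression="zstd"` when writing Arrow IPC, and this commit routes the call through a per-format defaults table whose `.arrow` entry is `"uncompressed"`. A minimal sketch of the before/after calls, assuming polars is installed and using a toy frame in place of the flights data (file names here are illustrative):

```python
import polars as pl

df = pl.DataFrame({"delay": [14, -3, 22], "distance": [405, 370, 648]})

# Old default: zstd-compressed Arrow IPC (smaller file on disk).
df.write_ipc("flights.zstd.arrow", compression="zstd")

# New default: uncompressed Arrow IPC (larger file, but readable by
# consumers that lack a zstd codec).
df.write_ipc("flights.arrow", compression="uncompressed")
```

The size difference shows up in `datapackage.json` below: the regenerated `flights-200k.arrow` grows from 662,832 to 1,600,864 bytes.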
Binary file modified data/flights-200k.arrow
4 changes: 2 additions & 2 deletions datapackage.json
@@ -20,7 +20,7 @@
     }
   ],
   "version": "2.11.0",
-  "created": "2024-12-21T16:09:47.521815+00:00",
+  "created": "2024-12-31T18:32:26.970186+00:00",
   "resources": [
     {
       "name": "7zip.png",
@@ -1080,7 +1080,7 @@
       "scheme": "file",
       "format": ".arrow",
       "mediatype": "application/vnd.apache.arrow.file",
-      "bytes": 662832,
+      "bytes": 1600864,
       "schema": {
         "fields": [
           {
2 changes: 1 addition & 1 deletion datapackage.md
@@ -1,5 +1,5 @@
 # vega-datasets
-`2.11.0` | [GitHub](http://github.com/vega/vega-datasets.git) | 2024-12-21 16:09:47 [UTC]
+`2.11.0` | [GitHub](http://github.com/vega/vega-datasets.git) | 2024-12-31 18:32:26 [UTC]

 Common repository for example datasets used by Vega related projects.
 BSD-3-Clause license applies only to package code and infrastructure. Users should verify their use of datasets
72 changes: 56 additions & 16 deletions scripts/flights.py
@@ -30,7 +30,7 @@
 import tomllib
 import zipfile
 from collections import defaultdict, deque
-from collections.abc import Iterable, Sequence
+from collections.abc import Iterable, Mapping, Sequence
 from functools import cached_property
 from pathlib import Path
 from typing import TYPE_CHECKING, Annotated, Literal
@@ -43,7 +43,7 @@

 if TYPE_CHECKING:
     import sys
-    from collections.abc import Iterator, Mapping
+    from collections.abc import Iterator
     from typing import Any, ClassVar, LiteralString

     if sys.version_info >= (3, 13):
@@ -63,6 +63,15 @@ def is_extension(obj: Any) -> TypeIs[Extension]:
     return obj in _get_args(Extension)


+type WriteOptions = Mapping[str, Any]
+
+
+def is_write_options(obj: Any) -> TypeIs[WriteOptions]:
+    return isinstance(obj, Mapping) and (
+        len(obj) == 0 or all(isinstance(k, str) for k in obj)
+    )
+
+
 type Column = Literal[
     "date",
     "time",
@@ -394,10 +403,23 @@ class Spec:
         see ``DateTimeFormat`` doc.
     columns
         Columns included in the output.
+    write_options
+        Overrides for defaults defined in ``Spec._WRITE_OPTIONS``.
     """

     _PREFIX: ClassVar[Literal["flights-"]] = "flights-"
     _RANDOM_SEED: ClassVar[Literal[42]] = 42
+    _WRITE_OPTIONS: ClassVar[Mapping[Extension, WriteOptions]] = {
+        ".arrow": {"compression": "uncompressed"},
+        ".parquet": {"compression": "zstd", "compression_level": 22},
+        ".csv": {
+            "date_format": None,
+            "datetime_format": None,
+            "time_format": None,
+            "null_value": None,
+        },
+    }
+    """Default keyword arguments used for each output format."""

     def __init__(
         self,
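
Spelled out, the `_WRITE_OPTIONS` defaults above are equivalent to the following direct writer calls. This is a sketch only; the real dispatch happens in `Spec.write` further down, after any per-spec overrides are merged in, and `df`/`fp` here are illustrative stand-ins:

```python
import polars as pl
from pathlib import Path

df = pl.DataFrame({"delay": [14, -3, 22]})
fp = Path("out")

# ".arrow": {"compression": "uncompressed"}
df.write_ipc(fp.with_suffix(".arrow"), compression="uncompressed")

# ".parquet": {"compression": "zstd", "compression_level": 22}
df.write_parquet(fp.with_suffix(".parquet"), compression="zstd", compression_level=22)

# ".csv": explicit Nones, i.e. polars' own default formatting
df.write_csv(
    fp.with_suffix(".csv"),
    date_format=None,
    datetime_format=None,
    time_format=None,
    null_value=None,
)
```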
@@ -406,17 +428,19 @@ def __init__(
         suffix: Extension,
         dt_format: DateTimeFormat = None,
         columns: Sequence[Column] = COLUMNS_DEFAULT,
+        write_options: WriteOptions | None = None,
     ) -> None:
         self.range: DateRange = (
             range if isinstance(range, DateRange) else DateRange.from_dates(range)
         )
-        n_rows, suffix, dt_format, columns = self._validate(
-            n_rows, suffix, dt_format, columns
+        n_rows, suffix, dt_format, columns, write_options = self._validate(
+            n_rows, suffix, dt_format, columns, write_options
         )
         self.n_rows: Rows = n_rows
         self.suffix: Extension = suffix
         self.dt_format: DateTimeFormat = dt_format
         self.columns: Sequence[Column] = columns
+        self.write_options: WriteOptions = self._merge_write_options(write_options)

     @classmethod
     def from_dict(cls, mapping: Mapping[str, Any], /) -> Spec:
@@ -499,26 +523,27 @@ def write(self, df: pl.DataFrame, output_dir: Path, /) -> None:
         fp.touch()
         msg = f"Writing {fp.as_posix()!r} ..."
         logger.info(msg)
+        kwds = self.write_options
         match self.suffix:
             case ".arrow":
-                df.write_ipc(fp, compression="zstd")
+                df.with_columns(pl.all().shrink_dtype()).write_ipc(fp, **kwds)
             case ".csv":
-                df.write_csv(
-                    fp,
-                    date_format=None,
-                    datetime_format=None,
-                    time_format=None,
-                    null_value=None,
-                )
+                df.write_csv(fp, **kwds)
             case ".json":
                 df.write_json(fp)
             case ".parquet":
                 df.write_parquet(fp, **kwds)
             case _:
                 fp.unlink()
                 msg = f"Unexpected extension {self.suffix!r}"
                 raise NotImplementedError(msg)
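
Note the `.arrow` branch also gains `pl.all().shrink_dtype()`, which downcasts each numeric column to the narrowest dtype that can hold its values, clawing back some of the bytes the dropped compression gives up. A quick illustration with a toy frame:

```python
import polars as pl

df = pl.DataFrame({"delay": [14, -3, 22], "distance": [405, 370, 648]})
print(df.schema)  # {'delay': Int64, 'distance': Int64}

# Each column shrinks to the smallest integer type that fits its values.
shrunk = df.with_columns(pl.all().shrink_dtype())
print(shrunk.schema)  # {'delay': Int8, 'distance': Int16}
```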

+    def _merge_write_options(self, kwds: WriteOptions, /) -> WriteOptions:
+        defaults = dict(self._WRITE_OPTIONS.get(self.suffix, {}))
+        if kwds:
+            defaults.update(kwds)
+        return defaults
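
`_merge_write_options` is a plain defaults-then-overrides merge: the `_WRITE_OPTIONS` entry for the spec's suffix is copied, then any caller-supplied mapping is layered on top. The same behavior in isolation, with a hypothetical override:

```python
defaults = {"compression": "zstd", "compression_level": 22}  # the ".parquet" entry
override = {"compression_level": 7}                          # hypothetical spec value

merged = dict(defaults)
merged.update(override)
print(merged)  # {'compression': 'zstd', 'compression_level': 7}
```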

     def _transform_temporal(self, ldf: pl.LazyFrame, /) -> pl.LazyFrame:
         if not self.dt_format:
             return ldf
@@ -531,8 +556,8 @@ def _transform_temporal(self, ldf: pl.LazyFrame, /) -> pl.LazyFrame:

     @staticmethod
     def _validate(
-        n_rows: Any, suffix: Any, dt_format: Any, columns: Any, /
-    ) -> tuple[Rows, Extension, DateTimeFormat, Sequence[Column]]:
+        n_rows: Any, suffix: Any, dt_format: Any, columns: Any, write_options: Any, /
+    ) -> tuple[Rows, Extension, DateTimeFormat, Sequence[Column], WriteOptions]:
         if not is_columns(columns):
             msg = f"`columns` contains unrecognized names:\n{columns!r}"
             raise TypeError(msg)
@@ -561,7 +586,22 @@ def _validate(
             msg = f"Unexpected extension {suffix!r}"
             raise TypeError(msg)

-        return n_rows, suffix, dt_format, columns
+        if suffix == ".json" and write_options:
+            msg = (
+                f"Keyword arguments are not supported for {pl.DataFrame.write_json.__qualname__!r}.\n"
+                f"But got: {write_options!r}\n"
+            )
+            raise TypeError(msg)
+
+        write_options = write_options or {}
+        if not is_write_options(write_options):
+            msg = (
+                f"Expected a mapping of keyword arguments for `write_options`.\n"
+                f"But got: {type(write_options).__name__!r}"
+            )
+            raise TypeError(msg)
+
+        return n_rows, suffix, dt_format, columns, write_options
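
Two guards precede the widened return: `.json` specs must not carry writer kwargs (the script treats `pl.DataFrame.write_json` as accepting none), and falsy input is normalized before the shape check. The normalization in isolation, assuming `is_write_options` is in scope and using hypothetical values:

```python
write_options = None                    # e.g. the constructor default
write_options = write_options or {}    # falsy values become an empty dict
assert is_write_options(write_options)  # {} passes the Mapping/str-key check

# A non-mapping fails the guard and would raise TypeError above:
assert not is_write_options([("compression", "lz4")])
```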


 class SourceMap:
