
compat!: Support dask query-planning, drop Python 3.9, update pins #171


Merged (22 commits, Feb 13, 2025)

Changes from all commits
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
@@ -83,15 +83,15 @@ jobs:
run: |
MATRIX=$(jq -nsc '{
"os": ["ubuntu-latest", "macos-latest", "windows-latest"],
"environment": ["test-39", "test-312"]
"environment": ["test-310", "test-312"]
}')
echo "MATRIX=$MATRIX" >> $GITHUB_ENV
- name: Set test matrix with 'full' option
if: env.MATRIX_OPTION == 'full'
run: |
MATRIX=$(jq -nsc '{
"os": ["ubuntu-latest", "macos-latest", "windows-latest"],
"environment": ["test-39", "test-310", "test-311", "test-312"]
"environment": ["test-310", "test-311", "test-312"]
}')
echo "MATRIX=$MATRIX" >> $GITHUB_ENV
- name: Set test matrix with 'downstream' option
14 changes: 6 additions & 8 deletions pixi.toml
@@ -8,11 +8,9 @@ install = 'python -m pip install --no-deps --disable-pip-version-check -e .'

[activation.env]
PYTHONIOENCODING = "utf-8"
DASK_DATAFRAME__QUERY_PLANNING = "False" # TODO: Support query planning
USE_PYGEOS = '0'

[environments]
test-39 = ["py39", "test-core", "test", "test-task", "example", "test-example"]
Member Author comment: dask 2025.1 only supports Python 3.10 and newer.

test-310 = ["py310", "test-core", "test", "test-task", "example", "test-example"]
test-311 = ["py311", "test-core", "test", "test-task", "example", "test-example"]
test-312 = ["py312", "test-core", "test", "test-task", "example", "test-example"]
@@ -22,17 +20,14 @@ lint = ["py311", "lint"]

[dependencies]
numba = "*"
dask-core = "<2025.1" # TODO: Does not work with new DataFrame interface
dask-core = ">=2025.1"
fsspec = "*"
packaging = "*"
pandas = "*"
pandas = ">=2.0"
pip = "*"
pyarrow = ">=10"
pyarrow = ">=14.0.1"
retrying = "*"

[feature.py39.dependencies]
python = "3.9.*"

[feature.py310.dependencies]
python = "3.10.*"

@@ -86,6 +81,9 @@ test-unit = 'pytest spatialpandas/tests -n logical --dist loadgroup'
[feature.test-example.dependencies]
nbval = "*"

[feature.test-example.activation.env]
DASK_SCHEDULER = "single-threaded"
hoxbro (Member Author) commented on Feb 2, 2025:
This is to try to avoid this kind of error (which also happens on main):
[screenshot of the error]
edit: It still seems to fail, but I will let it be for now.

Member comment: Hmm, that seems like a dask configuration problem, probably not for us to worry about.
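
For context, DASK_SCHEDULER above is just the environment-variable form of dask's scheduler setting. A minimal sketch of the Python equivalent, using a throwaway dataframe purely for illustration:

import dask
import dask.dataframe as dd
import pandas as pd

# Equivalent of DASK_SCHEDULER="single-threaded": run every task synchronously in the
# main thread, which makes notebook/test failures easier to reproduce and debug.
dask.config.set(scheduler="single-threaded")

ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
print(ddf.x.sum().compute())  # executed on the synchronous scheduler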


[feature.test-example.tasks]
test-example = 'pytest -n logical --dist loadscope --nbval-lax examples'

12 changes: 7 additions & 5 deletions pyproject.toml
@@ -8,14 +8,13 @@ dynamic = ["version"]
description = 'Pandas extension arrays for spatial/geometric operations'
readme = "README.md"
license = { text = "BSD-2-Clause" }
requires-python = ">=3.9"
requires-python = ">=3.10"
authors = [{ name = "HoloViz developers", email = "developers@holoviz.org" }]
maintainers = [{ name = "HoloViz developers", email = "developers@holoviz.org" }]
classifiers = [
"License :: OSI Approved :: BSD License",
"Development Status :: 5 - Production/Stable",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
@@ -27,12 +26,12 @@ classifiers = [
"Topic :: Software Development :: Libraries",
]
dependencies = [
'dask <2025.1',
'dask >=2025.1',
'fsspec >=2022.8',
'numba',
'packaging',
'pandas',
'pyarrow >=10',
'pandas >=2.0',
'pyarrow >=14.0.1',
Member Author comment: These two are part of the minimum dependencies of dask.

'retrying',
]

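As an aside on the new floors in this dependency list, a minimal sketch (using importlib.metadata and packaging, both assumed to be available) for checking an existing environment against them:

# Compare installed versions against the floors introduced in this PR.
from importlib.metadata import version
from packaging.version import Version

floors = {"dask": "2025.1", "pandas": "2.0", "pyarrow": "14.0.1"}
for name, floor in floors.items():
    installed = Version(version(name))
    assert installed >= Version(floor), f"{name} {installed} is older than {floor}"
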
@@ -74,11 +73,14 @@ filterwarnings = [
"ignore:datetime.datetime.utcnow():DeprecationWarning:botocore", # https://github.com/boto/boto3/issues/3889
# 2024-11
"ignore:The legacy Dask DataFrame implementation is deprecated:FutureWarning", # https://github.com/holoviz/spatialpandas/issues/146
# 2025-02
"ignore:Dask annotations ..retries.. 5. detected:UserWarning", # https://github.com/dask/dask/issues/11721
]

[tool.ruff]
fix = true
line-length = 100
target-version = "py39" # TODO: Remove in follow up PR
Member Author comment: Done in #173.


[tool.ruff.lint]
select = [
55 changes: 29 additions & 26 deletions spatialpandas/dask.py
@@ -13,25 +13,18 @@
from dask import delayed
from dask.dataframe.core import get_parallel_type
from dask.dataframe.extensions import make_array_nonempty
from dask.dataframe.partitionquantiles import partition_quantiles
from packaging.version import Version
from dask.dataframe.utils import make_meta_obj, meta_nonempty
from retrying import retry

try:
from dask.dataframe.backends import meta_nonempty
from dask.dataframe.dispatch import make_meta_dispatch
except ImportError:
from dask.dataframe.utils import make_meta as make_meta_dispatch, meta_nonempty

from .geodataframe import GeoDataFrame
from .geometry.base import GeometryDtype, _BaseCoordinateIndexer
from .geoseries import GeoSeries
from .spatialindex import HilbertRtree


class DaskGeoSeries(dd.Series):
def __init__(self, dsk, name, meta, divisions, *args, **kwargs):
super().__init__(dsk, name, meta, divisions)
def __init__(self, expr, *args, **kwargs):
super().__init__(expr, *args, **kwargs)

# Init backing properties
self._partition_bounds = None
@@ -105,8 +98,10 @@ def persist(self, **kwargs):
)


@make_meta_dispatch.register(GeoSeries)
@make_meta_obj.register(GeoSeries)
def make_meta_series(s, index=None):
if hasattr(s, "__array__") or isinstance(s, np.ndarray):
return s[:0]
result = s.head(0)
if index is not None:
result = result.reindex(index[:0])
@@ -119,13 +114,18 @@ def meta_nonempty_series(s, index=None):


@get_parallel_type.register(GeoSeries)
def get_parallel_type_dataframe(df):
def get_parallel_type_series(df):
return DaskGeoSeries


@dd.get_collection_type.register(GeoSeries)
def get_collection_type_series(df):
return DaskGeoSeries


class DaskGeoDataFrame(dd.DataFrame):
def __init__(self, dsk, name, meta, divisions):
super().__init__(dsk, name, meta, divisions)
def __init__(self, expr, *args, **kwargs):
super().__init__(expr, *args, **kwargs)
self._partition_sindex = {}
self._partition_bounds = {}

@@ -191,11 +191,7 @@ def pack_partitions(self, npartitions=None, p=15, shuffle='tasks'):

# Set index to distance. This will trigger an expensive shuffle
# sort operation
if Version(dask.__version__) >= Version('2024.1'):
shuffle_kwargs = {'shuffle_method': shuffle}
else:
shuffle_kwargs = {'shuffle': shuffle}
ddf = ddf.set_index('hilbert_distance', npartitions=npartitions, **shuffle_kwargs)
ddf = ddf.set_index('hilbert_distance', npartitions=npartitions, shuffle_method=shuffle)

if ddf.npartitions != npartitions:
# set_index doesn't change the number of partitions if the partitions
@@ -312,8 +308,9 @@ def move_retry(p1, p2):
ddf = self._with_hilbert_distance_column(p)

# Compute output hilbert_distance divisions
quantiles = partition_quantiles(
ddf.hilbert_distance, npartitions
from dask.dataframe.dask_expr import RepartitionQuantiles, new_collection
quantiles = new_collection(
RepartitionQuantiles(ddf.hilbert_distance, npartitions)
).compute().values

# Add _partition column containing output partition number of each row
@@ -328,7 +325,7 @@ def move_retry(p1, p2):
for out_partition in out_partitions
]
part_output_paths = [
os.path.join(path, f"part.{int(out_partition)}.parquet")
os.path.join(path, f"part.{out_partition}.parquet")
for out_partition in out_partitions
]

@@ -338,7 +335,7 @@ def move_retry(p1, p2):
rm_retry(path)

for out_partition in out_partitions:
part_dir = os.path.join(path, f"part.{int(out_partition)}.parquet" )
part_dir = os.path.join(path, f"part.{out_partition}.parquet" )
mkdirs_retry(part_dir)
tmp_part_dir = tempdir_format.format(partition=out_partition, uuid=dataset_uuid)
mkdirs_retry(tmp_part_dir)
@@ -360,7 +357,7 @@ def process_partition(df, i):
for out_partition, df_part in df.groupby('_partition'):
part_path = os.path.join(
tempdir_format.format(partition=out_partition, uuid=dataset_uuid),
f'part{int(i)}.parquet',
f'part{i}.parquet',
)
df_part = (
df_part
@@ -584,8 +581,10 @@ def __getitem__(self, key):
return result


@make_meta_dispatch.register(GeoDataFrame)
@make_meta_obj.register(GeoDataFrame)
def make_meta_dataframe(df, index=None):
if hasattr(df, "__array__") or isinstance(df, np.ndarray):
return df[:0]
result = df.head(0)
if index is not None:
result = result.reindex(index[:0])
@@ -598,10 +597,14 @@ def meta_nonempty_dataframe(df, index=None):


@get_parallel_type.register(GeoDataFrame)
def get_parallel_type_series(s):
def get_parallel_type_dataframe(s):
return DaskGeoDataFrame


@dd.get_collection_type.register(GeoDataFrame)
def get_collection_type_dataframe(df):
return DaskGeoDataFrame

class _DaskCoordinateIndexer(_BaseCoordinateIndexer):
def __init__(self, obj, sindex):
super().__init__(sindex)
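
Taken together, the registrations above are what make dask's query-planning collections return the spatialpandas subclasses. A minimal sketch of the expected round trip; the PointArray constructor form below is an illustrative assumption, not taken from this diff:

import dask.dataframe as dd
from spatialpandas import GeoDataFrame
from spatialpandas.geometry import PointArray

# Two points; the coordinate-pair constructor form is assumed for illustration.
gdf = GeoDataFrame({"geometry": PointArray([[0.0, 0.0], [1.0, 1.0]]), "value": [1, 2]})

# get_parallel_type/get_collection_type map GeoDataFrame -> DaskGeoDataFrame, while
# make_meta_obj/meta_nonempty keep the geometry dtype intact in the empty _meta frame.
ddf = dd.from_pandas(gdf, npartitions=2)
print(type(ddf))         # expected: spatialpandas.dask.DaskGeoDataFrame
print(ddf._meta.dtypes)  # geometry column keeps its spatialpandas GeometryDtype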
29 changes: 9 additions & 20 deletions spatialpandas/io/parquet.py
@@ -17,7 +17,6 @@
to_parquet as dd_to_parquet,
)
from dask.utils import natural_sort_key
from packaging.version import Version
from pandas.io.parquet import to_parquet as pd_to_parquet
from pyarrow.parquet import ParquetDataset, ParquetFile, read_metadata

@@ -30,9 +29,6 @@
validate_coerce_filesystem,
)

# improve pandas compatibility, based on geopandas _compat.py
PANDAS_GE_12 = Version(pd.__version__) >= Version("1.2.0")


def _load_parquet_pandas_metadata(
path,
@@ -83,23 +79,16 @@ def to_parquet(
filesystem = validate_coerce_filesystem(path, filesystem, storage_options)

# Standard pandas to_parquet with pyarrow engine
to_parquet_args = {
"df": df,
"path": path,
"engine": "pyarrow",
"compression": compression,
"filesystem": filesystem,
"index": index,
pd_to_parquet(
df=df,
path=path,
engine="pyarrow",
compression=compression,
filesystem=filesystem,
index=index,
storage_options=storage_options,
**kwargs,
}

if PANDAS_GE_12:
to_parquet_args.update({"storage_options": storage_options})
elif filesystem is None:
filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
to_parquet_args.update({"filesystem": filesystem})

pd_to_parquet(**to_parquet_args)
)


def read_parquet(
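
Relatedly, a minimal usage sketch of the simplified to_parquet above for a plain local path; the GeoDataFrame construction repeats the illustrative assumption from the dask.py note:

from spatialpandas import GeoDataFrame
from spatialpandas.geometry import PointArray
from spatialpandas.io import read_parquet, to_parquet

# With pandas >=2.0 the storage_options argument is always accepted by pandas'
# to_parquet, so the old PANDAS_GE_12 branching is no longer needed.
gdf = GeoDataFrame({"geometry": PointArray([[0.0, 0.0], [1.0, 1.0]]), "value": [1, 2]})
to_parquet(gdf, "points.parquet")
roundtripped = read_parquet("points.parquet")
print(roundtripped.dtypes)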