From f365d29a082342fb8e40c21608be3f8664e60e42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Thu, 30 Jan 2025 14:40:54 +0100 Subject: [PATCH 01/22] compat: Support dask query-planning --- pixi.toml | 7 +------ pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/pixi.toml b/pixi.toml index 408d0c7..b0466e4 100644 --- a/pixi.toml +++ b/pixi.toml @@ -8,11 +8,9 @@ install = 'python -m pip install --no-deps --disable-pip-version-check -e .' [activation.env] PYTHONIOENCODING = "utf-8" -DASK_DATAFRAME__QUERY_PLANNING = "False" # TODO: Support query planning USE_PYGEOS = '0' [environments] -test-39 = ["py39", "test-core", "test", "test-task", "example", "test-example"] test-310 = ["py310", "test-core", "test", "test-task", "example", "test-example"] test-311 = ["py311", "test-core", "test", "test-task", "example", "test-example"] test-312 = ["py312", "test-core", "test", "test-task", "example", "test-example"] @@ -22,7 +20,7 @@ lint = ["py311", "lint"] [dependencies] numba = "*" -dask-core = "<2025.1" # TODO: Does not work with new DataFrame interface +dask-core = ">=2025.1" fsspec = "*" packaging = "*" pandas = "*" @@ -30,9 +28,6 @@ pip = "*" pyarrow = ">=10" retrying = "*" -[feature.py39.dependencies] -python = "3.9.*" - [feature.py310.dependencies] python = "3.10.*" diff --git a/pyproject.toml b/pyproject.toml index 3eed850..9ad56f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ classifiers = [ "Topic :: Software Development :: Libraries", ] dependencies = [ - 'dask <2025.1', + 'dask >=2025.1', 'fsspec >=2022.8', 'numba', 'packaging', From 19afd85a09a4390029e6ccf7272d18821217828e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Thu, 30 Jan 2025 14:43:11 +0100 Subject: [PATCH 02/22] update imports --- spatialpandas/dask.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py index 21cf467..7042abb 100644 --- a/spatialpandas/dask.py +++ b/spatialpandas/dask.py @@ -13,16 +13,10 @@ from dask import delayed from dask.dataframe.core import get_parallel_type from dask.dataframe.extensions import make_array_nonempty -from dask.dataframe.partitionquantiles import partition_quantiles +from dask.dataframe.utils import make_meta as make_meta_dispatch, meta_nonempty from packaging.version import Version from retrying import retry -try: - from dask.dataframe.backends import meta_nonempty - from dask.dataframe.dispatch import make_meta_dispatch -except ImportError: - from dask.dataframe.utils import make_meta as make_meta_dispatch, meta_nonempty - from .geodataframe import GeoDataFrame from .geometry.base import GeometryDtype, _BaseCoordinateIndexer from .geoseries import GeoSeries @@ -312,6 +306,7 @@ def move_retry(p1, p2): ddf = self._with_hilbert_distance_column(p) # Compute output hilbert_distance divisions + from dask.dataframe.partitionquantiles import partition_quantiles quantiles = partition_quantiles( ddf.hilbert_distance, npartitions ).compute().values From 43e64636b4d903f480f54da70317db633a3e43fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Thu, 30 Jan 2025 14:47:11 +0100 Subject: [PATCH 03/22] rename make_meta_dispatch.register(...) -> make_meta_dispatch(...) 
--- spatialpandas/dask.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py index 7042abb..dc3a406 100644 --- a/spatialpandas/dask.py +++ b/spatialpandas/dask.py @@ -99,7 +99,7 @@ def persist(self, **kwargs): ) -@make_meta_dispatch.register(GeoSeries) +@make_meta_dispatch(GeoSeries) def make_meta_series(s, index=None): result = s.head(0) if index is not None: @@ -579,7 +579,7 @@ def __getitem__(self, key): return result -@make_meta_dispatch.register(GeoDataFrame) +@make_meta_dispatch(GeoDataFrame) def make_meta_dataframe(df, index=None): result = df.head(0) if index is not None: From 589ded7e29da109a94dfffafbcb35006549b4d9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Thu, 30 Jan 2025 15:27:37 +0100 Subject: [PATCH 04/22] use make_meta_obj --- spatialpandas/dask.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py index dc3a406..2bbbc8d 100644 --- a/spatialpandas/dask.py +++ b/spatialpandas/dask.py @@ -13,7 +13,7 @@ from dask import delayed from dask.dataframe.core import get_parallel_type from dask.dataframe.extensions import make_array_nonempty -from dask.dataframe.utils import make_meta as make_meta_dispatch, meta_nonempty +from dask.dataframe.utils import make_meta_obj, meta_nonempty from packaging.version import Version from retrying import retry @@ -24,6 +24,9 @@ class DaskGeoSeries(dd.Series): + + _partition_type = GeoSeries + def __init__(self, dsk, name, meta, divisions, *args, **kwargs): super().__init__(dsk, name, meta, divisions) @@ -99,7 +102,7 @@ def persist(self, **kwargs): ) -@make_meta_dispatch(GeoSeries) +@make_meta_obj.register(GeoSeries) def make_meta_series(s, index=None): result = s.head(0) if index is not None: @@ -118,6 +121,8 @@ def get_parallel_type_dataframe(df): class DaskGeoDataFrame(dd.DataFrame): + _partition_type = GeoDataFrame + def __init__(self, dsk, name, meta, divisions): super().__init__(dsk, name, meta, divisions) self._partition_sindex = {} @@ -579,7 +584,7 @@ def __getitem__(self, key): return result -@make_meta_dispatch(GeoDataFrame) +@make_meta_obj.register(GeoDataFrame) def make_meta_dataframe(df, index=None): result = df.head(0) if index is not None: From c7f4108de8ea9937f901eacee25eeaa4467e839d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Thu, 30 Jan 2025 20:13:55 +0100 Subject: [PATCH 05/22] More registers --- spatialpandas/dask.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py index 2bbbc8d..f6a6de2 100644 --- a/spatialpandas/dask.py +++ b/spatialpandas/dask.py @@ -27,8 +27,8 @@ class DaskGeoSeries(dd.Series): _partition_type = GeoSeries - def __init__(self, dsk, name, meta, divisions, *args, **kwargs): - super().__init__(dsk, name, meta, divisions) + def __init__(self, expr, *args, **kwargs): + super().__init__(expr, *args, **kwargs) # Init backing properties self._partition_bounds = None @@ -116,15 +116,20 @@ def meta_nonempty_series(s, index=None): @get_parallel_type.register(GeoSeries) -def get_parallel_type_dataframe(df): +def get_parallel_type_series(df): + return DaskGeoSeries + + +@dd.get_collection_type.register(GeoSeries) +def get_collection_type_series(df): return DaskGeoSeries class DaskGeoDataFrame(dd.DataFrame): _partition_type = GeoDataFrame - def __init__(self, dsk, name, meta, divisions): - super().__init__(dsk, name, meta, divisions) + 
def __init__(self, expr, *args, **kwargs): + super().__init__(expr, *args, **kwargs) self._partition_sindex = {} self._partition_bounds = {} @@ -598,10 +603,14 @@ def meta_nonempty_dataframe(df, index=None): @get_parallel_type.register(GeoDataFrame) -def get_parallel_type_series(s): +def get_parallel_type_dataframe(s): return DaskGeoDataFrame +@dd.get_collection_type.register(GeoDataFrame) +def get_collection_type_dataframe(df): + return DaskGeoDataFrame + class _DaskCoordinateIndexer(_BaseCoordinateIndexer): def __init__(self, obj, sindex): super().__init__(sindex) From fab834926a0096c4f439cb66dddce1f7500243ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Thu, 30 Jan 2025 20:49:42 +0100 Subject: [PATCH 06/22] update test action --- .github/workflows/test.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 4e30210..191d7c8 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -83,7 +83,7 @@ jobs: run: | MATRIX=$(jq -nsc '{ "os": ["ubuntu-latest", "macos-latest", "windows-latest"], - "environment": ["test-39", "test-312"] + "environment": ["test-310", "test-312"] }') echo "MATRIX=$MATRIX" >> $GITHUB_ENV - name: Set test matrix with 'full' option @@ -91,7 +91,7 @@ jobs: run: | MATRIX=$(jq -nsc '{ "os": ["ubuntu-latest", "macos-latest", "windows-latest"], - "environment": ["test-39", "test-310", "test-311", "test-312"] + "environment": ["test-310", "test-311", "test-312"] }') echo "MATRIX=$MATRIX" >> $GITHUB_ENV - name: Set test matrix with 'downstream' option From b43b4c1e2385678da8239d5d045d61d0c0b2cc5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Thu, 30 Jan 2025 20:55:41 +0100 Subject: [PATCH 07/22] Update to use dd.partitionquantiles --- spatialpandas/dask.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py index f6a6de2..a7acd76 100644 --- a/spatialpandas/dask.py +++ b/spatialpandas/dask.py @@ -316,8 +316,7 @@ def move_retry(p1, p2): ddf = self._with_hilbert_distance_column(p) # Compute output hilbert_distance divisions - from dask.dataframe.partitionquantiles import partition_quantiles - quantiles = partition_quantiles( + quantiles = dd.partitionquantiles( ddf.hilbert_distance, npartitions ).compute().values From f4164b310970981b5c91454a65c0dfd6cc2ad68b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Thu, 30 Jan 2025 21:52:40 +0100 Subject: [PATCH 08/22] Update partitionquantiles to new method --- spatialpandas/dask.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py index a7acd76..14abbaa 100644 --- a/spatialpandas/dask.py +++ b/spatialpandas/dask.py @@ -316,14 +316,15 @@ def move_retry(p1, p2): ddf = self._with_hilbert_distance_column(p) # Compute output hilbert_distance divisions - quantiles = dd.partitionquantiles( - ddf.hilbert_distance, npartitions - ).compute().values + from dask.dataframe.dask_expr import RepartitionQuantiles, new_collection + quantiles, *_ = dask.compute( + new_collection(RepartitionQuantiles(ddf.hilbert_distance, npartitions)) + ) # Add _partition column containing output partition number of each row ddf = ddf.map_partitions( lambda df: df.assign( - _partition=np.digitize(df.hilbert_distance, quantiles[1:], right=True)) + _partition=np.digitize(df.hilbert_distance, quantiles.values[1:], right=True)) ) # Compute part paths 
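Note on the two quantile patches above: with query planning, the old dask.dataframe.partitionquantiles.partition_quantiles helper is gone, so the division boundaries now come from a RepartitionQuantiles expression that has to be wrapped via new_collection before it can be computed. A minimal, self-contained sketch of the pattern, assuming dask >= 2025.1 with query planning; the toy data, seed, and partition counts are illustrative, not taken from the patches:

    import dask.dataframe as dd
    import numpy as np
    import pandas as pd
    from dask.dataframe.dask_expr import RepartitionQuantiles, new_collection

    # Toy stand-in for a GeoDataFrame carrying a hilbert_distance column
    pdf = pd.DataFrame(
        {"hilbert_distance": np.random.default_rng(0).integers(0, 2**15, 1000)}
    )
    ddf = dd.from_pandas(pdf, npartitions=4)

    # RepartitionQuantiles is an expression node; new_collection() wraps it in
    # a computable collection that yields npartitions + 1 division boundaries.
    npartitions = 8
    quantiles = new_collection(
        RepartitionQuantiles(ddf.hilbert_distance, npartitions)
    ).compute().values

    # Rows are then routed to output partitions by quantile bin, as the
    # patched code does with np.digitize.
    partition_ids = np.digitize(pdf["hilbert_distance"], quantiles[1:], right=True)
    print(np.bincount(partition_ids, minlength=npartitions))

Patch 09 below tidies the same call into a single .compute().values chain.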
From d22906208f12e256d7c0853167e58b0194f7c45b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?=
Date: Fri, 31 Jan 2025 10:36:07 +0100
Subject: [PATCH 09/22] clean up

---
 spatialpandas/dask.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py
index 14abbaa..5a88751 100644
--- a/spatialpandas/dask.py
+++ b/spatialpandas/dask.py
@@ -317,14 +317,14 @@ def move_retry(p1, p2):

         # Compute output hilbert_distance divisions
         from dask.dataframe.dask_expr import RepartitionQuantiles, new_collection
-        quantiles, *_ = dask.compute(
-            new_collection(RepartitionQuantiles(ddf.hilbert_distance, npartitions))
-        )
+        quantiles = new_collection(
+            RepartitionQuantiles(ddf.hilbert_distance, npartitions)
+        ).compute().values

         # Add _partition column containing output partition number of each row
         ddf = ddf.map_partitions(
             lambda df: df.assign(
-                _partition=np.digitize(df.hilbert_distance, quantiles.values[1:], right=True))
+                _partition=np.digitize(df.hilbert_distance, quantiles[1:], right=True))
         )

         # Compute part paths

From 805dbe9dc8025e7b72743a67f5205f4c3a4364ab Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?=
Date: Fri, 31 Jan 2025 10:36:22 +0100
Subject: [PATCH 10/22] Remove int conversion in f-strings

---
 spatialpandas/dask.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py
index 5a88751..b9e2afb 100644
--- a/spatialpandas/dask.py
+++ b/spatialpandas/dask.py
@@ -333,7 +333,7 @@ def move_retry(p1, p2):
             for out_partition in out_partitions
         ]
         part_output_paths = [
-            os.path.join(path, f"part.{int(out_partition)}.parquet")
+            os.path.join(path, f"part.{out_partition}.parquet")
             for out_partition in out_partitions
         ]
@@ -343,7 +343,7 @@ def move_retry(p1, p2):
             rm_retry(path)

         for out_partition in out_partitions:
-            part_dir = os.path.join(path, f"part.{int(out_partition)}.parquet" )
+            part_dir = os.path.join(path, f"part.{out_partition}.parquet" )
             mkdirs_retry(part_dir)
             tmp_part_dir = tempdir_format.format(partition=out_partition, uuid=dataset_uuid)
             mkdirs_retry(tmp_part_dir)
@@ -365,7 +365,7 @@ def process_partition(df, i):
             for out_partition, df_part in df.groupby('_partition'):
                 part_path = os.path.join(
                     tempdir_format.format(partition=out_partition, uuid=dataset_uuid),
-                    f'part{int(i)}.parquet',
+                    f'part{i}.parquet',
                 )
                 df_part = (
                     df_part

From 3ce10dbeb35057db9983931b7b7c3373e242d0e5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?=
Date: Fri, 31 Jan 2025 10:45:00 +0100
Subject: [PATCH 11/22] Ignore UserWarning when using to_parquet

---
 spatialpandas/io/parquet.py | 30 +++++++++++++++++++-----------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/spatialpandas/io/parquet.py b/spatialpandas/io/parquet.py
index 4d6dc8a..d301785 100644
--- a/spatialpandas/io/parquet.py
+++ b/spatialpandas/io/parquet.py
@@ -1,4 +1,5 @@
 import json
+import warnings
 from collections.abc import Iterable
 from functools import reduce
 from glob import has_magic
@@ -180,17 +181,24 @@ def to_parquet_dask(
     spatial_metadata = {'partition_bounds': partition_bounds}
     b_spatial_metadata = json.dumps(spatial_metadata).encode('utf')

-    dd_to_parquet(
-        ddf,
-        path,
-        engine="pyarrow",
-        compression=compression,
-        storage_options=storage_options,
-        custom_metadata={b'spatialpandas': b_spatial_metadata},
-        write_metadata_file=True,
-        **engine_kwargs,
-        **kwargs,
-    )
+
+    with warnings.catch_warnings():
+        warnings.filterwarnings(
+            "ignore",
category=UserWarning, + message="Dask annotations .* detected. Annotations will be ignored when using query-planning.", + ) + dd_to_parquet( + ddf, + path, + engine="pyarrow", + compression=compression, + storage_options=storage_options, + custom_metadata={b'spatialpandas': b_spatial_metadata}, + write_metadata_file=True, + **engine_kwargs, + **kwargs, + ) def read_parquet_dask( From 8e1f92d548210cd04b1f258a2ccdc6bd4a9197a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Fri, 31 Jan 2025 12:01:35 +0100 Subject: [PATCH 12/22] Add np.ndarray --- spatialpandas/dask.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py index b9e2afb..54de057 100644 --- a/spatialpandas/dask.py +++ b/spatialpandas/dask.py @@ -104,6 +104,8 @@ def persist(self, **kwargs): @make_meta_obj.register(GeoSeries) def make_meta_series(s, index=None): + if hasattr(s, "__array__") or isinstance(s, np.ndarray): + return s[:0] result = s.head(0) if index is not None: result = result.reindex(index[:0]) @@ -591,6 +593,8 @@ def __getitem__(self, key): @make_meta_obj.register(GeoDataFrame) def make_meta_dataframe(df, index=None): + if hasattr(df, "__array__") or isinstance(df, np.ndarray): + return df[:0] result = df.head(0) if index is not None: result = result.reindex(index[:0]) From cc4aed56ece9a3830222b482046d0fb7a14b0858 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Fri, 31 Jan 2025 13:46:57 +0100 Subject: [PATCH 13/22] Update pyproject.toml --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9ad56f6..6c9723f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,14 +8,13 @@ dynamic = ["version"] description = 'Pandas extension arrays for spatial/geometric operations' readme = "README.md" license = { text = "BSD-2-Clause" } -requires-python = ">=3.9" +requires-python = ">=3.10" authors = [{ name = "HoloViz developers", email = "developers@holoviz.org" }] maintainers = [{ name = "HoloViz developers", email = "developers@holoviz.org" }] classifiers = [ "License :: OSI Approved :: BSD License", "Development Status :: 5 - Production/Stable", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", From 7c0018619bd027ec4347987509af6438fdad62ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Fri, 31 Jan 2025 13:55:08 +0100 Subject: [PATCH 14/22] Disable Python 3.10 linting rules for now --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 6c9723f..b2d9e81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,6 +78,7 @@ filterwarnings = [ [tool.ruff] fix = true line-length = 100 +target-version = "py39" # TODO: Remove in follow up PR [tool.ruff.lint] select = [ From 76ecbe90be8b1c88be1395f23d86405a3e375abc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Sun, 2 Feb 2025 14:44:09 +0100 Subject: [PATCH 15/22] dask-scheduler as single-threaded for examples --- pixi.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pixi.toml b/pixi.toml index b0466e4..8ad6fdb 100644 --- a/pixi.toml +++ b/pixi.toml @@ -81,6 +81,9 @@ test-unit = 'pytest spatialpandas/tests -n logical --dist loadgroup' [feature.test-example.dependencies] nbval = "*" +[feature.test-example.activation.env] +DASK_SCHEDULER = 
"single-threaded" + [feature.test-example.tasks] test-example = 'pytest -n logical --dist loadscope --nbval-lax examples' From 2ffea479e9012ca01a25369a74a6a0aa8caf9166 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Mon, 3 Feb 2025 08:55:07 +0100 Subject: [PATCH 16/22] Bump minimum version dependencies to match dask --- pixi.toml | 4 ++-- pyproject.toml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pixi.toml b/pixi.toml index 8ad6fdb..405a23a 100644 --- a/pixi.toml +++ b/pixi.toml @@ -23,9 +23,9 @@ numba = "*" dask-core = ">=2025.1" fsspec = "*" packaging = "*" -pandas = "*" +pandas = ">=2.0" pip = "*" -pyarrow = ">=10" +pyarrow = ">=14.0.1" retrying = "*" [feature.py310.dependencies] diff --git a/pyproject.toml b/pyproject.toml index b2d9e81..ae3ee99 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,8 +30,8 @@ dependencies = [ 'fsspec >=2022.8', 'numba', 'packaging', - 'pandas', - 'pyarrow >=10', + 'pandas >=2.0', + 'pyarrow >=14.0.1', 'retrying', ] From 4ccf6756ba136ba46eb8f32bdef392d3aaca5e94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Mon, 3 Feb 2025 08:58:59 +0100 Subject: [PATCH 17/22] Remove pandas 1.2 compability --- spatialpandas/io/parquet.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/spatialpandas/io/parquet.py b/spatialpandas/io/parquet.py index d301785..6ee3f98 100644 --- a/spatialpandas/io/parquet.py +++ b/spatialpandas/io/parquet.py @@ -18,7 +18,6 @@ to_parquet as dd_to_parquet, ) from dask.utils import natural_sort_key -from packaging.version import Version from pandas.io.parquet import to_parquet as pd_to_parquet from pyarrow.parquet import ParquetDataset, ParquetFile, read_metadata @@ -31,9 +30,6 @@ validate_coerce_filesystem, ) -# improve pandas compatibility, based on geopandas _compat.py -PANDAS_GE_12 = Version(pd.__version__) >= Version("1.2.0") - def _load_parquet_pandas_metadata( path, @@ -91,15 +87,10 @@ def to_parquet( "compression": compression, "filesystem": filesystem, "index": index, + "storage_options": storage_options, **kwargs, } - if PANDAS_GE_12: - to_parquet_args.update({"storage_options": storage_options}) - elif filesystem is None: - filesystem = validate_coerce_filesystem(path, filesystem, storage_options) - to_parquet_args.update({"filesystem": filesystem}) - pd_to_parquet(**to_parquet_args) From 7385cb82ccea473f9d1339986365649f07005a32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Thu, 6 Feb 2025 12:10:16 +0100 Subject: [PATCH 18/22] Reduce warning to only be the current case --- spatialpandas/io/parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spatialpandas/io/parquet.py b/spatialpandas/io/parquet.py index 6ee3f98..6df9e8e 100644 --- a/spatialpandas/io/parquet.py +++ b/spatialpandas/io/parquet.py @@ -177,7 +177,7 @@ def to_parquet_dask( warnings.filterwarnings( "ignore", category=UserWarning, - message="Dask annotations .* detected. 
+            message="Dask annotations {'retries': 5} detected",
         )
         dd_to_parquet(

From 6d5a3de3fa4c0bee7c063e286c209d7d935d0f98 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?=
Date: Thu, 6 Feb 2025 13:29:46 +0100
Subject: [PATCH 19/22] Move filterwarning to pyproject.toml

---
 pyproject.toml              |  4 +++-
 spatialpandas/io/parquet.py | 30 +++++++++++-------------------
 2 files changed, 14 insertions(+), 20 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index ae3ee99..6944577 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -73,12 +73,14 @@ filterwarnings = [
     "ignore:datetime.datetime.utcnow():DeprecationWarning:botocore", # https://github.com/boto/boto3/issues/3889
     # 2024-11
     "ignore:The legacy Dask DataFrame implementation is deprecated:FutureWarning", # https://github.com/holoviz/spatialpandas/issues/146
+    # 2025-02
+    "ignore:Dask annotations ..retries.. 5. detected:UserWarning", # https://github.com/dask/dask/issues/11721
 ]

 [tool.ruff]
 fix = true
 line-length = 100
-target-version = "py39" # TODO: Remove in follow up PR
+target-version = "py39"  # TODO: Remove in follow up PR

 [tool.ruff.lint]
 select = [
diff --git a/spatialpandas/io/parquet.py b/spatialpandas/io/parquet.py
index 6df9e8e..05979af 100644
--- a/spatialpandas/io/parquet.py
+++ b/spatialpandas/io/parquet.py
@@ -1,5 +1,4 @@
 import json
-import warnings
 from collections.abc import Iterable
 from functools import reduce
 from glob import has_magic
@@ -172,24 +171,17 @@ def to_parquet_dask(
     spatial_metadata = {'partition_bounds': partition_bounds}
     b_spatial_metadata = json.dumps(spatial_metadata).encode('utf')
-
-    with warnings.catch_warnings():
-        warnings.filterwarnings(
-            "ignore",
-            category=UserWarning,
-            message="Dask annotations {'retries': 5} detected",
-        )
-        dd_to_parquet(
-            ddf,
-            path,
-            engine="pyarrow",
-            compression=compression,
-            storage_options=storage_options,
-            custom_metadata={b'spatialpandas': b_spatial_metadata},
-            write_metadata_file=True,
-            **engine_kwargs,
-            **kwargs,
-        )
+    dd_to_parquet(
+        ddf,
+        path,
+        engine="pyarrow",
+        compression=compression,
+        storage_options=storage_options,
+        custom_metadata={b'spatialpandas': b_spatial_metadata},
+        write_metadata_file=True,
+        **engine_kwargs,
+        **kwargs,
+    )

From 75f1d43b78470bb7768217a6de58c46136cde538 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?=
Date: Wed, 12 Feb 2025 08:39:41 +0100
Subject: [PATCH 20/22] remove unnecessary _partition_type

---
 spatialpandas/dask.py | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py
index 54de057..aee8138 100644
--- a/spatialpandas/dask.py
+++ b/spatialpandas/dask.py
@@ -24,9 +24,6 @@

 class DaskGeoSeries(dd.Series):
-
-    _partition_type = GeoSeries
-
     def __init__(self, expr, *args, **kwargs):
         super().__init__(expr, *args, **kwargs)
@@ -128,8 +125,6 @@ def get_collection_type_series(df):

 class DaskGeoDataFrame(dd.DataFrame):
-    _partition_type = GeoDataFrame
-
     def __init__(self, expr, *args, **kwargs):
         super().__init__(expr, *args, **kwargs)
         self._partition_sindex = {}

From 94505ac0a1e5a54281bfb903d2b94ee16399d99f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?=
Date: Wed, 12 Feb 2025 08:42:15 +0100
Subject: [PATCH 21/22] simplify to_parquet

---
 spatialpandas/io/parquet.py | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/spatialpandas/io/parquet.py b/spatialpandas/io/parquet.py
index
05979af..c78e1f5 100644 --- a/spatialpandas/io/parquet.py +++ b/spatialpandas/io/parquet.py @@ -79,18 +79,16 @@ def to_parquet( filesystem = validate_coerce_filesystem(path, filesystem, storage_options) # Standard pandas to_parquet with pyarrow engine - to_parquet_args = { - "df": df, - "path": path, - "engine": "pyarrow", - "compression": compression, - "filesystem": filesystem, - "index": index, - "storage_options": storage_options, + pd_to_parquet( + df=df, + path=path, + engine="pyarrow", + compression=compression, + filesystem=filesystem, + index=index, + storage_options=storage_options, **kwargs, - } - - pd_to_parquet(**to_parquet_args) + ) def read_parquet( From 7dbf6a0e7d55d7478ea07644304e7dd3df52e436 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20H=C3=B8xbro=20Hansen?= Date: Wed, 12 Feb 2025 09:44:11 +0100 Subject: [PATCH 22/22] Don't do dask version check --- spatialpandas/dask.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/spatialpandas/dask.py b/spatialpandas/dask.py index aee8138..c1f377b 100644 --- a/spatialpandas/dask.py +++ b/spatialpandas/dask.py @@ -14,7 +14,6 @@ from dask.dataframe.core import get_parallel_type from dask.dataframe.extensions import make_array_nonempty from dask.dataframe.utils import make_meta_obj, meta_nonempty -from packaging.version import Version from retrying import retry from .geodataframe import GeoDataFrame @@ -192,11 +191,7 @@ def pack_partitions(self, npartitions=None, p=15, shuffle='tasks'): # Set index to distance. This will trigger an expensive shuffle # sort operation - if Version(dask.__version__) >= Version('2024.1'): - shuffle_kwargs = {'shuffle_method': shuffle} - else: - shuffle_kwargs = {'shuffle': shuffle} - ddf = ddf.set_index('hilbert_distance', npartitions=npartitions, **shuffle_kwargs) + ddf = ddf.set_index('hilbert_distance', npartitions=npartitions, shuffle_method=shuffle) if ddf.npartitions != npartitions: # set_index doesn't change the number of partitions if the partitions
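A closing note on patch 22: pinning dask >= 2025.1 makes the Version() branch dead code, since set_index has accepted the shuffle_method keyword from dask 2024.1 onward. A minimal sketch of the simplified call in pack_partitions, with a toy frame standing in for the GeoDataFrame (names and sizes are illustrative, not taken from the patches):

    import dask.dataframe as dd
    import pandas as pd

    # Toy stand-in for a frame carrying the hilbert_distance column
    pdf = pd.DataFrame({"hilbert_distance": [3, 1, 2, 0], "value": list("abcd")})
    ddf = dd.from_pandas(pdf, npartitions=2)

    # The expensive shuffle sort from pack_partitions: the shuffle backend is
    # selected directly via shuffle_method, with no version-dependent kwarg.
    packed = ddf.set_index("hilbert_distance", npartitions=2, shuffle_method="tasks")
    print(packed.compute())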