
Commit 2a3afb0

Bumping version to 2.0.1
1 parent da01429 commit 2a3afb0

15 files changed (+201 −28 lines)

README.md

+2-2
@@ -6,13 +6,13 @@
 
 > An [AWS Professional Service](https://aws.amazon.com/professional-services/) open source initiative | [email protected]
 
-[![Release](https://img.shields.io/badge/release-2.0.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
+[![Release](https://img.shields.io/badge/release-2.0.1-brightgreen.svg)](https://pypi.org/project/awswrangler/)
 [![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler)
 [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
 [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
 
 [![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)](http://mypy-lang.org/)
-[![Coverage](https://img.shields.io/badge/coverage-93%25-brightgreen.svg)](https://pypi.org/project/awswrangler/)
+[![Coverage](https://img.shields.io/badge/coverage-92%25-brightgreen.svg)](https://pypi.org/project/awswrangler/)
 ![Static Checking](https://github.com/awslabs/aws-data-wrangler/workflows/Static%20Checking/badge.svg?branch=master)
 [![Documentation Status](https://readthedocs.org/projects/aws-data-wrangler/badge/?version=latest)](https://aws-data-wrangler.readthedocs.io/?badge=latest)
 

awswrangler/__metadata__.py

+1-1
@@ -7,5 +7,5 @@
 
 __title__: str = "awswrangler"
 __description__: str = "Pandas on AWS."
-__version__: str = "2.0.0"
+__version__: str = "2.0.1"
 __license__: str = "Apache License 2.0"

awswrangler/exceptions.py

+4
@@ -99,3 +99,7 @@ class NoFilesFound(Exception):
 
 class InvalidDataFrame(Exception):
     """InvalidDataFrame."""
+
+
+class InvalidFile(Exception):
+    """InvalidFile."""

awswrangler/redshift.py

+23
@@ -127,16 +127,21 @@ def _redshift_types_from_path(
     varchar_lengths_default: int,
     varchar_lengths: Optional[Dict[str, int]],
     parquet_infer_sampling: float,
+    path_suffix: Optional[str],
+    path_ignore_suffix: Optional[str],
     use_threads: bool,
     boto3_session: Optional[boto3.Session],
     s3_additional_kwargs: Optional[Dict[str, str]],
 ) -> Dict[str, str]:
     """Extract Redshift data types from a Pandas DataFrame."""
     _varchar_lengths: Dict[str, int] = {} if varchar_lengths is None else varchar_lengths
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
+    _logger.debug("Scanning parquet schemas on s3...")
     athena_types, _ = s3.read_parquet_metadata(
         path=path,
         sampling=parquet_infer_sampling,
+        path_suffix=path_suffix,
+        path_ignore_suffix=path_ignore_suffix,
         dataset=False,
         use_threads=use_threads,
         boto3_session=session,
@@ -167,6 +172,8 @@ def _create_table(
     varchar_lengths_default: int,
     varchar_lengths: Optional[Dict[str, int]],
     parquet_infer_sampling: float = 1.0,
+    path_suffix: Optional[str] = None,
+    path_ignore_suffix: Optional[str] = None,
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
@@ -199,6 +206,8 @@ def _create_table(
             varchar_lengths_default=varchar_lengths_default,
             varchar_lengths=varchar_lengths,
             parquet_infer_sampling=parquet_infer_sampling,
+            path_suffix=path_suffix,
+            path_ignore_suffix=path_ignore_suffix,
             use_threads=use_threads,
             boto3_session=boto3_session,
             s3_additional_kwargs=s3_additional_kwargs,
@@ -927,6 +936,8 @@ def copy_from_files(  # pylint: disable=too-many-locals,too-many-arguments
     primary_keys: Optional[List[str]] = None,
     varchar_lengths_default: int = 256,
     varchar_lengths: Optional[Dict[str, int]] = None,
+    path_suffix: Optional[str] = None,
+    path_ignore_suffix: Optional[str] = None,
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
@@ -983,6 +994,16 @@ def copy_from_files(  # pylint: disable=too-many-locals,too-many-arguments
         The size that will be set for all VARCHAR columns not specified with varchar_lengths.
     varchar_lengths : Dict[str, int], optional
         Dict of VARCHAR length by columns. (e.g. {"col1": 10, "col5": 200}).
+    path_suffix : Union[str, List[str], None]
+        Suffix or List of suffixes to be scanned on S3 for the schema extraction
+        (e.g. [".gz.parquet", ".snappy.parquet"]).
+        Only has effect during the table creation.
+        If None, will try to read all files. (default)
+    path_ignore_suffix : Union[str, List[str], None]
+        Suffix or List of suffixes for S3 keys to be ignored during the schema extraction
+        (e.g. [".csv", "_SUCCESS"]).
+        Only has effect during the table creation.
+        If None, will try to read all files. (default)
     use_threads : bool
         True to enable concurrent requests, False to disable multiple threads.
         If enabled os.cpu_count() will be used as the max number of threads.
@@ -1020,6 +1041,8 @@ def copy_from_files(  # pylint: disable=too-many-locals,too-many-arguments
             df=None,
             path=path,
             parquet_infer_sampling=parquet_infer_sampling,
+            path_suffix=path_suffix,
+            path_ignore_suffix=path_ignore_suffix,
             cursor=cursor,
             table=table,
             schema=schema,
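
A hedged usage sketch for the new copy_from_files arguments. The bucket, table, Glue connection name, and IAM role below are placeholders, and wr.redshift.connect plus the con/iam_role parameters are assumed from the 2.x API; only path_suffix and path_ignore_suffix come from this commit:

import awswrangler as wr

con = wr.redshift.connect("aws-data-wrangler-redshift")  # assumed Glue Catalog connection name
try:
    wr.redshift.copy_from_files(
        path="s3://my-bucket/stage/",                               # placeholder prefix
        con=con,
        table="my_table",
        schema="public",
        iam_role="arn:aws:iam::111111111111:role/RedshiftCopyRole",  # placeholder role ARN
        path_suffix=".parquet",         # only *.parquet keys feed the CREATE TABLE schema inference
        path_ignore_suffix="_SUCCESS",  # marker keys are skipped during inference
    )
finally:
    con.close()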

awswrangler/s3/_copy.py

+4-1
@@ -45,6 +45,7 @@ def merge_datasets(
     source_path: str,
     target_path: str,
     mode: str = "append",
+    ignore_empty: bool = False,
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, Any]] = None,
@@ -74,6 +75,8 @@ def merge_datasets(
         S3 Path for the target directory.
     mode : str, optional
         ``append`` (Default), ``overwrite``, ``overwrite_partitions``.
+    ignore_empty : bool
+        Ignore files with 0 bytes.
     use_threads : bool
         True to enable concurrent requests, False to disable multiple threads.
         If enabled os.cpu_count() will be used as the max number of threads.
@@ -120,7 +123,7 @@ def merge_datasets(
     target_path = target_path[:-1] if target_path[-1] == "/" else target_path
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
 
-    paths: List[str] = list_objects(path=f"{source_path}/", boto3_session=session)
+    paths: List[str] = list_objects(path=f"{source_path}/", ignore_empty=ignore_empty, boto3_session=session)
     _logger.debug("len(paths): %s", len(paths))
     if len(paths) < 1:
         return []
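
A short sketch of the new flag on the public API; the paths are placeholders. With ignore_empty=True, 0-byte source objects (e.g. empty _SUCCESS-style markers) are excluded from the copy list:

import awswrangler as wr

new_paths = wr.s3.merge_datasets(
    source_path="s3://my-bucket/staging/dataset/",  # placeholder
    target_path="s3://my-bucket/curated/dataset/",  # placeholder
    mode="append",
    ignore_empty=True,  # skip 0-byte objects instead of copying them
)
print(f"{len(new_paths)} objects copied")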

awswrangler/s3/_list.py

+10-1
@@ -20,6 +20,7 @@ def _path2list(
     last_modified_end: Optional[datetime.datetime] = None,
     suffix: Union[str, List[str], None] = None,
     ignore_suffix: Union[str, List[str], None] = None,
+    ignore_empty: bool = False,
 ) -> List[str]:
     """Convert Amazon S3 path to list of objects."""
     _suffix: Optional[List[str]] = [suffix] if isinstance(suffix, str) else suffix
@@ -32,6 +33,7 @@ def _path2list(
             boto3_session=boto3_session,
             last_modified_begin=last_modified_begin,
             last_modified_end=last_modified_end,
+            ignore_empty=ignore_empty,
         )
     elif isinstance(path, list):
         if last_modified_begin or last_modified_end:
@@ -72,6 +74,7 @@ def _list_objects(  # pylint: disable=too-many-branches
     last_modified_begin: Optional[datetime.datetime] = None,
     last_modified_end: Optional[datetime.datetime] = None,
     boto3_session: Optional[boto3.Session] = None,
+    ignore_empty: bool = False,
 ) -> List[str]:
     bucket: str
     prefix_original: str
@@ -94,7 +97,9 @@ def _list_objects(  # pylint: disable=too-many-branches
         if contents is not None:
             for content in contents:
                 key: str = content["Key"]
-                if (content is not None) and ("Key" in content):
+                if ignore_empty and content.get("Size", 0) == 0:
+                    _logger.debug("Skipping empty file: %s", f"s3://{bucket}/{key}")
+                elif (content is not None) and ("Key" in content):
                     if (_suffix is None) or key.endswith(tuple(_suffix)):
                         if last_modified_begin is not None:
                             if content["LastModified"] < last_modified_begin:
@@ -212,6 +217,7 @@ def list_objects(
     ignore_suffix: Union[str, List[str], None] = None,
     last_modified_begin: Optional[datetime.datetime] = None,
     last_modified_end: Optional[datetime.datetime] = None,
+    ignore_empty: bool = False,
     boto3_session: Optional[boto3.Session] = None,
 ) -> List[str]:
     """List Amazon S3 objects from a prefix.
@@ -238,6 +244,8 @@ def list_objects(
     last_modified_end : datetime, optional
         Filter the s3 files by the Last modified date of the object.
         The filter is applied only after list all s3 files.
+    ignore_empty : bool
+        Ignore files with 0 bytes.
     boto3_session : boto3.Session(), optional
         Boto3 Session. The default boto3 session will be used if boto3_session receive None.
 
@@ -270,5 +278,6 @@ def list_objects(
         boto3_session=boto3_session,
         last_modified_begin=last_modified_begin,
         last_modified_end=last_modified_end,
+        ignore_empty=ignore_empty,
     )
     return [p for p in paths if not p.endswith("/")]
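
The same flag on wr.s3.list_objects; a minimal sketch with a placeholder prefix. The suffix argument already existed, ignore_empty is the addition in this commit:

import awswrangler as wr

paths = wr.s3.list_objects(
    path="s3://my-bucket/dataset/",  # placeholder prefix
    suffix=".parquet",
    ignore_empty=True,  # drop 0-byte keys from the listing
)
print(paths)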

awswrangler/s3/_read_parquet.py

+51-11
@@ -12,7 +12,6 @@
 import boto3
 import pandas as pd
 import pyarrow as pa
-import pyarrow.lib
 import pyarrow.parquet
 
 from awswrangler import _data_types, _utils, exceptions
@@ -32,9 +31,21 @@
 _logger: logging.Logger = logging.getLogger(__name__)
 
 
+def _pyarrow_parquet_file_wrapper(
+    source: Any, read_dictionary: Optional[List[str]] = None
+) -> pyarrow.parquet.ParquetFile:
+    try:
+        return pyarrow.parquet.ParquetFile(source=source, read_dictionary=read_dictionary)
+    except pyarrow.ArrowInvalid as ex:
+        if str(ex) == "Parquet file size is 0 bytes":
+            _logger.warning("Ignoring empty file...xx")
+            return None
+        raise
+
+
 def _read_parquet_metadata_file(
     path: str, boto3_session: boto3.Session, s3_additional_kwargs: Optional[Dict[str, str]], use_threads: bool
-) -> Dict[str, str]:
+) -> Optional[Dict[str, str]]:
     with open_s3_object(
         path=path,
         mode="rb",
@@ -43,7 +54,9 @@ def _read_parquet_metadata_file(
         s3_additional_kwargs=s3_additional_kwargs,
         boto3_session=boto3_session,
     ) as f:
-        pq_file: pyarrow.parquet.ParquetFile = pyarrow.parquet.ParquetFile(source=f)
+        pq_file: Optional[pyarrow.parquet.ParquetFile] = _pyarrow_parquet_file_wrapper(source=f)
+        if pq_file is None:
+            return None
         return _data_types.athena_types_from_pyarrow_schema(schema=pq_file.schema.to_arrow_schema(), partitions=None)[0]
 
 
@@ -55,7 +68,7 @@ def _read_schemas_from_files(
     s3_additional_kwargs: Optional[Dict[str, str]],
 ) -> Tuple[Dict[str, str], ...]:
     paths = _utils.list_sampling(lst=paths, sampling=sampling)
-    schemas: Tuple[Dict[str, str], ...] = tuple()
+    schemas: Tuple[Optional[Dict[str, str]], ...] = tuple()
     n_paths: int = len(paths)
     if use_threads is False or n_paths == 1:
         schemas = tuple(
@@ -76,6 +89,7 @@ def _read_schemas_from_files(
                 itertools.repeat(use_threads),
             )
         )
+    schemas = cast(Tuple[Dict[str, str], ...], tuple(x for x in schemas if x is not None))
     _logger.debug("schemas: %s", schemas)
     return schemas

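For context on _pyarrow_parquet_file_wrapper above: pyarrow.parquet.ParquetFile raises pyarrow.ArrowInvalid on 0-byte input, which the wrapper converts into None. A standalone sketch of the same pattern (the exact exception message can vary across pyarrow versions, so this matches it loosely rather than exactly):

import io
from typing import Any, List, Optional

import pyarrow
import pyarrow.parquet


def parquet_file_or_none(source: Any, read_dictionary: Optional[List[str]] = None):
    """Return a ParquetFile, or None if the source is an empty (0-byte) file."""
    try:
        return pyarrow.parquet.ParquetFile(source=source, read_dictionary=read_dictionary)
    except pyarrow.ArrowInvalid as ex:
        if "0 bytes" in str(ex):  # looser match than the exact string checked upstream
            return None
        raise


print(parquet_file_or_none(io.BytesIO(b"")))  # -> None for an empty buffer
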
@@ -125,6 +139,7 @@ def _read_parquet_metadata(
     path: Union[str, List[str]],
     path_suffix: Optional[str],
     path_ignore_suffix: Optional[str],
+    ignore_empty: bool,
     dtype: Optional[Dict[str, str]],
     sampling: float,
     dataset: bool,
@@ -139,6 +154,7 @@ def _read_parquet_metadata(
         boto3_session=boto3_session,
         suffix=path_suffix,
         ignore_suffix=_get_path_ignore_suffix(path_ignore_suffix=path_ignore_suffix),
+        ignore_empty=ignore_empty,
     )
 
     # Files
@@ -279,7 +295,11 @@ def _read_parquet_chunked(
             s3_additional_kwargs=s3_additional_kwargs,
             boto3_session=boto3_session,
         ) as f:
-            pq_file: pyarrow.parquet.ParquetFile = pyarrow.parquet.ParquetFile(source=f, read_dictionary=categories)
+            pq_file: Optional[pyarrow.parquet.ParquetFile] = _pyarrow_parquet_file_wrapper(
+                source=f, read_dictionary=categories
+            )
+            if pq_file is None:
+                continue
             schema: Dict[str, str] = _data_types.athena_types_from_pyarrow_schema(
                 schema=pq_file.schema.to_arrow_schema(), partitions=None
             )[0]
@@ -342,7 +362,11 @@ def _read_parquet_file(
         s3_additional_kwargs=s3_additional_kwargs,
         boto3_session=boto3_session,
     ) as f:
-        pq_file: pyarrow.parquet.ParquetFile = pyarrow.parquet.ParquetFile(source=f, read_dictionary=categories)
+        pq_file: Optional[pyarrow.parquet.ParquetFile] = _pyarrow_parquet_file_wrapper(
+            source=f, read_dictionary=categories
+        )
+        if pq_file is None:
+            raise exceptions.InvalidFile(f"Invalid Parquet file: {path}")
         return pq_file.read(columns=columns, use_threads=False, use_pandas_metadata=False)
 
 
@@ -362,7 +386,11 @@ def _count_row_groups(
         s3_additional_kwargs=s3_additional_kwargs,
         boto3_session=boto3_session,
     ) as f:
-        pq_file: pyarrow.parquet.ParquetFile = pyarrow.parquet.ParquetFile(source=f, read_dictionary=categories)
+        pq_file: Optional[pyarrow.parquet.ParquetFile] = _pyarrow_parquet_file_wrapper(
+            source=f, read_dictionary=categories
+        )
+        if pq_file is None:
+            return 0
         n: int = cast(int, pq_file.num_row_groups)
         _logger.debug("Row groups count: %d", n)
         return n
@@ -401,6 +429,7 @@ def read_parquet(
     path: Union[str, List[str]],
     path_suffix: Union[str, List[str], None] = None,
     path_ignore_suffix: Union[str, List[str], None] = None,
+    ignore_empty: bool = True,
     partition_filter: Optional[Callable[[Dict[str, str]], bool]] = None,
     columns: Optional[List[str]] = None,
     validate_schema: bool = False,
@@ -453,9 +482,13 @@ def read_parquet(
         S3 prefix (accepts Unix shell-style wildcards)
         (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
     path_suffix : Union[str, List[str], None]
-        Suffix or List of suffixes for filtering S3 keys.
+        Suffix or List of suffixes to be read (e.g. [".gz.parquet", ".snappy.parquet"]).
+        If None, will try to read all files. (default)
     path_ignore_suffix : Union[str, List[str], None]
-        Suffix or List of suffixes for S3 keys to be ignored.
+        Suffix or List of suffixes for S3 keys to be ignored (e.g. [".csv", "_SUCCESS"]).
+        If None, will try to read all files. (default)
+    ignore_empty : bool
+        Ignore files with 0 bytes.
     partition_filter : Optional[Callable[[Dict[str, str]], bool]]
         Callback Function filters to apply on PARTITION columns (PUSH-DOWN filter).
         This function MUST receive a single argument (Dict[str, str]) where keys are partitions
@@ -543,6 +576,7 @@ def read_parquet(
         ignore_suffix=_get_path_ignore_suffix(path_ignore_suffix=path_ignore_suffix),
         last_modified_begin=last_modified_begin,
         last_modified_end=last_modified_end,
+        ignore_empty=ignore_empty,
     )
     path_root: Optional[str] = _get_path_root(path=path, dataset=dataset)
     if path_root is not None:
@@ -727,6 +761,7 @@ def read_parquet_metadata(
     path: Union[str, List[str]],
     path_suffix: Optional[str] = None,
     path_ignore_suffix: Optional[str] = None,
+    ignore_empty: bool = True,
     dtype: Optional[Dict[str, str]] = None,
     sampling: float = 1.0,
     dataset: bool = False,
@@ -754,9 +789,13 @@ def read_parquet_metadata(
         S3 prefix (accepts Unix shell-style wildcards)
         (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]).
     path_suffix : Union[str, List[str], None]
-        Suffix or List of suffixes for filtering S3 keys.
+        Suffix or List of suffixes to be read (e.g. [".gz.parquet", ".snappy.parquet"]).
+        If None, will try to read all files. (default)
     path_ignore_suffix : Union[str, List[str], None]
-        Suffix or List of suffixes for S3 keys to be ignored.
+        Suffix or List of suffixes for S3 keys to be ignored (e.g. [".csv", "_SUCCESS"]).
+        If None, will try to read all files. (default)
+    ignore_empty : bool
+        Ignore files with 0 bytes.
     dtype : Dict[str, str], optional
         Dictionary of columns names and Athena/Glue types to be casted.
         Useful when you have columns with undetermined data types as partitions columns.
@@ -804,6 +843,7 @@ def read_parquet_metadata(
         path=path,
         path_suffix=path_suffix,
         path_ignore_suffix=path_ignore_suffix,
+        ignore_empty=ignore_empty,
         dtype=dtype,
         sampling=sampling,
         dataset=dataset,
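
End to end, the new defaults mean empty objects no longer break dataset reads. A hedged sketch with a placeholder path; dataset, path_ignore_suffix, and the other arguments shown already existed, while ignore_empty and the sharper suffix semantics are what this commit adds:

import awswrangler as wr

df = wr.s3.read_parquet(
    path="s3://my-bucket/dataset/",   # placeholder prefix
    dataset=True,
    path_ignore_suffix=["_SUCCESS"],  # skip marker keys
    ignore_empty=True,                # default: 0-byte objects are ignored
)
print(df.shape)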
