Skip to content
This repository was archived by the owner on May 31, 2023. It is now read-only.

Commit

Permalink
Update copy function to use boto3
Browse files Browse the repository at this point in the history
The s3fs copy function was attempting to use a multipart upload to copy,
which was failing. Switching to boto3 resolves this issue.
  • Loading branch information
accorvin committed Jan 25, 2022
1 parent 7e57def commit 8f79a9b
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 23 deletions.
2 changes: 1 addition & 1 deletion solgate/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def copy(files: List[S3File]) -> None:
log_args = dict(source=dict(client=a.client, key=a.key), destination=dict(client=b.client, key=b.key))
if a.client == b.client:
logger.info("Copying within the same clients", log_args)
a.client.copy(a.key, b.key)
a.client.copy(a.client.bucket, a.key, b.client.bucket, b.key)
continue

logger.info("Copying to a different client", log_args)
Expand Down
24 changes: 15 additions & 9 deletions solgate/utils/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ def __init__(
secret=self.aws_secret_access_key,
client_kwargs=dict(endpoint_url=self.endpoint_url),
)
self.boto3 = boto3.resource(
self.s3_client = boto3.resource(
"s3",
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
endpoint_url=self.endpoint_url,
).Bucket(self.__base_path.split("/")[0])
)
self.boto3 = self.s3_client.Bucket(self.__base_path.split("/")[0])

@classmethod
def from_config_file(
Expand Down Expand Up @@ -166,19 +167,24 @@ def rm(self, path: str) -> None:
"""
return self.s3fs.rm(f"{self.__base_path}/{path}")

def copy(self, source: str, dest: str, dest_base_path: str = None) -> None:
def copy(self, source_bucket: str, source_key: str, dest_bucket: str, dest_key: str) -> None:
"""Copy files within a bucket.
Args:
source (str): Source path
dest (str): Destination path
        dest_base_path (str, optional): Bucket name and base path to the destination within
the same client
source_bucket (str): Source bucket
source_key (str): Source key within bucket
dest_bucket (str): Destination bucket
dest_key (str): Destination key within destination bucket
"""
dest_base_path = dest_base_path or self.__base_path

return self.s3fs.copy(f"{self.__base_path}/{source}", f"{dest_base_path.rstrip('/')}/{dest}")
if dest_key is None or dest_key == '':
dest_key = source_key
copy_source = {
'Bucket': source_bucket,
'Key': '/'.join([self.path, source_key]).lstrip('/')
}
return self.s3_client.meta.client.copy(copy_source, dest_bucket, '/'.join([self.path, dest_key]).lstrip('/'))

def __eq__(self, other: object) -> bool:
"""Compare S3FileSystem to other objects."""
Expand Down
9 changes: 7 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ def mocked_s3(fixture_dir, request):
for instance in s3fs_instances:
instance.s3fs = s3fs.S3FileSystem(key=instance.aws_access_key_id, secret=instance.aws_secret_access_key)
instance.s3fs.s3.create_bucket(Bucket=instance._S3FileSystem__base_path.split("/")[0])
instance.boto3 = boto3.resource(
instance.s3_client = boto3.resource(
"s3", aws_access_key_id=instance.aws_access_key_id, aws_secret_access_key=instance.aws_secret_access_key
).Bucket(instance._S3FileSystem__base_path.split("/")[0])
)
instance.s3c = boto3.client(
"s3", aws_access_key_id=instance.aws_access_key_id, aws_secret_access_key=instance.aws_secret_access_key
)
instance.boto3 = instance.s3_client.Bucket(instance._S3FileSystem__base_path.split("/")[0])
instance.s3_client.create_bucket(Bucket=instance._S3FileSystem__base_path.split("/")[0])
yield s3fs_instances


Expand Down
6 changes: 3 additions & 3 deletions tests/transfer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def test__transfer_single_file_dry_run(mocked_s3, mocker):
@pytest.mark.parametrize("mocked_s3", ["same_client.yaml", "same_flags.yaml"], indirect=["mocked_s3"])
def test__transfer_single_file_same_client(mocked_s3):
"""Should transfer faster between the same clients."""
mocked_s3[0].s3fs.touch("BUCKET/a/b.csv")
mocked_s3[0].s3c.put_object(Bucket='BUCKET', Key='a/b.csv', Body='foo')
files = ["a/b.csv", "a-copy/b.csv"]

assert transfer._transfer_single_file("a/b.csv", mocked_s3, 1, 1) is None
Expand Down Expand Up @@ -167,14 +167,14 @@ def test__transfer_single_file_fails(mocked_s3, disable_backoff):
@pytest.mark.parametrize("mocked_s3", ["same_client.yaml"], indirect=["mocked_s3"])
def test_copy_same_client(mocked_s3, mocker):
"""Should use client.copy when the clients are the same."""
mocked_s3[0].s3fs.touch("BUCKET/a/b.csv")
mocked_s3[0].s3c.put_object(Bucket='BUCKET', Key='a/b.csv', Body='foo')
spies_copy = [mocker.spy(client, "copy") for client in mocked_s3]
spies_open = [mocker.spy(client, "open") for client in mocked_s3]

transfer.copy([S3File(client, "a/b.csv") for client in mocked_s3])

[spy.assert_not_called() for spy in spies_open]
spies_copy[0].assert_called_once_with("a/b.csv", "a/b.csv")
spies_copy[0].assert_called_once_with("BUCKET", "a/b.csv", "BUCKET", "a/b.csv")
spies_copy[1].assert_not_called()


Expand Down
16 changes: 8 additions & 8 deletions tests/utils/s3_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,21 @@ def test_s3_file_system_str():


@pytest.mark.parametrize(
"dest_base,result",
"dest_key,result",
[
(None, "BUCKET/c/d.csv"),
("BUCKET/different_base", "BUCKET/different_base/c/d.csv"),
("BUCKET/base_with_slash/", "BUCKET/base_with_slash/c/d.csv"),
(None, "a/b.csv"),
("", "a/b.csv"),
("c/d.csv", "c/d.csv")
],
)
@pytest.mark.parametrize("mocked_s3", ["same_client.yaml"], indirect=["mocked_s3"])
def test_s3_file_system_copy(dest_base, result, mocked_s3):
def test_s3_file_system_copy(dest_key, result, mocked_s3):
"""Should copy within the same S3fs."""
fs = mocked_s3[0]
fs.s3fs.touch("BUCKET/a/b.csv")
fs.copy("a/b.csv", "c/d.csv", dest_base)
fs.s3c.put_object(Bucket='BUCKET', Key='a/b.csv', Body='foo')
fs.copy("BUCKET", "a/b.csv", "BUCKET", dest_key)

assert fs.s3fs.find(result)
assert fs.s3c.get_object(Bucket='BUCKET', Key=result)


@pytest.mark.parametrize(
Expand Down

0 comments on commit 8f79a9b

Please sign in to comment.