Commits (31)
7bc1e8a
Create class CLPRemoteHandler
IreneLime Jan 3, 2025
0ae4a2a
Create multipart upload initiation
IreneLime Jan 3, 2025
8895b37
Create function that performs multipart upload on 5mb segments
IreneLime Jan 3, 2025
7475ed5
Create function that completes multipart upload
IreneLime Jan 3, 2025
96a831b
Handle corner case: file rotation when part number exceeds 10000
IreneLime Jan 3, 2025
055eaf0
Integrate timeout functionalities for CLPLogLevelTimeout usage and cl…
IreneLime Jan 3, 2025
b32b5f2
Avoid new upload initialization when previous upload has not completed
IreneLime Jan 3, 2025
ed3a57b
Ensure consistent string quotation
IreneLime Jan 3, 2025
964ae20
Remove print statements and fix aws segment limitation error
IreneLime Jan 5, 2025
1fa0163
Modify code to pass typing tests.
IreneLime Jan 5, 2025
85cac30
Code format & linting test
IreneLime Jan 5, 2025
065c28c
Redesign CLPS3Handler architecture and complete a functional draft of…
IreneLime Feb 9, 2025
828897e
Add aws credential configuration
IreneLime Mar 17, 2025
2267849
Add rotation after 10000 parts
IreneLime Mar 17, 2025
ba5c3dd
Fix part limit rotation
IreneLime Mar 18, 2025
a9ec06d
Fix aws credential checking
IreneLime Mar 18, 2025
0aef71d
Enable sha256 checksum on multipart upload
IreneLime Mar 18, 2025
4839528
Add error checking throughout the upload process
IreneLime Mar 18, 2025
8ab75d0
Define macros and add options to specify multipart upload size
IreneLime Mar 21, 2025
c273d0c
Always define remote file index
IreneLime Mar 21, 2025
b737d92
Use unix timestamp on file name
IreneLime Mar 21, 2025
8f94e68
Add configurable parameter for the user's s3 directory
IreneLime Mar 21, 2025
b70df58
Updated pyproject.toml to include new dev and testing dependencies fo…
li-ruihao Mar 21, 2025
587d6cb
Modify naming of non-interface level variables
IreneLime Mar 25, 2025
9c8653a
Remove duplicate code
IreneLime Mar 25, 2025
be9bcba
Add PutObject option to flush to remote
IreneLime Mar 25, 2025
cfc7f6d
Merge branch 'main' into remote-handler
IreneLime Mar 25, 2025
8744baa
Add comments and error checking
IreneLime Mar 25, 2025
238d625
Add comments to explain parameters
IreneLime Mar 26, 2025
2c0a34a
Maintain consistent naming in S3 handler
IreneLime Mar 26, 2025
b448143
Fix object key naming issue
IreneLime Mar 26, 2025
246 changes: 246 additions & 0 deletions src/clp_logging/remote_handlers.py
@@ -0,0 +1,246 @@
import base64
import datetime
import hashlib
from pathlib import Path
from typing import Any, Dict, List, Optional

import boto3
from botocore.exceptions import NoCredentialsError

from clp_logging.handlers import CLPFileHandler


class CLPRemoteHandler(CLPFileHandler):
"""
Handles CLP log file upload to an AWS S3 bucket via multipart upload.
Configuration of AWS access key is required. Run command `aws configure`.
@junhaoliao junhaoliao (Member) Jan 5, 2025

Configuration of AWS access key is required. Run command aws configure.

If I'm not wrong, the credentials are stored in ~/.aws by aws configure. Let's try to make the credentials directly configurable in our handler's configuration (probably via some __init__ parameter, just like how the HTTPHandler requires a credentials parameter). Only when such credentials are not provided do we fall back to the credentials in ~/.aws.
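A minimal sketch of that suggestion (hypothetical parameter names, not part of this PR; boto3 falls back to its default credential chain, including ~/.aws, when both are None):

    def __init__(
        self,
        s3_bucket: str,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
    ) -> None:
        # Sketch only: explicitly provided credentials take precedence; boto3
        # falls back to environment variables and ~/.aws when these are None.
        self.s3_client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        self.bucket: str = s3_bucket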


:param s3_bucket: Name of the AWS S3 bucket to which log files are transferred
"""

def __init__(
self,
s3_bucket: str,
) -> None:
self.s3_resource: boto3.resources.factory.s3.ServiceResource = boto3.resource("s3")
self.s3_client: boto3.client = boto3.client("s3")
self.bucket: str = s3_bucket

self.log_name: Optional[str] = None
self.log_path: Optional[Path] = None
self.remote_folder_path: Optional[str] = None
self.obj_key: Optional[str] = None

self.multipart_upload_config: Dict[str, int] = {
    "size": 1024 * 1024 * 5,  # part size in bytes (5 MiB, the minimum S3 part size)
    "index": 1,  # next part number (S3 part numbers start at 1)
    "pos": 0,  # byte offset of the next unread chunk in the local file
}
self.uploaded_parts: List[Dict[str, int | str]] = []
self.upload_id: Optional[str] = None
self.remote_file_count: int = 0
self.upload_in_progress: bool = False

def _calculate_part_sha256(self, data: bytes) -> str:
    # S3's ChecksumSHA256 field expects the base64-encoded SHA-256 digest
    sha256_hash: hashlib._Hash = hashlib.sha256()
    sha256_hash.update(data)
    return base64.b64encode(sha256_hash.digest()).decode("utf-8")

def _remote_log_naming(self, timestamp: datetime.datetime) -> str:
if self.log_name is None:
raise ValueError("No input file.")

new_filename: str
# Position of the first "." in the local file name (start of the extension), or -1
ext: int = self.log_name.find(".")
upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S")
# Naming of multiple remote files from the same local file
if self.remote_file_count != 0:
upload_time += "-" + str(self.remote_file_count)

if ext != -1:
new_filename = f"log_{upload_time}{self.log_name[ext:]}"
else:
new_filename = f"{upload_time}_{self.log_name}"
new_filename = f"{self.remote_folder_path}/{new_filename}"
return new_filename

def _upload_part(self) -> Dict[str, int | str]:
if self.log_path is None:
raise ValueError("No input file.")

upload_data: bytes
# Read the latest version of the file
try:
with open(self.log_path, "rb") as file:
file.seek(self.multipart_upload_config["pos"])
upload_data = file.read(self.multipart_upload_config["size"])
except FileNotFoundError as e:

🛠️ Refactor suggestion

Handle empty or incomplete data reads more gracefully.
If the file has no new data, _upload_part may attempt to upload an empty byte array, which could be wasteful or unneeded. Ensure that logic checks for empty data before proceeding with the upload.
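A minimal sketch of that check (hypothetical guard, placed right after the read in _upload_part):

    # Sketch only: avoid an S3 round trip when the read returned no new bytes
    if not upload_data:
        raise ValueError(f"No new data to upload from {self.log_path}.")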

raise FileNotFoundError(f"The log file {self.log_path} cannot be found: {e}") from e
except IOError as e:
raise IOError(f"IO Error occurred while reading file {self.log_path}: {e}") from e

try:
sha256_checksum: str = self._calculate_part_sha256(upload_data)
response: Dict[str, Any] = self.s3_client.upload_part(
Bucket=self.bucket,
Key=self.obj_key,
Body=upload_data,
PartNumber=self.multipart_upload_config["index"],
UploadId=self.upload_id,
ChecksumSHA256=sha256_checksum,
)

# Store both ETag and SHA256 for validation
return {
"PartNumber": self.multipart_upload_config["index"],
"ETag": response["ETag"],
"ChecksumSHA256": response["ChecksumSHA256"],
}
except Exception as e:
self.s3_client.abort_multipart_upload(
Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id
)
raise Exception(
f'Multipart Upload on Part {self.multipart_upload_config["index"]}: {e}'
) from e

def get_obj_key(self) -> Optional[str]:
return self.obj_key

def set_obj_key(self, obj_key: str) -> None:
self.obj_key = obj_key

def initiate_upload(self, log_path: Path) -> None:
if self.upload_in_progress:
raise Exception("An upload is already in progress. Cannot initiate another upload.")

self.log_path = log_path
self.log_name = log_path.name
self.upload_in_progress = True
timestamp: datetime.datetime = datetime.datetime.now()
self.remote_folder_path = f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}"

self.obj_key = self._remote_log_naming(timestamp)
create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload(
Bucket=self.bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256"
)
self.upload_id = create_ret["UploadId"]

def multipart_upload(self) -> None:
# Upload initiation is required before multipart_upload
if not self.upload_id:
raise Exception("No upload process.")
if self.log_path is None:
raise ValueError("No input file.")

file_size: int = self.log_path.stat().st_size
try:
while (
file_size - self.multipart_upload_config["pos"]
>= self.multipart_upload_config["size"]
):
# Perform upload and label the uploaded part
upload_status: Dict[str, int | str] = self._upload_part()
self.multipart_upload_config["index"] += 1
self.multipart_upload_config["pos"] += self.multipart_upload_config["size"]
self.uploaded_parts.append(upload_status)

# AWS S3 limits object part count to 10000
if self.multipart_upload_config["index"] >= 10000:
self.s3_client.complete_multipart_upload(
Bucket=self.bucket,
Key=self.obj_key,
UploadId=self.upload_id,
MultipartUpload={
"Parts": [
{
"PartNumber": part["PartNumber"],
"ETag": part["ETag"],
"ChecksumSHA256": part["ChecksumSHA256"],
}
for part in self.uploaded_parts
]
},
)

# Initiate multipart upload to a new S3 object
self.remote_file_count += 1
timestamp: datetime.datetime = datetime.datetime.now()
self.remote_folder_path = (
f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}"
)
self.obj_key = self._remote_log_naming(timestamp)
self.multipart_upload_config["index"] = 1
self.uploaded_parts = []
create_ret = self.s3_client.create_multipart_upload(
Bucket=self.bucket, Key=self.obj_key, ChecksumAlgorithm="SHA256"
)
self.upload_id = create_ret["UploadId"]

# Without valid credentials, abort_multipart_upload would also fail,
# so re-raise directly instead of attempting to abort
except NoCredentialsError:
    raise
except Exception as e:
self.s3_client.abort_multipart_upload(
Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id
)
raise e

def complete_upload(self) -> None:
# Upload initiation is required before complete_upload
if not self.upload_id:
raise Exception("No upload process to complete.")
if self.log_path is None:
raise ValueError("No input file.")

file_size: int = self.log_path.stat().st_size
try:
# Upload the remaining segment
if (
file_size - self.multipart_upload_config["pos"]
< self.multipart_upload_config["size"]
):
self.multipart_upload_config["size"] = (
file_size - self.multipart_upload_config["pos"]
)
upload_status: Dict[str, int | str] = self._upload_part()
self.multipart_upload_config["index"] += 1
self.uploaded_parts.append(upload_status)
except Exception as e:
self.s3_client.abort_multipart_upload(
Bucket=self.bucket, Key=self.obj_key, UploadId=self.upload_id
)
raise e

self.s3_client.complete_multipart_upload(
Bucket=self.bucket,
Key=self.obj_key,
UploadId=self.upload_id,
MultipartUpload={
"Parts": [
{
"PartNumber": part["PartNumber"],
"ETag": part["ETag"],
"ChecksumSHA256": part["ChecksumSHA256"],
}
for part in self.uploaded_parts
]
},
)
self.upload_in_progress = False
self.upload_id = None
self.obj_key = None

def timeout(self, log_path: Path) -> None:
# Upload latest segment upon CLPLogLevelTimeout
if not self.upload_id:
super().__init__(fpath=log_path)
self.initiate_upload(log_path)

self.multipart_upload()

def close(self) -> None:
super().close()
if self.closed:
self.complete_upload()
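
A minimal usage sketch of the upload flow above (bucket and file names are placeholders; assumes AWS credentials are already configured):

    from pathlib import Path

    from clp_logging.remote_handlers import CLPRemoteHandler

    handler = CLPRemoteHandler(s3_bucket="my-log-bucket")
    handler.initiate_upload(Path("example.clp.zst"))  # create the multipart upload
    handler.multipart_upload()  # upload each full 5 MiB part currently on disk
    handler.complete_upload()  # upload the remaining tail and finish the upload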