Commits (31)
7bc1e8a
Create class CLPRemoteHandler
IreneLime Jan 3, 2025
0ae4a2a
Create multipart upload initiation
IreneLime Jan 3, 2025
8895b37
Create function that performs multipart upload on 5mb segments
IreneLime Jan 3, 2025
7475ed5
Create function that completes multipart upload
IreneLime Jan 3, 2025
96a831b
Handle corner case: file rotation when part number exceeds 10000
IreneLime Jan 3, 2025
055eaf0
Integrate timeout functionalities for CLPLogLevelTimeout usage and cl…
IreneLime Jan 3, 2025
b32b5f2
Avoid new upload initialization when previous upload has not completed
IreneLime Jan 3, 2025
ed3a57b
Ensure consistent string quotation
IreneLime Jan 3, 2025
964ae20
Remove print statements and fix aws segment limitation error
IreneLime Jan 5, 2025
1fa0163
Modify code to pass typing tests.
IreneLime Jan 5, 2025
85cac30
Code format & linting test
IreneLime Jan 5, 2025
065c28c
Redesign CLPS3Handler architecture and complete a functional draft of…
IreneLime Feb 9, 2025
828897e
Add aws credential configuration
IreneLime Mar 17, 2025
2267849
Add rotation after 10000 parts
IreneLime Mar 17, 2025
ba5c3dd
Fix part limit rotation
IreneLime Mar 18, 2025
a9ec06d
Fix aws credential checking
IreneLime Mar 18, 2025
0aef71d
Enable sha256 checksum on multipart upload
IreneLime Mar 18, 2025
4839528
Add error checking throughout the upload process
IreneLime Mar 18, 2025
8ab75d0
Define macros and add options to specify multipart upload size
IreneLime Mar 21, 2025
c273d0c
Always define remote file index
IreneLime Mar 21, 2025
b737d92
Use unix timestamp on file name
IreneLime Mar 21, 2025
8f94e68
Add configurable parameter on users's s3 directory
IreneLime Mar 21, 2025
b70df58
Updated pyproject.toml to include new dev and testing dependencies fo…
li-ruihao Mar 21, 2025
587d6cb
Modify naming of non-interface level variables
IreneLime Mar 25, 2025
9c8653a
Remove duplicate code
IreneLime Mar 25, 2025
be9bcba
Add PutObject option to flush to remote
IreneLime Mar 25, 2025
cfc7f6d
Merge branch 'main' into remote-handler
IreneLime Mar 25, 2025
8744baa
Add comments and error checking
IreneLime Mar 25, 2025
238d625
Add comments to explain parameters
IreneLime Mar 26, 2025
2c0a34a
Maintain consistent naming in S3 handler
IreneLime Mar 26, 2025
b448143
Fix object key naming issue
IreneLime Mar 26, 2025
182 changes: 181 additions & 1 deletion src/clp_logging/handlers.py
@@ -10,12 +10,16 @@
from signal import SIGINT, signal, SIGTERM
from threading import Thread, Timer
from types import FrameType
-from typing import Callable, ClassVar, Dict, IO, Optional, Tuple, Union
+from typing import Any, Callable, ClassVar, Dict, IO, List, Optional, Tuple, Union

import tzlocal
from clp_ffi_py.ir import FourByteEncoder
from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor

import datetime
import io

import boto3

from clp_logging.protocol import (
BYTE_ORDER,
EOF_CHAR,
@@ -792,3 +796,179 @@ def __init__(
super().__init__(
open(fpath, mode), enable_compression, timestamp_format, timezone, loglevel_timeout
)


class CLPS3Handler(CLPBaseHandler):
"""
Log is written to stream in CLP IR encoding, and uploaded to s3_bucket

:param s3_bucket: S3 bucket to upload CLP encoded log messages to
"""

    def init(self, stream: IO[bytes]) -> None:
        """
        Create the compression context, set up the output stream, and write
        the CLP IR preamble.
        """
        self.cctx: ZstdCompressor = ZstdCompressor()
        self.ostream: Union[ZstdCompressionWriter, IO[bytes]] = (
            self.cctx.stream_writer(self.local_buffer) if self.enable_compression else stream
        )
self.last_timestamp_ms: int = floor(time.time() * 1000) # convert to ms and truncate
self.ostream.write(
FourByteEncoder.encode_preamble(
self.last_timestamp_ms, self.timestamp_format, self.timezone
)
)

def __init__(
self,
s3_bucket: str,
stream: Optional[IO[bytes]] = None,
enable_compression: bool = True,
timestamp_format: Optional[str] = None,
timezone: Optional[str] = None,
aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
) -> None:
super().__init__()
self.closed: bool = False
self.enable_compression: bool = enable_compression
self.local_buffer: io.BytesIO = io.BytesIO()
        if stream is None:
            stream = self.local_buffer
self.timestamp_format: str
self.timezone: str
self.timestamp_format, self.timezone = _init_timeinfo(timestamp_format, timezone)
self.init(stream)

self.s3_bucket: str = s3_bucket
self.remote_folder_path: Optional[str] = None
self.remote_file_count: int = 0
        self.start_timestamp: datetime.datetime = datetime.datetime.now()
        self.obj_key: str = self._remote_log_naming(self.start_timestamp)
        self.s3_client: Any
        if aws_access_key_id and aws_secret_access_key:
            # Credentials must be passed as keyword arguments; positional
            # arguments would be bound to region_name and api_version
            self.s3_client = boto3.client(
                "s3",
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
            )
        else:
            self.s3_client = boto3.client("s3")
        # S3 multipart upload requires every part except the last to be at
        # least 5 MiB
        self.buffer_size: int = 1024 * 1024 * 5
        self.uploaded_parts: List[Dict[str, Union[int, str]]] = []
        self.upload_index: int = 1
        create_ret: Dict[str, Any] = self.s3_client.create_multipart_upload(
            Bucket=self.s3_bucket, Key=self.obj_key  # , ChecksumAlgorithm="SHA256"
        )
        self.upload_id: str = create_ret["UploadId"]

    def _remote_log_naming(self, timestamp: datetime.datetime) -> str:
        """
        Compute the object key of the current log file on S3.

        :param timestamp: Start timestamp of the handler, used in the file name
        :return: Object key of the form
            logs/<year>/<month>/<day>/<time>_log[-<count>].clp[.zst]
        """
        self.remote_folder_path = f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}"

        new_filename: str
        upload_time: str = timestamp.strftime("%Y-%m-%d-%H%M%S")

        file_count: str = ""
        if self.remote_file_count != 0:
            file_count = f"-{self.remote_file_count}"

        # The file extension is hard-coded to .clp.zst (or .clp) for now
        if self.enable_compression:
            new_filename = f"{self.remote_folder_path}/{upload_time}_log{file_count}.clp.zst"
        else:
            new_filename = f"{self.remote_folder_path}/{upload_time}_log{file_count}.clp"
        return new_filename

    # override
    def _write(self, loglevel: int, msg: str) -> None:
        if self.closed:
            raise RuntimeError("Stream already closed")
        clp_msg: bytearray
        clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms)
        self.ostream.write(clp_msg)
        self._flush()
        if self.local_buffer.tell() >= self.buffer_size:
            self.upload_index += 1
            self.local_buffer.seek(0)
            self.local_buffer.truncate(0)

            # Rotate to a new object after 10000 parts (S3's limit per
            # multipart upload, i.e. roughly 48.8 GiB at 5 MiB per part).
            # Complete the current upload first so its parts are persisted.
            if self.upload_index > 10000:
                self.s3_client.complete_multipart_upload(
                    Bucket=self.s3_bucket,
                    Key=self.obj_key,
                    UploadId=self.upload_id,
                    MultipartUpload={
                        "Parts": [
                            {"PartNumber": part["PartNumber"], "ETag": part["ETag"]}
                            for part in self.uploaded_parts
                        ]
                    },
                )
                self.remote_file_count += 1
                self.obj_key = self._remote_log_naming(self.start_timestamp)
                self.uploaded_parts = []
                self.upload_index = 1
                create_ret = self.s3_client.create_multipart_upload(
                    Bucket=self.s3_bucket, Key=self.obj_key  # , ChecksumAlgorithm="SHA256"
                )
                self.upload_id = create_ret["UploadId"]

    def _flush(self) -> None:
        self.ostream.flush()
        data: bytes = self.local_buffer.getvalue()

        try:
            # sha256_checksum: str = self._calculate_part_sha256(data)
            response: Dict[str, Any] = self.s3_client.upload_part(
                Bucket=self.s3_bucket,
                Key=self.obj_key,
                Body=data,
                PartNumber=self.upload_index,
                UploadId=self.upload_id,
                # ChecksumSHA256=sha256_checksum,
            )

            # Store the ETag so the part can be referenced when the upload is
            # completed
            upload_status: Dict[str, Union[int, str]] = {
                "PartNumber": self.upload_index,
                "ETag": response["ETag"],
                # "ChecksumSHA256": response["ChecksumSHA256"],
            }

            # The same part is re-uploaded as the buffer grows, so replace its
            # entry if one already exists
            if len(self.uploaded_parts) > self.upload_index - 1:
                self.uploaded_parts[self.upload_index - 1] = upload_status
            else:
                self.uploaded_parts.append(upload_status)

        except Exception as e:
            self.s3_client.abort_multipart_upload(
                Bucket=self.s3_bucket, Key=self.obj_key, UploadId=self.upload_id
            )
            raise Exception(f"Failed to upload part {self.upload_index}: {e}") from e
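
    def _calculate_part_sha256(self, data: bytes) -> str:
        # Hypothetical helper, sketched here only because the commented-out
        # checksum code above refers to it; it is not part of the original
        # diff. S3's ChecksumSHA256 parameter expects the base64-encoded
        # SHA-256 digest of the part body, and using it also requires passing
        # ChecksumAlgorithm="SHA256" to create_multipart_upload.
        import base64
        import hashlib

        return base64.b64encode(hashlib.sha256(data).digest()).decode("ascii")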


    # override
    def close(self) -> None:
        self.ostream.write(EOF_CHAR)
        if self.enable_compression:
            # Terminate the Zstandard frame before the final part is uploaded
            # so the stored object is a complete zstd stream
            self.ostream.flush(FLUSH_FRAME)
        self._flush()
        self.local_buffer.seek(0)
        self.local_buffer.truncate(0)

        try:
            self.s3_client.complete_multipart_upload(
                Bucket=self.s3_bucket,
                Key=self.obj_key,
                UploadId=self.upload_id,
                MultipartUpload={
                    "Parts": [
                        {
                            "PartNumber": part["PartNumber"],
                            "ETag": part["ETag"],
                            # "ChecksumSHA256": part["ChecksumSHA256"],
                        }
                        for part in self.uploaded_parts
                    ]
                },
            )
        except Exception as e:
            self.s3_client.abort_multipart_upload(
                Bucket=self.s3_bucket, Key=self.obj_key, UploadId=self.upload_id
            )
            raise Exception(f"Failed to complete multipart upload: {e}") from e
        finally:
            self.ostream.close()
            self.closed = True
            super().close()
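
A minimal usage sketch of the new handler (not part of the diff; the bucket name is a placeholder, and AWS credentials are assumed to come from the default credential chain since none are passed):

import logging

from clp_logging.handlers import CLPS3Handler

# Attach the S3 handler to a standard logger; records are CLP IR encoded,
# buffered locally, and uploaded to the bucket as multipart-upload parts
logger = logging.getLogger("example")
logger.setLevel(logging.INFO)
handler = CLPS3Handler("my-log-bucket")  # hypothetical bucket name
logger.addHandler(handler)
logger.info("hello from CLPS3Handler")

# close() writes the EOF character, uploads the final part, and completes
# the multipart upload
handler.close()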