diff --git a/pyproject.toml b/pyproject.toml
index ff3acbc..37b23d8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,6 +27,8 @@ classifiers = [
 [project.optional-dependencies]
 dev = [
     "black >= 24.4.0",
+    "boto3 >= 1.37.18",
+    "botocore >= 1.37.18",
     "build >= 0.8.0",
     "docformatter >= 1.7.5",
     "mypy >= 1.9.0",
@@ -34,6 +36,7 @@ dev = [
     "types-python-dateutil >= 2.8.19.2",
 ]
 test = [
+    "moto >= 5.1.1",
     "smart_open == 6.4.0",
 ]
diff --git a/src/clp_logging/handlers.py b/src/clp_logging/handlers.py
index fdb7150..6aab524 100644
--- a/src/clp_logging/handlers.py
+++ b/src/clp_logging/handlers.py
@@ -12,14 +12,23 @@ from signal import SIGINT, signal, SIGTERM
 from threading import RLock, Thread, Timer
 from types import FrameType
-from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union
+from typing import Any, Callable, ClassVar, Dict, IO, List, Optional, Tuple, Union
+
 import tzlocal
 from clp_ffi_py.ir import FourByteEncoder, Serializer
 from clp_ffi_py.utils import serialize_dict_to_msgpack
 from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor
+import base64
+import boto3
+import botocore
+import datetime
+import hashlib
+import io
+
 from clp_logging.auto_generated_kv_pairs_utils import AutoGeneratedKeyValuePairsBuffer
+
 from clp_logging.protocol import (
     BYTE_ORDER,
     EOF_CHAR,
@@ -39,6 +48,11 @@ AUTO_GENERATED_KV_PAIRS_KEY: str = "auto_generated_kv_pairs"
 USER_GENERATED_KV_PAIRS_KEY: str = "user_generated_kv_pairs"
 
+# Multipart upload limits: part size bounds and maximum part count
+MIN_UPLOAD_PART_SIZE: int = 5 * 1024 * 1024  # 5 MiB
+MAX_UPLOAD_PART_SIZE: int = 5 * 1024 * 1024 * 1024  # 5 GiB
+MAX_PART_NUM_PER_UPLOAD: int = 10000
+
 
 def _init_timeinfo(fmt: Optional[str], tz: Optional[str]) -> Tuple[str, str]:
     """
@@ -859,10 +873,12 @@ class ClpKeyValuePairStreamHandler(logging.Handler):
     :param stream: A writable byte output stream to which the handler will write the serialized
         IR byte sequences.
     :param enable_compression: Whether to compress the serialized IR byte sequences using
         Zstandard.
+
     """
 
     def __init__(
         self,
+        stream: IO[bytes],
        enable_compression: bool = True,
     ) -> None:
@@ -996,3 +1012,282 @@ def _serialize_kv_pair_log_event(
         serialize_dict_to_msgpack(auto_gen_kv_pairs),
         serialize_dict_to_msgpack(user_gen_kv_pairs),
     )
+
+
+class CLPS3Handler(CLPBaseHandler):
+    """
+    Log messages are written to a stream in CLP IR encoding and uploaded to an S3 bucket.
+
+    :param s3_bucket: Name of the S3 bucket to upload CLP-encoded log messages to.
+    :param stream: Target stream to write serialized log events to. Defaults to an internal
+        in-memory buffer.
+    :param enable_compression: Whether to compress the stream with Zstandard. Default: True.
+    :param timestamp_format: Timestamp format written in the preamble, to be used when
+        generating the logs with a reader.
+    :param timezone: Timezone written in the preamble, to be used when generating the
+        timestamp from Unix epoch time.
+    :param aws_access_key_id: User's public access key for the S3 bucket.
+    :param aws_secret_access_key: User's private access key for the S3 bucket.
+    :param s3_directory: S3 prefix (remote directory) to upload objects under.
+    :param use_multipart_upload: Whether to upload stream segments with multipart upload
+        or upload the entire buffer with PutObject. Default: True.
+    :param max_part_num: Maximum number of parts allowed in a multipart upload session
+        before rotating to a new object. Default: 10000.
+    :param upload_part_size: Size threshold of a part in a multipart upload session before
+        starting a new part. Default: 5 MiB.
+    """
+
+    def __init__(
+        self,
+        s3_bucket: str,
+        stream: Optional[IO[bytes]] = None,
+        enable_compression: bool = True,
+        timestamp_format: Optional[str] = None,
+        timezone: Optional[str] = None,
+        aws_access_key_id: Optional[str] = None,
+        aws_secret_access_key: Optional[str] = None,
+        s3_directory: Optional[str] = None,
+        use_multipart_upload: bool = True,
+        max_part_num: Optional[int] = None,
+        upload_part_size: int = MIN_UPLOAD_PART_SIZE,
+    ) -> None:
+        super().__init__()
+        self.closed: bool = False
+        self.enable_compression: bool = enable_compression
+        self._local_buffer: io.BytesIO = io.BytesIO()
+        if stream is None:
+            stream = self._local_buffer
+        self._ostream: Union[ZstdCompressionWriter, IO[bytes]] = stream
+        self.timestamp_format: str
+        self.timezone: str
+        self.timestamp_format, self.timezone = _init_timeinfo(timestamp_format, timezone)
+        self._init_stream(stream)
+
+        # Configure S3-related variables
+        self.s3_bucket: str = s3_bucket
+        try:
+            if aws_access_key_id and aws_secret_access_key:
+                self._s3_client = boto3.client(
+                    "s3",
+                    aws_access_key_id=aws_access_key_id,
+                    aws_secret_access_key=aws_secret_access_key,
+                )
+            else:
+                self._s3_client = boto3.client("s3")
+        except botocore.exceptions.NoCredentialsError:
+            raise RuntimeError("AWS credentials not found. Please configure your credentials.")
+        except botocore.exceptions.ClientError as e:
+            raise RuntimeError(f"Failed to initialize AWS client: {e}") from e
+        self._remote_folder_path: Optional[str] = None
+        self._remote_file_count: int = 1
+        self._start_timestamp: datetime.datetime = datetime.datetime.now()
+        self.s3_directory: str = (s3_directory.rstrip("/") + "/") if s3_directory else ""
+        self._obj_key: str = self._remote_log_naming()
+
+        self.use_multipart_upload: bool = use_multipart_upload
+        if self.use_multipart_upload:
+            # Enforce S3's size limits on a single part of a multipart upload
+            self.upload_part_size: int
+            if MIN_UPLOAD_PART_SIZE <= upload_part_size <= MAX_UPLOAD_PART_SIZE:
+                self.upload_part_size = upload_part_size
+            else:
+                raise RuntimeError(
+                    f"Invalid upload_part_size: {upload_part_size}. "
+                    f"It must be between {MIN_UPLOAD_PART_SIZE} and {MAX_UPLOAD_PART_SIZE}."
+                )
+            self.max_part_num: int = max_part_num if max_part_num else MAX_PART_NUM_PER_UPLOAD
+            self._uploaded_parts: List[Dict[str, Union[int, str]]] = []
+            self._upload_index: int = 1
+            create_ret: Dict[str, Any] = self._s3_client.create_multipart_upload(
+                Bucket=self.s3_bucket, Key=self._obj_key, ChecksumAlgorithm="SHA256"
+            )
+            self._upload_id: str = create_ret["UploadId"]
+            if not self._upload_id or not isinstance(self._upload_id, str):
+                raise RuntimeError("Failed to obtain a valid Upload ID from S3.")
+
+    def _init_stream(self, stream: IO[bytes]) -> None:
+        """
+        Initialize and configure the output stream.
+
+        :param stream: Target stream to write log messages to.
+        """
+        self.cctx: ZstdCompressor = ZstdCompressor()
+        self._ostream = (
+            self.cctx.stream_writer(self._local_buffer) if self.enable_compression else stream
+        )
+        self.last_timestamp_ms: int = floor(time.time() * 1000)  # convert to ms and truncate
+        self._ostream.write(
+            FourByteEncoder.encode_preamble(
+                self.last_timestamp_ms, self.timestamp_format, self.timezone
+            )
+        )
+
+    def _remote_log_naming(self) -> str:
+        """
+        Build the name of the target S3 object key to upload to.
+        """
+        self._remote_folder_path = (
+            f"{self.s3_directory}{self._start_timestamp.year}/"
+            f"{self._start_timestamp.month}/{self._start_timestamp.day}"
+        )
+
+        new_filename: str
+        upload_time: str = str(int(self._start_timestamp.timestamp()))
+        file_count: str = f"-{self._remote_file_count}"
+
+        # Compressed streams use the zstd extension
+        if self.enable_compression:
+            new_filename = f"{self._remote_folder_path}/{upload_time}_log{file_count}.clp.zst"
+        else:
+            new_filename = f"{self._remote_folder_path}/{upload_time}_log{file_count}.clp"
+        return new_filename
+
+    # override
+    def _write(self, loglevel: int, msg: str) -> None:
+        """
+        Write the log message into the local buffer and flush the buffer to S3.
+
+        With use_multipart_upload, advance to the next part (or rotate to a new
+        object after max_part_num parts) once the local buffer exceeds
+        upload_part_size, then clear the local buffer.
+        """
+        if self.closed:
+            raise RuntimeError("Stream already closed")
+        clp_msg: bytearray
+        clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms)
+
+        # Write the encoded log event to the local buffer, then flush to upload
+        self._ostream.write(clp_msg)
+        if not self.use_multipart_upload:
+            self._ostream.write(EOF_CHAR)
+        self._flush()
+
+        if self.use_multipart_upload and self._local_buffer.tell() >= self.upload_part_size:
+            # Rotate to a new object after the maximum number of parts
+            if self._upload_index >= self.max_part_num:
+                self._complete_multipart_upload()
+                self._ostream.close()
+                self._local_buffer = io.BytesIO()
+                self._init_stream(self._local_buffer)
+                self._remote_file_count += 1
+                self._obj_key = self._remote_log_naming()
+                self._uploaded_parts = []
+                self._upload_index = 1
+                create_ret = self._s3_client.create_multipart_upload(
+                    Bucket=self.s3_bucket, Key=self._obj_key, ChecksumAlgorithm="SHA256"
+                )
+                self._upload_id = create_ret["UploadId"]
+                if not self._upload_id:
+                    raise RuntimeError("Failed to initialize new upload ID.")
+            else:
+                self._upload_index += 1
+                self._local_buffer.seek(0)
+                self._local_buffer.truncate(0)
+
+    def _flush(self) -> None:
+        """
+        Upload the local buffer to the S3 bucket, using upload_part when
+        use_multipart_upload is True and put_object otherwise.
+ """ + self._ostream.flush() + data: bytes = self._local_buffer.getvalue() + sha256_checksum: str = base64.b64encode(hashlib.sha256(data).digest()).decode('utf-8') + + if self.use_multipart_upload: + try: + response: Dict[str, Any] = self._s3_client.upload_part( + Bucket=self.s3_bucket, + Key=self._obj_key, + Body=data, + PartNumber=self._upload_index, + UploadId=self._upload_id, + ChecksumSHA256=sha256_checksum, + ) + + # Verify integrity of the uploaded part using SHA256 Checksum + if response["ChecksumSHA256"] != sha256_checksum: + raise ValueError(f"Checksum mismatch for part {self._upload_index}. Upload aborted.") + + # Store both ETag and SHA256 for validation + upload_status: Dict[str, int | str] = { + "PartNumber": self._upload_index, + "ETag": response["ETag"], + "ChecksumSHA256": response["ChecksumSHA256"], + } + + # Determine the part to which the new upload_status belongs + if len(self._uploaded_parts) > self._upload_index - 1: + self._uploaded_parts[self._upload_index-1] = upload_status + else: + self._uploaded_parts.append(upload_status) + + except Exception as e: + self._s3_client.abort_multipart_upload( + Bucket=self.s3_bucket, Key=self._obj_key, UploadId=self._upload_id + ) + raise Exception( + f'Multipart Upload on Part {self._upload_index}: {e}' + ) from e + else: + self._ostream.write(EOF_CHAR) + try: + self._s3_client.put_object( + Bucket=self.s3_bucket, + Key=self._obj_key, + Body=data, + ContentEncoding='zstd' if self.enable_compression else 'binary', + ChecksumSHA256=sha256_checksum + ) + + # Verify integrity of the upload using SHA256 Checksum + response: Dict[str, Any] = self._s3_client.head_object( + Bucket=self.s3_bucket, + Key=self._obj_key + ) + if 'ChecksumSHA256' in response: + s3_checksum: str = response['ChecksumSHA256'] + if s3_checksum != sha256_checksum: + raise ValueError(f"Checksum mismatch. Upload aborted.") + + except Exception as e: + raise Exception(f'Failed to upload using PutObject: {e}') + + def _complete_multipart_upload(self) -> None: + """ + Complete a multipart upload session and clear the local buffer. + """ + # Flush EOF marker to the local buffer and upload + self._ostream.write(EOF_CHAR) + self._flush() + self._local_buffer.seek(0) + self._local_buffer.truncate(0) + + try: + self._s3_client.complete_multipart_upload( + Bucket=self.s3_bucket, + Key=self._obj_key, + UploadId=self._upload_id, + MultipartUpload={ + "Parts": [ + { + "PartNumber": part["PartNumber"], + "ETag": part["ETag"], + "ChecksumSHA256": part["ChecksumSHA256"], + } + for part in self._uploaded_parts + ] + }, + ) + except Exception as e: + self._s3_client.abort_multipart_upload( + Bucket=self.s3_bucket, Key=self._obj_key, UploadId=self._upload_id + ) + raise Exception( + f'Multipart Upload on Part {self._upload_index}: {e}' + ) from e + + # override + def close(self) -> None: + """ + Complete the upload if needed. Close the stream and the handler. + """ + if self.use_multipart_upload: + self._complete_multipart_upload() + self._ostream.close() + self.closed = True + super().close()