diff --git a/application_sdk/observability/logger_adaptor.py b/application_sdk/observability/logger_adaptor.py index 3f719a785..2dade2993 100644 --- a/application_sdk/observability/logger_adaptor.py +++ b/application_sdk/observability/logger_adaptor.py @@ -7,9 +7,9 @@ from typing import Any, Dict, Optional, Tuple from loguru import logger -from opentelemetry._logs import SeverityNumber +from opentelemetry._logs import LogRecord, SeverityNumber from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter -from opentelemetry.sdk._logs import LoggerProvider, LogRecord +from opentelemetry.sdk._logs import LoggerProvider from opentelemetry.sdk._logs._internal.export import BatchLogRecordProcessor from opentelemetry.sdk.resources import Resource from opentelemetry.trace.span import TraceFlags @@ -495,7 +495,6 @@ def _create_log_record(self, record: dict) -> LogRecord: severity_text=record["level"], severity_number=severity_number, body=record["message"], - resource=self.logger_provider.resource, attributes=attributes, ) diff --git a/application_sdk/outputs/parquet.py b/application_sdk/outputs/parquet.py index 880c2f0a4..a41e7b7d2 100644 --- a/application_sdk/outputs/parquet.py +++ b/application_sdk/outputs/parquet.py @@ -222,10 +222,6 @@ async def write_daft_dataframe( if isinstance(write_mode, str): write_mode = WriteMode(write_mode) - row_count = dataframe.count_rows() - if row_count == 0: - return - file_paths = [] # Use Daft's execution context for temporary configuration with daft.execution_config_ctx( @@ -239,6 +235,12 @@ async def write_daft_dataframe( partition_cols=partition_cols, ) file_paths = result.to_pydict().get("path", []) + + # If the dataframe is empty, return + # but write an empty file to the output path (necessary for ARS lookup) + row_count = dataframe.count_rows() + if row_count == 0: + return # Update counters self.chunk_count += 1