Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions application_sdk/observability/logger_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from typing import Any, Dict, Optional, Tuple

from loguru import logger
from opentelemetry._logs import SeverityNumber
from opentelemetry._logs import LogRecord, SeverityNumber
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider, LogRecord
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs._internal.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace.span import TraceFlags
Expand Down Expand Up @@ -495,7 +495,6 @@ def _create_log_record(self, record: dict) -> LogRecord:
severity_text=record["level"],
severity_number=severity_number,
body=record["message"],
resource=self.logger_provider.resource,
attributes=attributes,
)

Expand Down
10 changes: 6 additions & 4 deletions application_sdk/outputs/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,6 @@ async def write_daft_dataframe(
if isinstance(write_mode, str):
write_mode = WriteMode(write_mode)

row_count = dataframe.count_rows()
if row_count == 0:
return

file_paths = []
# Use Daft's execution context for temporary configuration
with daft.execution_config_ctx(
Expand All @@ -239,6 +235,12 @@ async def write_daft_dataframe(
partition_cols=partition_cols,
)
file_paths = result.to_pydict().get("path", [])

# If the dataframe is empty, return
# but write an empty file to the output path (necessary for ARS lookup)
row_count = dataframe.count_rows()
if row_count == 0:
return

# Update counters
self.chunk_count += 1
Expand Down
Loading