diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt index 4f532de43dd..953d116ce42 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt @@ -1,2 +1,8 @@ Comparing source compatibility of opentelemetry-sdk-logs-1.55.0-SNAPSHOT.jar against opentelemetry-sdk-logs-1.54.0.jar -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorBuilder setErrorConsumer(java.util.function.Consumer<java.util.Collection<io.opentelemetry.sdk.logs.data.LogRecordData>>) ++++ NEW CLASS: PUBLIC(+) io.opentelemetry.sdk.logs.export.ExportErrorContext (not serializable) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW SUPERCLASS: java.lang.Object + +++ NEW FIELD: PUBLIC(+) STATIC(+) FINAL(+) io.opentelemetry.context.ContextKey<java.util.function.Consumer<java.util.Collection<io.opentelemetry.sdk.logs.data.LogRecordData>>> KEY diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java index 601bfdfa208..d7c53584a8c 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java @@ -17,6 +17,7 @@ import io.opentelemetry.sdk.logs.ReadWriteLogRecord; import io.opentelemetry.sdk.logs.data.LogRecordData; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Queue; @@ -26,8 +27,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; +import 
javax.annotation.Nullable; /** * Implementation of the {@link LogRecordProcessor} that batches logs exported by the SDK then * @@ -71,7 +74,8 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord long scheduleDelayNanos, int maxQueueSize, int maxExportBatchSize, - long exporterTimeoutNanos) { + long exporterTimeoutNanos, + @Nullable Consumer<Collection<LogRecordData>> errorConsumer) { this.worker = new Worker( logRecordExporter, @@ -79,7 +83,8 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new ArrayBlockingQueue<>(maxQueueSize)); // TODO: use JcTools.newFixedSizeQueue(..) + new ArrayBlockingQueue<>(maxQueueSize), // TODO: use JcTools.newFixedSizeQueue(..) + errorConsumer); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -89,7 +94,7 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) { if (logRecord == null) { return; } - worker.addLog(logRecord); + worker.addLog(logRecord, context); } @Override @@ -164,13 +169,16 @@ private static final class Worker implements Runnable { private volatile boolean continueWork = true; private final ArrayList<LogRecordData> batch; + @Nullable private final Consumer<Collection<LogRecordData>> errorConsumer; + private Worker( LogRecordExporter logRecordExporter, MeterProvider meterProvider, long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - Queue<ReadWriteLogRecord> queue) { + Queue<ReadWriteLogRecord> queue, + @Nullable Consumer<Collection<LogRecordData>> errorConsumer) { this.logRecordExporter = logRecordExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; @@ -211,11 +219,16 @@ private Worker( false); this.batch = new ArrayList<>(this.maxExportBatchSize); + this.errorConsumer = errorConsumer; } - private void addLog(ReadWriteLogRecord logData) { + private void addLog(ReadWriteLogRecord logData, Context context) { if (!queue.offer(logData)) { processedLogsCounter.add(1, droppedAttrs); 
+ Consumer<Collection<LogRecordData>> consumer = context.get(ExportErrorContext.KEY); + if (consumer != null) { + consumer.accept(Collections.singleton(logData.toLogRecordData())); + } } else { if (queue.size() >= logsNeeded.get()) { signal.offer(true); @@ -324,6 +337,12 @@ private void exportCurrentBatch() { processedLogsCounter.add(batch.size(), exportedAttrs); } else { logger.log(Level.FINE, "Exporter failed"); + if (errorConsumer != null) { + // If the exporter failed, we call the error consumer with the batch. + // This allows the user to handle the error, e.g., by logging it or sending it to a + // different exporter. + errorConsumer.accept(batch); + } } } catch (RuntimeException e) { logger.log(Level.WARNING, "Exporter threw an Exception", e); diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java index 5e17848c774..09b4a3b1b48 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java @@ -9,10 +9,14 @@ import static java.util.Objects.requireNonNull; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.sdk.logs.data.LogRecordData; import java.time.Duration; +import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Builder class for {@link BatchLogRecordProcessor}. 
@@ -39,6 +43,8 @@ public final class BatchLogRecordProcessorBuilder { private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS); private MeterProvider meterProvider = MeterProvider.noop(); + @Nullable private Consumer<Collection<LogRecordData>> errorConsumer; + BatchLogRecordProcessorBuilder(LogRecordExporter logRecordExporter) { this.logRecordExporter = requireNonNull(logRecordExporter, "logRecordExporter"); } @@ -142,6 +148,18 @@ public BatchLogRecordProcessorBuilder setMeterProvider(MeterProvider meterProvid return this; } + /** + * Sets the error consumer to handle failed log record exports. + * + * @param errorConsumer the consumer to handle collections of failed LogRecordData + * @return this builder + */ + public BatchLogRecordProcessorBuilder setErrorConsumer( + Consumer<Collection<LogRecordData>> errorConsumer) { + this.errorConsumer = errorConsumer; + return this; + } + // Visible for testing int getMaxExportBatchSize() { return maxExportBatchSize; @@ -167,6 +185,7 @@ public BatchLogRecordProcessor build() { scheduleDelayNanos, maxQueueSize, maxExportBatchSize, - exporterTimeoutNanos); + exporterTimeoutNanos, + errorConsumer); } } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java new file mode 100644 index 00000000000..3198f2842a5 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/ExportErrorContext.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.util.Collection; +import java.util.function.Consumer; +import javax.annotation.concurrent.Immutable; + +@Immutable +public class ExportErrorContext { + public static final ContextKey<Consumer<Collection<LogRecordData>>> KEY = + ContextKey.named("export-error-consumer"); + + private 
ExportErrorContext() {} +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java index cc75b50ae1f..25b1e2b04fb 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java @@ -12,11 +12,13 @@ import io.opentelemetry.sdk.logs.LogRecordProcessor; import io.opentelemetry.sdk.logs.ReadWriteLogRecord; import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -77,6 +79,10 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) { () -> { pendingExports.remove(result); if (!result.isSuccess()) { + Consumer<Collection<LogRecordData>> consumer = context.get(ExportErrorContext.KEY); + if (consumer != null) { + consumer.accept(Collections.singleton(logRecord.toLogRecordData())); + } logger.log(Level.FINE, "Exporter failed"); } });