Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
Comparing source compatibility of opentelemetry-sdk-logs-1.55.0-SNAPSHOT.jar against opentelemetry-sdk-logs-1.54.0.jar
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorBuilder setErrorConsumer(java.util.function.Consumer<java.util.Collection<io.opentelemetry.sdk.logs.data.LogRecordData>>)
+++ NEW CLASS: PUBLIC(+) io.opentelemetry.sdk.logs.export.ExportErrorContext (not serializable)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW SUPERCLASS: java.lang.Object
+++ NEW FIELD: PUBLIC(+) STATIC(+) FINAL(+) io.opentelemetry.context.ContextKey<java.util.function.Consumer<java.util.Collection<io.opentelemetry.sdk.logs.data.LogRecordData>>> KEY
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
Expand All @@ -26,8 +27,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Implementation of the {@link LogRecordProcessor} that batches logs exported by the SDK then
Expand Down Expand Up @@ -71,15 +74,17 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
long scheduleDelayNanos,
int maxQueueSize,
int maxExportBatchSize,
long exporterTimeoutNanos) {
long exporterTimeoutNanos,
@Nullable Consumer<Collection<LogRecordData>> errorConsumer) {
this.worker =
new Worker(
logRecordExporter,
meterProvider,
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
new ArrayBlockingQueue<>(maxQueueSize)); // TODO: use JcTools.newFixedSizeQueue(..)
new ArrayBlockingQueue<>(maxQueueSize), // TODO: use JcTools.newFixedSizeQueue(..)
errorConsumer);
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();
}
Expand All @@ -89,7 +94,7 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) {
if (logRecord == null) {
return;
}
worker.addLog(logRecord);
worker.addLog(logRecord, context);
}

@Override
Expand Down Expand Up @@ -164,13 +169,16 @@ private static final class Worker implements Runnable {
private volatile boolean continueWork = true;
private final ArrayList<LogRecordData> batch;

@Nullable private final Consumer<Collection<LogRecordData>> errorConsumer;

private Worker(
LogRecordExporter logRecordExporter,
MeterProvider meterProvider,
long scheduleDelayNanos,
int maxExportBatchSize,
long exporterTimeoutNanos,
Queue<ReadWriteLogRecord> queue) {
Queue<ReadWriteLogRecord> queue,
@Nullable Consumer<Collection<LogRecordData>> errorConsumer) {
this.logRecordExporter = logRecordExporter;
this.scheduleDelayNanos = scheduleDelayNanos;
this.maxExportBatchSize = maxExportBatchSize;
Expand Down Expand Up @@ -211,11 +219,16 @@ private Worker(
false);

this.batch = new ArrayList<>(this.maxExportBatchSize);
this.errorConsumer = errorConsumer;
}

private void addLog(ReadWriteLogRecord logData) {
private void addLog(ReadWriteLogRecord logData, Context context) {
if (!queue.offer(logData)) {
processedLogsCounter.add(1, droppedAttrs);
Consumer<Collection<LogRecordData>> consumer = context.get(ExportErrorContext.KEY);
if (consumer != null) {
consumer.accept(Collections.singleton(logData.toLogRecordData()));
}
} else {
if (queue.size() >= logsNeeded.get()) {
signal.offer(true);
Expand Down Expand Up @@ -324,6 +337,12 @@ private void exportCurrentBatch() {
processedLogsCounter.add(batch.size(), exportedAttrs);
} else {
logger.log(Level.FINE, "Exporter failed");
if (errorConsumer != null) {
// If the exporter failed, we call the error consumer with the batch.
// This allows the user to handle the error, e.g., by logging it or sending it to a
// different exporter.
errorConsumer.accept(batch);
}
}
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import static java.util.Objects.requireNonNull;

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Builder class for {@link BatchLogRecordProcessor}.
Expand All @@ -39,6 +43,8 @@ public final class BatchLogRecordProcessorBuilder {
private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS);
private MeterProvider meterProvider = MeterProvider.noop();

@Nullable private Consumer<Collection<LogRecordData>> errorConsumer;

BatchLogRecordProcessorBuilder(LogRecordExporter logRecordExporter) {
this.logRecordExporter = requireNonNull(logRecordExporter, "logRecordExporter");
}
Expand Down Expand Up @@ -142,6 +148,18 @@ public BatchLogRecordProcessorBuilder setMeterProvider(MeterProvider meterProvid
return this;
}

/**
 * Sets a consumer that is invoked with the batch of {@link LogRecordData} whenever an export
 * fails, allowing the caller to react to the failure (e.g. log the records or re-route them to a
 * fallback exporter).
 *
 * <p>If not set, failed exports are only counted and logged at {@link
 * java.util.logging.Level#FINE}.
 *
 * @param errorConsumer the consumer to handle collections of failed {@link LogRecordData}; must
 *     not be null
 * @return this builder
 * @throws NullPointerException if {@code errorConsumer} is null
 */
public BatchLogRecordProcessorBuilder setErrorConsumer(
    Consumer<Collection<LogRecordData>> errorConsumer) {
  // Validate eagerly, consistent with the other setters in this builder (e.g. the exporter
  // passed to the constructor), so a misconfiguration fails at build time rather than silently
  // behaving as "no consumer configured".
  this.errorConsumer = requireNonNull(errorConsumer, "errorConsumer");
  return this;
}

// Visible for testing
int getMaxExportBatchSize() {
return maxExportBatchSize;
Expand All @@ -167,6 +185,7 @@ public BatchLogRecordProcessor build() {
scheduleDelayNanos,
maxQueueSize,
maxExportBatchSize,
exporterTimeoutNanos);
exporterTimeoutNanos,
errorConsumer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.logs.export;

import io.opentelemetry.context.ContextKey;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.util.Collection;
import java.util.function.Consumer;
import javax.annotation.concurrent.Immutable;

/**
 * Holder for the {@link ContextKey} under which a caller may register an error consumer in the
 * current {@link io.opentelemetry.context.Context}. When a log record is dropped or its export
 * fails, the processor looks up this key and, if a consumer is present, passes it the affected
 * {@link LogRecordData}.
 *
 * <p>This class only exposes the constant {@link #KEY} and is not instantiable.
 */
@Immutable
public final class ExportErrorContext {
  /** Context key for the per-request export-error consumer; null when none was registered. */
  public static final ContextKey<Consumer<Collection<LogRecordData>>> KEY =
      ContextKey.named("export-error-consumer");

  // Constant holder — prevent instantiation (class is final, ctor private).
  private ExportErrorContext() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -77,6 +79,10 @@ public void onEmit(Context context, ReadWriteLogRecord logRecord) {
() -> {
pendingExports.remove(result);
if (!result.isSuccess()) {
Consumer<Collection<LogRecordData>> consumer = context.get(ExportErrorContext.KEY);
if (consumer != null) {
consumer.accept(Collections.singleton(logRecord.toLogRecordData()));
}
logger.log(Level.FINE, "Exporter failed");
}
});
Expand Down
Loading