diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt index 4f532de43dd..33f8aa8f16a 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-sdk-logs.txt @@ -1,2 +1,45 @@ Comparing source compatibility of opentelemetry-sdk-logs-1.55.0-SNAPSHOT.jar against opentelemetry-sdk-logs-1.54.0.jar -No changes. \ No newline at end of file ++++ NEW CLASS: PUBLIC(+) io.opentelemetry.sdk.logs.export.AuditException (compatible) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW INTERFACE: java.io.Serializable + +++ NEW SUPERCLASS: java.lang.RuntimeException + +++ NEW FIELD: PUBLIC(+) io.opentelemetry.context.Context context + +++ NEW ANNOTATION: javax.annotation.Nullable + +++ NEW FIELD: PUBLIC(+) java.util.Collection logRecords ++++ NEW INTERFACE: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.logs.export.AuditExceptionHandler (not serializable) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW SUPERCLASS: java.lang.Object + +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) void handle(io.opentelemetry.sdk.logs.export.AuditException) ++++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.logs.export.AuditLogRecordProcessor (not serializable) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW INTERFACE: io.opentelemetry.sdk.logs.LogRecordProcessor + +++ NEW INTERFACE: java.io.Closeable + +++ NEW INTERFACE: java.lang.AutoCloseable + +++ NEW SUPERCLASS: java.lang.Object + +++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.logs.export.AuditLogRecordProcessorBuilder builder(io.opentelemetry.sdk.logs.export.LogRecordExporter, io.opentelemetry.sdk.logs.export.AuditLogStore) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.CompletableResultCode forceFlush() + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.CompletableResultCode getLastResultCode() + +++ NEW ANNOTATION: javax.annotation.Nullable + +++ NEW METHOD: PUBLIC(+) void onEmit(io.opentelemetry.context.Context, io.opentelemetry.sdk.logs.ReadWriteLogRecord) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.CompletableResultCode shutdown() + +++ NEW METHOD: PUBLIC(+) java.lang.String toString() ++++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.logs.export.AuditLogRecordProcessorBuilder (not serializable) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. 
+ +++ NEW SUPERCLASS: java.lang.Object + +++ NEW FIELD: PUBLIC(+) STATIC(+) FINAL(+) long DEFAULT_SCHEDULE_DELAY_MILLIS + +++ NEW FIELD: PUBLIC(+) STATIC(+) FINAL(+) int DEFAULT_MAX_EXPORT_BATCH_SIZE + +++ NEW FIELD: PUBLIC(+) STATIC(+) FINAL(+) int DEFAULT_EXPORT_TIMEOUT_MILLIS + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.AuditLogRecordProcessor build() + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.AuditLogRecordProcessorBuilder setExceptionHandler(io.opentelemetry.sdk.logs.export.AuditExceptionHandler) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.AuditLogRecordProcessorBuilder setExporterTimeout(long, java.util.concurrent.TimeUnit) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.AuditLogRecordProcessorBuilder setMaxExportBatchSize(int) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.AuditLogRecordProcessorBuilder setRetryPolicy(io.opentelemetry.sdk.common.export.RetryPolicy) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.AuditLogRecordProcessorBuilder setScheduleDelay(long, java.util.concurrent.TimeUnit) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.logs.export.AuditLogRecordProcessorBuilder setWaitOnExport(boolean) ++++ NEW INTERFACE: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.logs.export.AuditLogStore (not serializable) + +++ CLASS FILE FORMAT VERSION: 52.0 <- n.a. + +++ NEW SUPERCLASS: java.lang.Object + +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) java.util.Collection getAll() + +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) void removeAll(java.util.Collection) + +++ NEW METHOD: PUBLIC(+) ABSTRACT(+) void save(io.opentelemetry.sdk.logs.data.LogRecordData) + +++ NEW EXCEPTION: java.io.IOException diff --git a/exporters/otlp/common/build.gradle.kts b/exporters/otlp/common/build.gradle.kts index 5ad623561b2..4f854ddaf20 100644 --- a/exporters/otlp/common/build.gradle.kts +++ b/exporters/otlp/common/build.gradle.kts @@ -23,6 +23,7 @@ dependencies { compileOnly(project(":sdk:trace")) compileOnly(project(":sdk:logs")) compileOnly(project(":api:incubator")) + compileOnly("com.fasterxml.jackson.core:jackson-databind") testImplementation(project(":sdk:metrics")) testImplementation(project(":sdk:trace")) diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/logs/AuditLogFileStore.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/logs/AuditLogFileStore.java new file mode 100644 index 00000000000..d0bd80a87d3 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/logs/AuditLogFileStore.java @@ -0,0 +1,238 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.otlp.logs; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.AuditLogStore; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import 
java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Logger; +import java.util.stream.Stream; +import javax.annotation.Nullable; + +/** + * This class is internal and experimental. Its APIs are unstable and can change at any time. Its + * APIs (or a version of them) may be promoted to the public stable API in the future, but no + * guarantees are made. + * + *
<p>
A file-based implementation of {@link AuditLogStore} that provides thread-safe concurrent + * reading and writing of audit log records to/from the file system. + */ +public final class AuditLogFileStore implements AuditLogStore { + + private static final String DEFAULT_LOG_FILE_EXTENSION = ".log"; + + public static final String DEFAULT_LOG_FILE_NAME = "audit" + DEFAULT_LOG_FILE_EXTENSION; + + private static final Logger logger = Logger.getLogger(AuditLogFileStore.class.getName()); + + /** Generates a unique ID for a log record based on its content. */ + static String generateRecordId(@Nullable LogRecordData logRecord) { + if (logRecord == null) { + return ""; + } + return String.valueOf( + (logRecord.getTimestampEpochNanos() + + String.valueOf(logRecord.getBodyValue()) + + logRecord.getSeverity().toString()) + .hashCode()); + } + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private final Path logFilePath; + + private final Set<String> loggedRecords = ConcurrentHashMap.newKeySet(); + + @Nullable private ObjectMapper objectMapper; + + /** + * Creates a new AuditLogFileStore that stores logs in the specified path. If the path is a + * directory, logs will be stored in a default file within that directory. If the path is a file, + * logs will be stored directly in that file. + * + * @param path the path to the log file or directory + * @throws IOException if the file or directory cannot be created or accessed + */ + public AuditLogFileStore(Path path) throws IOException { + if (Files.isDirectory(path)) { + this.logFilePath = path.resolve(DEFAULT_LOG_FILE_NAME); + } else { + this.logFilePath = path; + } + + // Ensure parent directories exist + if (logFilePath.getParent() != null) { + Files.createDirectories(logFilePath.getParent()); + } + + // Create the file if it doesn't exist + if (!Files.exists(logFilePath)) { + Files.createFile(logFilePath); + } + + // Verify that we can read and write to the file + if (!Files.isReadable(logFilePath)) { + throw new IOException("Can't read: " + logFilePath); + } + if (!Files.isWritable(logFilePath)) { + throw new IOException("Can't write: " + logFilePath); + } + + // Load existing log record IDs to avoid duplicates + loadExistingRecordIds(); + } + + /** + * Creates a new AuditLogFileStore that stores logs in the specified file. + * + * @param filePath the path to the log file + * @throws IOException if the file cannot be created or accessed + */ + public AuditLogFileStore(String filePath) throws IOException { + this(Paths.get(filePath)); + } + + @Override + public Collection<LogRecordData> getAll() { + Collection<LogRecordData> records = new ArrayList<>(); + lock.readLock().lock(); + try (Stream<String> lines = Files.lines(logFilePath)) { + lines.forEach( + line -> { + LogRecordData record = parseLogRecord(line); + if (record != null) { + records.add(record); + } + }); + } catch (IOException e) { + logger.throwing(AuditLogFileStore.class.getName(), "getAll", e); + } finally { + lock.readLock().unlock(); + } + return records; + } + + ObjectMapper json() { + if (objectMapper == null) { + objectMapper = new ObjectMapper(); + } + return objectMapper; + } + + /** Loads existing record IDs from the log file to prevent duplicates.
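Each stored line is parsed back into a {@link LogRecordData} and reduced to a content-derived ID via {@link #generateRecordId(LogRecordData)}, so that {@link #save(LogRecordData)} can skip records that are already on disk.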
*/ + private void loadExistingRecordIds() throws IOException { + if (!Files.exists(logFilePath) || Files.size(logFilePath) == 0) { + return; + } + + lock.readLock().lock(); + try (Stream<String> lines = Files.lines(logFilePath)) { + lines.forEach( + line -> { + String recordId = generateRecordId(parseLogRecord(line)); + if (!recordId.isEmpty()) { + loggedRecords.add(recordId); + } + }); + } finally { + lock.readLock().unlock(); + } + } + + /** + * Parses a log record from a stored line of OTLP JSON, as written by {@link + * LogMarshaler#writeJsonTo}. Returns {@code null}, and logs the failure, if the line cannot be + * parsed. + */ + @Nullable + LogRecordData parseLogRecord(String line) { + try { + return json().readValue(line, JsonLogRecordData.class); + } catch (JsonProcessingException e) { + logger.throwing(AuditLogFileStore.class.getName(), "parseLogRecord", e); + } + return null; + } + + @Override + public void removeAll(Collection<LogRecordData> logs) { + lock.writeLock().lock(); + try { + Set<String> recordIdsToRemove = new HashSet<>(); + for (LogRecordData log : logs) { + recordIdsToRemove.add(generateRecordId(log)); + } + + // Read all lines, filter out the ones to remove, then write back + Collection<String> remainingLines = new ArrayList<>(); + try (BufferedReader reader = Files.newBufferedReader(logFilePath)) { + String line; + while ((line = reader.readLine()) != null) { + String recordId = generateRecordId(parseLogRecord(line)); + if (!recordIdsToRemove.contains(recordId)) { + remainingLines.add(line); + } else { + loggedRecords.remove(recordId); + } + } + } catch (IOException e) { + logger.throwing(AuditLogFileStore.class.getName(), "removeAll", e); + } + + // Write the remaining lines back to the file + try (BufferedWriter writer = + Files.newBufferedWriter( + logFilePath, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) { + for (String line : remainingLines) { + writer.write(line); + writer.newLine(); + } + } catch (IOException e) { + logger.throwing(AuditLogFileStore.class.getName(), "removeAll", e); + } + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void save(LogRecordData logRecord) throws IOException { + String recordId = generateRecordId(logRecord); + + // Check if we've already logged this record + if (loggedRecords.contains(recordId)) { + return; + } + + lock.writeLock().lock(); + try (OutputStream out = + Files.newOutputStream(logFilePath, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + LogMarshaler.create(logRecord).writeJsonTo(baos);
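+ // writeJsonTo closes the stream it is given, which is why the record is marshaled into a + // temporary in-memory buffer rather than directly into the file stream; the buffered JSON + // and a newline separator are appended below, keeping one JSON document per line.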
+ out.write(baos.toByteArray()); + out.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8)); + loggedRecords.add(recordId); + } finally { + lock.writeLock().unlock(); + } + } +} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/logs/JsonLogRecordData.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/logs/JsonLogRecordData.java new file mode 100644 index 00000000000..fe98f83ce03 --- /dev/null +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/logs/JsonLogRecordData.java @@ -0,0 +1,299 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.otlp.logs; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.resources.Resource; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; + +/** + * This class is internal and experimental. Its APIs are unstable and can change at any time. Its + * APIs (or a version of them) may be promoted to the public stable API in the future, but no + * guarantees are made. + * + *
<p>
A JSON-based implementation of {@link LogRecordData} that maps JSON log record fields to the + * corresponding OpenTelemetry SDK log record data fields. + * + * @see LogMarshaler#writeJsonTo(java.io.OutputStream) + */ +public class JsonLogRecordData implements LogRecordData { + + static class ArrayWrapper { + @Nullable + @JsonProperty("values") + @JsonDeserialize(using = AttrValueArrayDeserializer.class) + List<Object> values; + } + + static class AttributesWrapper { + @Nullable + @JsonProperty("key") + String key; + + @Nullable + @JsonProperty("value") + ValueWrapper value; + } + + /** Custom deserializer for array values that handles different value types. */ + static class AttrValueArrayDeserializer extends JsonDeserializer<List<Object>> { + @Override + public List<Object> deserialize(JsonParser parser, DeserializationContext context) + throws IOException { + JsonNode arrayNode = parser.getCodec().readTree(parser); + List<Object> result = new ArrayList<>(); + + if (arrayNode.isArray()) { + for (JsonNode valueNode : arrayNode) { + Object value = extractValue(valueNode); + if (value != null) { + result.add(value); + } + } + } + + return result; + } + + static Object extractValue(JsonNode valueNode) { + if (valueNode.has("boolValue")) { + return valueNode.get("boolValue").asBoolean(); + } + if (valueNode.has("doubleValue")) { + return valueNode.get("doubleValue").asDouble(); + } + if (valueNode.has("intValue")) { + String intStr = valueNode.get("intValue").asText(); + try { + return Long.parseLong(intStr); + } catch (NumberFormatException e) { + return intStr; + } + } + if (valueNode.has("stringValue")) { + return valueNode.get("stringValue").asText(); + } + if (valueNode.has("bytesValue")) { + return valueNode.get("bytesValue").asText(); + } + return valueNode.asText(); + } + } + + static class BodyWrapper { + @Nullable + @JsonProperty("stringValue") + String stringValue; + } + + static class ValueWrapper { + @Nullable + @JsonProperty("arrayValue") + ArrayWrapper arrayValue; + + @Nullable + @JsonProperty("boolValue") + Boolean boolValue; + + @Nullable + @JsonProperty("doubleValue") + Double doubleValue; + + @Nullable + @JsonProperty("intValue") + String intValue; + + @Nullable + @JsonProperty("stringValue") + String stringValue; + } + + @Nullable + @JsonProperty("attributes") + private Collection<AttributesWrapper> attributes; + + @Nullable + @JsonProperty("body") + private BodyWrapper body; + + @JsonProperty("droppedAttributesCount") + private int droppedAttributesCount; + + @JsonProperty("observedTimeUnixNano") + private long observedTimestampEpochNanos; + + @Nullable @JsonIgnore private Severity severity; + + @JsonProperty("severityNumber") + private int severityNumber; + + @Nullable + @JsonProperty("severityText") + private String severityText; + + @Nullable + @JsonProperty("spanId") + private String spanId; + + @JsonProperty("timeUnixNano") + private long timestampEpochNanos; + + @Nullable + @JsonProperty("traceId") + private String traceId; + + @Nullable @JsonIgnore private transient Attributes attrs; + + @Override + public Attributes getAttributes() { + if (attrs != null) { + return attrs; + } + + AttributesBuilder builder = Attributes.builder(); + if (attributes != null) { + for (AttributesWrapper attr : attributes) { + if (attr.key == null || attr.value == null) { + continue; + } + if (attr.value.stringValue != null) { + builder.put(attr.key, attr.value.stringValue); + } else if (attr.value.intValue != null) { + builder.put(attr.key, Long.valueOf(attr.value.intValue)); + } else if (attr.value.boolValue != null) { +
builder.put(attr.key, Boolean.valueOf(attr.value.boolValue)); + } else if (attr.value.doubleValue != null) { + builder.put(attr.key, Double.valueOf(attr.value.doubleValue)); + } else if (attr.value.arrayValue != null && attr.value.arrayValue.values != null) { + // Handle array values - convert to appropriate array types + List values = attr.value.arrayValue.values; + if (!values.isEmpty()) { + Object firstValue = values.get(0); + if (firstValue instanceof String) { + builder.put(attr.key, values.toArray(new String[values.size()])); + } else if (firstValue instanceof Long) { + builder.put(attr.key, values.stream().mapToLong(v -> (Long) v).toArray()); + } else if (firstValue instanceof Double) { + builder.put(attr.key, values.stream().mapToDouble(v -> (Double) v).toArray()); + } else if (firstValue instanceof Boolean) { + // Convert Boolean list to string representation since Attributes doesn't support + // Boolean arrays + boolean[] boolArray = new boolean[values.size()]; + for (int i = 0; i < values.size(); i++) { + boolArray[i] = (Boolean) values.get(i); + } + builder.put(attr.key, boolArray); + } + } + } + } + } + + attrs = builder.build(); + return attrs; + } + + @Override + @Deprecated + public io.opentelemetry.sdk.logs.data.Body getBody() { + return new io.opentelemetry.sdk.logs.data.Body() { + @Override + public String asString() { + if (body == null || body.stringValue == null) { + return ""; + } + return body.stringValue; + } + + @Override + public Type getType() { + return io.opentelemetry.sdk.logs.data.Body.Type.STRING; + } + }; + } + + /** + * Returns the empty instrumentation scope. + * + * @return {@link InstrumentationScopeInfo#empty()}. + */ + @Override + public InstrumentationScopeInfo getInstrumentationScopeInfo() { + return InstrumentationScopeInfo.empty(); + } + + @Override + public long getObservedTimestampEpochNanos() { + return observedTimestampEpochNanos; + } + + @Override + public Resource getResource() { + return Resource.empty(); + } + + @Override + public Severity getSeverity() { + if (severity != null) { + return severity; + } + for (Severity s : Severity.values()) { + if (s.getSeverityNumber() == severityNumber) { + severity = s; + return severity; + } + } + + return Severity.UNDEFINED_SEVERITY_NUMBER; + } + + @Override + public String getSeverityText() { + if (severityText == null || severityText.isEmpty()) { + return getSeverity().name(); + } + return severityText; + } + + @Override + public SpanContext getSpanContext() { + if (traceId == null || traceId.isEmpty() || spanId == null || spanId.isEmpty()) { + return SpanContext.getInvalid(); + } + return SpanContext.create(traceId, spanId, TraceFlags.getDefault(), TraceState.getDefault()); + } + + @Override + public long getTimestampEpochNanos() { + return timestampEpochNanos; + } + + @Override + public int getTotalAttributeCount() { + if (droppedAttributesCount < 0) { + return getAttributes().size() + droppedAttributesCount; + } + return getAttributes().size(); + } +} diff --git a/exporters/otlp/common/src/test/java/io/opentelemetry/exporter/internal/otlp/logs/AuditLogFileStoreTest.java b/exporters/otlp/common/src/test/java/io/opentelemetry/exporter/internal/otlp/logs/AuditLogFileStoreTest.java new file mode 100644 index 00000000000..608b1a05f0f --- /dev/null +++ b/exporters/otlp/common/src/test/java/io/opentelemetry/exporter/internal/otlp/logs/AuditLogFileStoreTest.java @@ -0,0 +1,384 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package 
io.opentelemetry.exporter.internal.otlp.logs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.AttributeType; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.AuditLogStore; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.logs.TestLogRecordData; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class AuditLogFileStoreTest { + + @TempDir Path tempDir; + + private AuditLogStore auditLogStore; + private Path logFilePath; + + @BeforeEach + void setUp() throws IOException { + logFilePath = tempDir.resolve("test-audit.log"); + auditLogStore = new AuditLogFileStore(logFilePath); + } + + @Test + void constructor_withFilePath_createsFile() throws IOException { + Path newLogFile = tempDir.resolve("new-audit.log"); + + new AuditLogFileStore(newLogFile.toString()); + + assertThat(Files.exists(newLogFile)).isTrue(); + } + + @Test + void constructor_withDirectory_createsDefaultFile() throws IOException { + Path dirPath = tempDir.resolve("audit-logs"); + Files.createDirectories(dirPath); + + new AuditLogFileStore(dirPath); + + assertThat(Files.exists(dirPath.resolve("audit.log"))).isTrue(); + } + + @Test + void constructor_withNonExistentDirectory_createsDirectoriesAndFile() throws IOException { + Path dirPath = + tempDir + .resolve("nested") + .resolve("audit-logs") + .resolve(AuditLogFileStore.DEFAULT_LOG_FILE_NAME); + Path filePath = dirPath.resolve(AuditLogFileStore.DEFAULT_LOG_FILE_NAME); + + new AuditLogFileStore(filePath); + + assertThat(Files.exists(dirPath.resolve(AuditLogFileStore.DEFAULT_LOG_FILE_NAME))).isTrue(); + } + + @Test + void save_singleLogRecord_storesSuccessfully() throws IOException { + LogRecordData logRecord = createTestLogRecord("Test message", Severity.INFO, 1000L); + + auditLogStore.save(logRecord); + + Collection storedRecords = auditLogStore.getAll(); + assertThat(storedRecords).hasSize(1); + // Note: The current implementation returns a basic LogRecordData with limited field mapping + assertThat(storedRecords.iterator().next().getBodyValue().getValue().toString()) + .contains("Test message"); + } + + @Test + void save_duplicateLogRecord_ignoresDuplicate() throws IOException { + LogRecordData logRecord = createTestLogRecord("Test message", Severity.INFO, 1000L); + + auditLogStore.save(logRecord); + auditLogStore.save(logRecord); // Same record + + Collection storedRecords = auditLogStore.getAll(); + assertThat(storedRecords).hasSize(1); + } + + @Test + void save_multipleUniqueLogRecords_storesAll() throws IOException { + LogRecordData logRecord1 = createTestLogRecord("Message 1", Severity.INFO, 1000L); + LogRecordData 
logRecord2 = createTestLogRecord("Message 2", Severity.WARN, 2000L); + LogRecordData logRecord3 = createTestLogRecord("Message 3", Severity.ERROR, 3000L); + + auditLogStore.save(logRecord1); + auditLogStore.save(logRecord2); + auditLogStore.save(logRecord3); + + Collection storedRecords = auditLogStore.getAll(); + assertThat(storedRecords).hasSize(3); + } + + @Test + void save_concurrentWrites_handlesCorrectly() { + LogRecordData logRecord1 = createTestLogRecord("Concurrent 1", Severity.INFO, 1000L); + LogRecordData logRecord2 = createTestLogRecord("Concurrent 2", Severity.WARN, 2000L); + + assertDoesNotThrow( + () -> { + Thread thread1 = + new Thread( + () -> { + try { + auditLogStore.save(logRecord1); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + Thread thread2 = + new Thread( + () -> { + try { + auditLogStore.save(logRecord2); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + }); + + Collection storedRecords = auditLogStore.getAll(); + assertThat(storedRecords).hasSize(2); + } + + @Test + void getAll_emptyStore_returnsEmptyCollection() { + Collection records = auditLogStore.getAll(); + + assertThat(records).isEmpty(); + } + + @Test + void getAll_withStoredRecords_returnsAllRecords() throws IOException { + LogRecordData logRecord1 = createTestLogRecord("Message 1", Severity.INFO, 1000L); + LogRecordData logRecord2 = createTestLogRecord("Message 2", Severity.WARN, 2000L); + + auditLogStore.save(logRecord1); + auditLogStore.save(logRecord2); + + Collection records = auditLogStore.getAll(); + + assertThat(records).hasSize(2); + } + + @Test + void removeAll_emptyCollection_doesNotThrow() { + assertDoesNotThrow(() -> auditLogStore.removeAll(Collections.emptyList())); + } + + @Test + void removeAll_nonExistentRecords_doesNotThrow() { + LogRecordData nonExistentRecord = createTestLogRecord("Non-existent", Severity.INFO, 9999L); + + assertDoesNotThrow(() -> auditLogStore.removeAll(Arrays.asList(nonExistentRecord))); + } + + @Test + void removeAll_existingRecords_removesSuccessfully() throws IOException { + LogRecordData logRecord1 = createTestLogRecord("Message 1", Severity.INFO, 1000L); + LogRecordData logRecord2 = createTestLogRecord("Message 2", Severity.WARN, 2000L); + LogRecordData logRecord3 = createTestLogRecord("Message 3", Severity.ERROR, 3000L); + + // Save all records + auditLogStore.save(logRecord1); + auditLogStore.save(logRecord2); + auditLogStore.save(logRecord3); + + // Remove two records + auditLogStore.removeAll(Arrays.asList(logRecord1, logRecord3)); + + Collection remainingRecords = auditLogStore.getAll(); + assertThat(remainingRecords).hasSize(1); + assertThat(remainingRecords.iterator().next().getSeverityText()) + .contains(String.valueOf(Severity.WARN)); + } + + @Test + void removeAll_allRecords_leavesEmptyStore() throws IOException { + LogRecordData logRecord1 = createTestLogRecord("Message 1", Severity.INFO, 1000L); + LogRecordData logRecord2 = createTestLogRecord("Message 2", Severity.WARN, 2000L); + + auditLogStore.save(logRecord1); + auditLogStore.save(logRecord2); + + auditLogStore.removeAll(Arrays.asList(logRecord1, logRecord2)); + + Collection remainingRecords = auditLogStore.getAll(); + assertThat(remainingRecords).isEmpty(); + } + + @Test + void interfaceContract_saveRetrieveRemove_worksEndToEnd() throws IOException { + // Test the complete AuditLogStore interface contract + LogRecordData logRecord1 = createTestLogRecord("End-to-end 1", 
Severity.INFO, 1000L); + LogRecordData logRecord2 = createTestLogRecord("End-to-end 2", Severity.WARN, 2000L); + LogRecordData logRecord3 = createTestLogRecord("End-to-end 3", Severity.ERROR, 3000L); + + // Initially empty + assertThat(auditLogStore.getAll()).isEmpty(); + + // Save records + auditLogStore.save(logRecord1); + auditLogStore.save(logRecord2); + auditLogStore.save(logRecord3); + + // Verify all saved + assertThat(auditLogStore.getAll()).hasSize(3); + + // Remove some records + auditLogStore.removeAll(Arrays.asList(logRecord1, logRecord3)); + + // Verify only one remains + Collection remainingRecords = auditLogStore.getAll(); + assertThat(remainingRecords).hasSize(1); + + // Remove all remaining + auditLogStore.removeAll(remainingRecords); + + // Verify empty + assertThat(auditLogStore.getAll()).isEmpty(); + } + + @Test + void generateRecordId_sameContent_generatesSameId() { + LogRecordData logRecord1 = createTestLogRecord("Same message", Severity.INFO, 1000L); + LogRecordData logRecord2 = createTestLogRecord("Same message", Severity.INFO, 1000L); + + String id1 = AuditLogFileStore.generateRecordId(logRecord1); + String id2 = AuditLogFileStore.generateRecordId(logRecord2); + + assertThat(id1).isEqualTo(id2); + } + + @Test + void generateRecordId_differentContent_generatesDifferentIds() { + LogRecordData logRecord1 = createTestLogRecord("Message 1", Severity.INFO, 1000L); + LogRecordData logRecord2 = createTestLogRecord("Message 2", Severity.INFO, 1000L); + + String id1 = AuditLogFileStore.generateRecordId(logRecord1); + String id2 = AuditLogFileStore.generateRecordId(logRecord2); + + assertThat(id1).isNotEqualTo(id2); + } + + @Test + void persistence_acrossInstances_maintainsData() throws IOException { + LogRecordData logRecord = createTestLogRecord("Persistent message", Severity.INFO, 1000L); + + // Save with first instance + auditLogStore.save(logRecord); + + // Create new instance pointing to same file + AuditLogStore newInstance = new AuditLogFileStore(logFilePath); + + // Verify data is still there + Collection records = newInstance.getAll(); + assertThat(records).hasSize(1); + } + + static LogRecordData createTestLogRecord(String message, Severity severity, long timestampNanos) { + + AttributesBuilder attr = Attributes.builder(); + // iterate over all available AttributeTypes and create attributes for each type + for (AttributeType type : AttributeType.values()) { + switch (type) { + case STRING: + attr.put(AttributeKey.stringKey("stringAttr"), "stringValue"); + break; + case LONG: + attr.put(AttributeKey.longKey("longAttr"), 12345L); + break; + case DOUBLE: + attr.put(AttributeKey.doubleKey("doubleAttr"), 123.45); + break; + case BOOLEAN: + attr.put(AttributeKey.booleanKey("booleanAttr"), true); + break; + case STRING_ARRAY: + attr.put( + AttributeKey.stringArrayKey("stringArrayAttr"), Arrays.asList("one", "two", "three")); + break; + case LONG_ARRAY: + attr.put(AttributeKey.longArrayKey("longArrayAttr"), Arrays.asList(1L, 2L, 3L)); + break; + case DOUBLE_ARRAY: + attr.put(AttributeKey.doubleArrayKey("doubleArrayAttr"), Arrays.asList(1.1, 2.2, 3.3)); + break; + case BOOLEAN_ARRAY: + attr.put( + AttributeKey.booleanArrayKey("booleanArrayAttr"), Arrays.asList(true, false, true)); + break; + } + } + + return TestLogRecordData.builder() + .setResource(Resource.getDefault()) + .setInstrumentationScopeInfo(InstrumentationScopeInfo.create("test")) + .setTimestamp(timestampNanos, TimeUnit.NANOSECONDS) + .setObservedTimestamp(timestampNanos, TimeUnit.NANOSECONDS) + 
.setSeverity(severity) + .setSeverityText(severity.name()) + .setBody(message) + .setAttributes(attr.build()) + .setTotalAttributeCount(attr.build().size()) + .setSpanContext( + SpanContext.create( + TraceId.fromLongs(0, 1), + SpanId.fromLong(1), + TraceFlags.getDefault(), + TraceState.getDefault())) + .build(); + } + + /* + * Let's test our deserialization logic as well, by first serializing a LogRecordData to JSON using the SDK and then deserializing it back using our JsonLogRecordData class. + */ + @Test + void serializationDeserialization_cycle_worksCorrectly() throws IOException { + LogRecordData logRecord1 = createTestLogRecord("End-to-end 1", Severity.INFO, 1000L); + LogRecordData logRecord2 = createTestLogRecord("End-to-end 2", Severity.WARN, 2000L); + LogRecordData logRecord3 = createTestLogRecord("End-to-end 3", Severity.ERROR, 3000L); + + // Initially empty + assertThat(auditLogStore.getAll()).isEmpty(); + + // Save records + auditLogStore.save(logRecord1); + auditLogStore.save(logRecord2); + auditLogStore.save(logRecord3); + + // Verify all saved + Collection storedRecords = auditLogStore.getAll(); + assertThat(storedRecords).hasSize(3); + + // create another AuditLogFileStore instance to write to the same file + Path otherFilePath = tempDir.resolve("other-audit.log"); + AuditLogStore anotherInstance = new AuditLogFileStore(otherFilePath); + for (LogRecordData record : storedRecords) { + anotherInstance.save(record); + } + + // Verify that both files have the same byte content + byte[] originalFileBytes = Files.readAllBytes(logFilePath); + byte[] otherFileBytes = Files.readAllBytes(otherFilePath); + + // Uncomment to debug content differences + // System.out.println("Org file content:\n" + new String(originalFileBytes)); + // System.out.println("Oth file content:\n" + new String(otherFileBytes)); + assertThat(originalFileBytes).isEqualTo(otherFileBytes); + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditException.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditException.java new file mode 100644 index 00000000000..02330524dde --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditException.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.util.Collection; +import java.util.Collections; +import javax.annotation.Nullable; + +public class AuditException extends RuntimeException { + + private static final long serialVersionUID = 5791873097754062413L; + + @Nullable public Context context; + + public Collection logRecords; + + private AuditException(@Nullable String message, @Nullable Throwable cause) { + super(message, cause); + logRecords = Collections.emptyList(); + } + + AuditException(String message, @Nullable Throwable cause, Collection logs) { + this(message, cause); + this.logRecords = logs; + } + + AuditException(Throwable cause, Context context, Collection logs) { + super(cause); + this.logRecords = logs; + this.context = context; + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditExceptionHandler.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditExceptionHandler.java new file mode 100644 index 00000000000..c9fe7ec60aa --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditExceptionHandler.java @@ -0,0 +1,11 @@ 
+/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +public interface AuditExceptionHandler { + + void handle(AuditException exception); +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditLogRecordProcessor.java new file mode 100644 index 00000000000..ee2f4b1c747 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditLogRecordProcessor.java @@ -0,0 +1,421 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.ReadWriteLogRecord; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Locale; +import java.util.Queue; +import java.util.concurrent.Executors; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; + +/** + * An implementation of {@link LogRecordProcessor} that processes logs for auditing purposes. It has + * a built-in retry mechanism for the case that the exporter takes too long or throws an exception. + * Internally it uses a priority queue, ordered by severity, for the log messages and the given {@link + * AuditLogStore} for persistence, which holds the logs until they are successfully exported. At + * startup, it loads all previously persisted logs from the {@link AuditLogStore} and adds them to + * the queue for processing. Logs with higher severity are processed first. The processor + * periodically checks the queue and exports logs in batches, regardless of the queue size. If the + * queue reaches a certain size, it triggers an immediate export. If an export fails, it retries + * exporting the logs with exponential backoff. When the maximum number of retry attempts is + * reached, it either throws an {@link AuditException} or calls the provided {@link + * AuditExceptionHandler}. + */ +public final class AuditLogRecordProcessor implements LogRecordProcessor { + + /** + * Returns a new Builder for {@link AuditLogRecordProcessor}. + * + * @param logRecordExporter the {@link LogRecordExporter} to which the logs are pushed. + * OtlpGrpcLogRecordExporter is a good choice. + * @param logStore the {@link AuditLogStore} that persists logs until they are successfully + * exported. + * @return a new {@link AuditLogRecordProcessorBuilder}. + * @throws NullPointerException if {@code logRecordExporter} or {@code logStore} is {@code null}. + */ + public static AuditLogRecordProcessorBuilder builder( + LogRecordExporter logRecordExporter, AuditLogStore logStore) { + return new AuditLogRecordProcessorBuilder(logRecordExporter, logStore); + } + + /** The exporter to export logs. OtlpGrpcLogRecordExporter is recommended. */ + private final LogRecordExporter exporter; + + /** The handler for exceptions raised during log processing and export.
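When configured, it is invoked instead of throwing an {@link AuditException}, e.g. when an export ultimately fails or a record is emitted after shutdown.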
*/ + @Nullable private final AuditExceptionHandler handler; + + @Nullable private CompletableResultCode lastResultCode; + + /** The persistent storage for logs. */ + private final AuditLogStore persistency; + + /** The PriorityBlockingQueue to hold the logs before exporting. */ + private final Queue queue; + + /** A flag to indicate whether the processor is shutdown. */ + private final AtomicBoolean shutdown = new AtomicBoolean(false); + + /** The maximum number of logs to export in a single batch. */ + private final int size; + + /** The timeout for exporting logs in nanoseconds. */ + private final long timeout; + + /** The retry policy for handling failed exports. */ + private final RetryPolicy retryPolicy; + + /** Current retry attempt counter. */ + private final AtomicInteger currentRetryAttempt = new AtomicInteger(0); + + /** Last retry timestamp to implement backpressure. */ + private final AtomicLong lastRetryTimestamp = new AtomicLong(0); + + /** Whether to wait for each export try to complete. */ + private final boolean waitOnExport; + + /** + * Creates a new AuditLogRecordProcessor with the given parameters. + * + * @param logRecordExporter the {@link LogRecordExporter} to which the Logs are pushed. + * @param exceptionHandler the {@link AuditExceptionHandler} to handle exceptions during log + * export. + * @param scheduleDelayNanos the delay in nanoseconds between periodic exports. + * @param maxExportBatchSize the maximum number of logs to export in a single batch. + * @param exporterTimeoutNanos the timeout for exporting logs in nanoseconds. + * @param retryPolicy the retry policy for handling failed exports. + * @param waitOnExport whether to wait for the export to complete. + */ + AuditLogRecordProcessor( + LogRecordExporter logRecordExporter, + @Nullable AuditExceptionHandler exceptionHandler, + AuditLogStore logStore, + long scheduleDelayNanos, + int maxExportBatchSize, + long exporterTimeoutNanos, + RetryPolicy retryPolicy, + boolean waitOnExport) { + exporter = logRecordExporter; + size = maxExportBatchSize; + timeout = exporterTimeoutNanos; + handler = exceptionHandler; + this.retryPolicy = retryPolicy; + this.waitOnExport = waitOnExport; + queue = + new PriorityBlockingQueue<>( + maxExportBatchSize, + (record1, record2) -> { + // compare by severity, higher severity first + return Integer.compare( + record2.getSeverity().getSeverityNumber(), + record1.getSeverity().getSeverityNumber()); + }); + persistency = logStore; + + // Get all logs from persistent storage and add them to the queue + queue.addAll(persistency.getAll()); + exportLogs(); // export logs immediately to ensure no logs are missed + + scheduler = Executors.newSingleThreadScheduledExecutor(); + future = + scheduler.scheduleAtFixedRate( + () -> { + exportLogs(); // export logs periodically, regardless of the queue size + }, + scheduleDelayNanos, + scheduleDelayNanos, + TimeUnit.NANOSECONDS); + } + + private final ScheduledFuture future; + private final ScheduledExecutorService scheduler; + + /** + * Exports logs from the queue to the exporter. If the queue is empty, it does nothing. If the + * export fails, it retries exporting logs with exponential backoff and handles exceptions using + * the provided handler. 
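+ * While a retry backoff window is open ({@code currentRetryAttempt} is positive and the computed + * delay has not yet elapsed), the call returns immediately without exporting.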
+ */ + void exportLogs() { + if (queue.isEmpty()) { + return; + } + + // Check if we're in a backpressure period + long currentTime = System.currentTimeMillis(); + if (currentRetryAttempt.get() > 0) { + long timeSinceLastRetry = currentTime - lastRetryTimestamp.get(); + long requiredDelay = calculateRetryDelay(currentRetryAttempt.get()); + + if (timeSinceLastRetry < requiredDelay) { + // Still in backpressure period, skip this export attempt + return; + } + } + + Collection results = new ArrayList<>(); + Collection allFailedLogs = new ArrayList<>(); + + while (!queue.isEmpty()) { + // Create a collection of LogRecordData from the queue with a maximum size + Object[] arr = queue.stream().limit(size).toArray(); + Collection logs = new ArrayList<>(arr.length); + for (Object o : arr) { + logs.add((LogRecordData) o); + } + + CompletableResultCode export = tryExport(logs); + // lastResultCode = export; + results.add(export); + + export.whenComplete( + () -> { + if (export.isSuccess()) { + // Reset retry counter on successful export + currentRetryAttempt.set(0); + lastRetryTimestamp.set(0); + persistency.removeAll(logs); + } else { + allFailedLogs.addAll(logs); + // if (handler != null) { + // handler.handle(new AuditException("Export failed", export.getFailureThrowable(), + // logs)); + // } + } + }); + + // Remove logs from queue regardless of success/failure to prevent infinite loops + queue.removeAll(logs); + } + + CompletableResultCode all = CompletableResultCode.ofAll(results); + if (waitOnExport) { + all.join(timeout * results.size(), TimeUnit.NANOSECONDS); + } + + if (all.isDone() && !all.isSuccess()) { + if (waitOnExport) { + // Export failed, prepare for retry if attempts remain + if (currentRetryAttempt.getAndAdd(1) < retryPolicy.getMaxAttempts()) { + lastRetryTimestamp.set(System.currentTimeMillis()); + queue.addAll(allFailedLogs); + try { + Thread.sleep(calculateRetryDelay(currentRetryAttempt.get())); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + exportLogs(); // retry exporting failed logs + return; + } + } + // only, when we wait on export, we can really retry + handleExportFailure(allFailedLogs, all.getFailureThrowable()); + } + + lastResultCode = all; + } + + /** + * Exports logs with retry logic and exponential backoff. + * + * @param logs the collection of logs to export + * @return CompletableResultCode representing the final result after all retry attempts + */ + private CompletableResultCode tryExport(Collection logs) { + CompletableResultCode lastResult = CompletableResultCode.ofFailure(); + try { + lastResult = exporter.export(logs); + // Wait for the export to complete with timeout + if (waitOnExport) { + lastResult.join(timeout, TimeUnit.NANOSECONDS); + } + } catch (RuntimeException e) { + return CompletableResultCode.ofExceptionalFailure(e); + } + return lastResult; + } + + /** + * Calculates the retry delay using exponential backoff with jitter. 
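+ * The delay is {@code min(initialBackoff * backoffMultiplier^(attemptNumber - 1), maxBackoff)}, + * plus {@code 0.25 * delay * (random - 0.5)} of jitter, i.e. up to ±12.5% of the delay.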
+ * + * @param attemptNumber the current attempt number (1-based) + * @return delay in milliseconds + */ + private long calculateRetryDelay(int attemptNumber) { + long delay = + (long) + (retryPolicy.getInitialBackoff().toMillis() + * Math.pow(retryPolicy.getBackoffMultiplier(), attemptNumber - 1)); + + // Cap the delay to maximum + delay = Math.min(delay, retryPolicy.getMaxBackoff().toMillis()); + + // Add jitter to prevent thundering herd (±25% random variation) + double jitter = 0.25 * delay * (Math.random() - 0.5); + delay += (long) jitter; + + return Math.max(delay, 0); // Ensure non-negative delay + } + + /** + * Handles export failure by updating retry state and potentially throwing exceptions. + * + * @param failedLogs the logs that failed to export + * @param cause the cause of the failure + */ + private void handleExportFailure( + Collection failedLogs, @Nullable Throwable cause) { + if (!waitOnExport && handler != null) { + handler.handle(new AuditException("Export failed", cause, failedLogs)); + return; + } + if (currentRetryAttempt.get() < retryPolicy.getMaxAttempts()) { + // If retries haven't been exhausted, the retry logic will handle the next attempt + return; + } + + // Max retries exceeded, reset counter and handle as final failure + currentRetryAttempt.set(0); + lastRetryTimestamp.set(0); + + String message = + String.format( + Locale.ENGLISH, + "Export failed after %d retry attempts. Last error: %s", + retryPolicy.getMaxAttempts(), + cause != null ? cause.getMessage() : "Unknown error"); + + if (handler != null) { + handler.handle(new AuditException(message, cause, failedLogs)); + } else { + throw new AuditException(message, cause, failedLogs); + } + } + + /** + * Exports logs immediately, regardless of the queue size. This is useful for flushing logs when + * the processor is shutdown. + * + * @return {@link CompletableResultCode#ofSuccess()}. + */ + @Override + public CompletableResultCode forceFlush() { + exportLogs(); + return lastResultCode != null ? lastResultCode : CompletableResultCode.ofSuccess(); + } + + /** + * Returns the last export operation ({@link #exportLogs()}) result. If the processor was never + * triggered, it returns null. Useful to wait for the last export to finish via + * {@link CompletableResultCode#join(long, TimeUnit)}. + * + * @return CompletableResultCode from {@link LogRecordExporter#export(Collection)} of {@link + * #exporter}. + */ + @Nullable + public CompletableResultCode getLastResultCode() { + return lastResultCode; + } + + /** + * Accepts a log record and adds it to the queue. If the processor is shutdown, it throws an + * exception or calls the handler. + * + * @param context the context of the log record. + * @param logRecord the log record to be processed. 
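+ * @throws AuditException if the processor is already shut down or persisting the record fails, + * and no {@link AuditExceptionHandler} is configured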
+ */ + @Override + public void onEmit(Context context, ReadWriteLogRecord logRecord) { + if (logRecord == null) { + return; + } + + if (shutdown.get()) { + AuditException exception = + new AuditException( + new IllegalStateException( + "AuditLogRecordProcessor is shutdown, cannot accept new logs."), + context, + Collections.singletonList(logRecord.toLogRecordData())); + if (handler != null) { + handler.handle(exception); + } else { + throw exception; + } + } + + try { + LogRecordData data = logRecord.toLogRecordData(); + persistency.save(data); + queue.add(data); + } catch (IOException e) { + AuditException exception = + new AuditException(e, context, Collections.singletonList(logRecord.toLogRecordData())); + if (handler != null) { + handler.handle(exception); + } else { + throw exception; + } + } + + if (queue.size() >= size) { + // when we have reached certain size, we export logs immediately + exportLogs(); + } + } + + /** + * Shuts down the processor. This method will export all remaining logs in the queue before + * shutting down. If this method is called multiple times, it will only export logs once. + * + * @return {@link CompletableResultCode#ofSuccess()}. + */ + @Override + public CompletableResultCode shutdown() { + if (!shutdown.getAndSet(true)) { + // First time shutdown is called, we export all remaining logs + future.cancel(false); + scheduler.shutdown(); + return forceFlush(); + } + return lastResultCode != null ? lastResultCode : CompletableResultCode.ofSuccess(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("AuditLogRecordProcessor [exporter="); + builder.append(exporter); + builder.append(", handler="); + builder.append(handler); + builder.append(", queue="); + builder.append(queue); + builder.append(", shutdown="); + builder.append(shutdown); + builder.append(", size="); + builder.append(size); + builder.append(", timeout="); + builder.append(timeout); + builder.append(", retryPolicy="); + builder.append(retryPolicy); + builder.append(", persistency="); + builder.append(persistency); + builder.append("]"); + return builder.toString(); + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditLogRecordProcessorBuilder.java new file mode 100644 index 00000000000..9692f5088af --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditLogRecordProcessorBuilder.java @@ -0,0 +1,169 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import static io.opentelemetry.api.internal.Utils.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Builder class for {@link AuditLogRecordProcessor}. 
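+ * + * <p>A minimal wiring sketch ({@code exporter} and {@code store} stand for any configured + * {@link LogRecordExporter} and {@link AuditLogStore}): + * + * <pre>{@code + * AuditLogRecordProcessor processor = + *     AuditLogRecordProcessor.builder(exporter, store) + *         .setMaxExportBatchSize(256) + *         .setWaitOnExport(true) + *         .build(); + * }</pre>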
+ * + * @since 1.55.0 + */ +public final class AuditLogRecordProcessorBuilder { + + // Visible for testing + public static final int DEFAULT_EXPORT_TIMEOUT_MILLIS = 30_000; + // Visible for testing + public static final int DEFAULT_MAX_EXPORT_BATCH_SIZE = 512; + // Visible for testing + public static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 1000; + + @Nullable private AuditExceptionHandler exceptionHandler; + + private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS); + + @Nonnull private final LogRecordExporter logRecordExporter; + + @Nonnull private final AuditLogStore logStore; + + private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE; + + private RetryPolicy retryPolicy = RetryPolicy.getDefault(); + + private long scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_SCHEDULE_DELAY_MILLIS); + + private boolean waitOnExport = false; + + AuditLogRecordProcessorBuilder( + @Nonnull LogRecordExporter logRecordExporter, @Nonnull AuditLogStore logStore) { + this.logRecordExporter = requireNonNull(logRecordExporter, "logRecordExporter"); + this.logStore = requireNonNull(logStore, "logStore"); + } + + /** + * Returns a new {@link AuditLogRecordProcessor} that batches log records and forwards them to + * the given {@code logRecordExporter}. + * + * @return a new {@link AuditLogRecordProcessor}. + */ + public AuditLogRecordProcessor build() { + return new AuditLogRecordProcessor( + logRecordExporter, + exceptionHandler, + logStore, + scheduleDelayNanos, + maxExportBatchSize, + exporterTimeoutNanos, + retryPolicy, + waitOnExport); + } + + @Nullable + AuditExceptionHandler getExceptionHandler() { + return exceptionHandler; + } + + // Visible for testing + long getExporterTimeoutNanos() { + return exporterTimeoutNanos; + } + + AuditLogStore getLogStore() { + return logStore; + } + + // Visible for testing + int getMaxExportBatchSize() { + return maxExportBatchSize; + } + + // Visible for testing + RetryPolicy getRetryPolicy() { + return retryPolicy; + } + + // Visible for testing + long getScheduleDelayNanos() { + return scheduleDelayNanos; + } + + boolean isWaitOnExport() { + return waitOnExport; + } + + public AuditLogRecordProcessorBuilder setExceptionHandler( + @Nonnull AuditExceptionHandler exceptionHandler) { + requireNonNull(exceptionHandler, "exceptionHandler"); + this.exceptionHandler = exceptionHandler; + return this; + } + + /** + * Sets the maximum time an export will be allowed to run before being cancelled. If unset, + * defaults to {@value DEFAULT_EXPORT_TIMEOUT_MILLIS}ms. + */ + public AuditLogRecordProcessorBuilder setExporterTimeout(long timeout, @Nonnull TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(timeout >= 0, "timeout must be non-negative"); + exporterTimeoutNanos = timeout == 0 ? Long.MAX_VALUE : unit.toNanos(timeout); + return this; + } + + /** + * Sets the maximum batch size for every export. + *
<p>
Default value is {@code 512}. + * + * @param maxExportBatchSize the maximum batch size for every export. + * @return this. + * @see AuditLogRecordProcessorBuilder#DEFAULT_MAX_EXPORT_BATCH_SIZE + */ + public AuditLogRecordProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) { + checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive."); + this.maxExportBatchSize = maxExportBatchSize; + return this; + } + + /** + * Sets the retry policy for failed exports. If unset, defaults to {@link + * RetryPolicy#getDefault()}. + * + * @param retryPolicy the retry policy to use for failed exports + * @return this + */ + public AuditLogRecordProcessorBuilder setRetryPolicy(@Nonnull RetryPolicy retryPolicy) { + requireNonNull(retryPolicy, "retryPolicy"); + this.retryPolicy = retryPolicy; + return this; + } + + /** + * Sets the delay interval between two consecutive exports. If unset, defaults to {@value + * DEFAULT_SCHEDULE_DELAY_MILLIS}ms. + */ + public AuditLogRecordProcessorBuilder setScheduleDelay(long delay, TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(delay >= 0, "delay must be non-negative"); + scheduleDelayNanos = unit.toNanos(delay); + return this; + } + + /** + * Sets whether to wait for the export to complete before processing new logs. If unset, defaults + * to {@code false}. + */ + public AuditLogRecordProcessorBuilder setWaitOnExport(boolean waitOnExport) { + this.waitOnExport = waitOnExport; + return this; + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditLogStore.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditLogStore.java new file mode 100644 index 00000000000..e27aa94b328 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/AuditLogStore.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.io.IOException; +import java.util.Collection; + +public interface AuditLogStore { + + void save(LogRecordData logRecord) throws IOException; + + void removeAll(Collection logs); + + Collection getAll(); +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/MultiLogRecordExporter.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/MultiLogRecordExporter.java index 33d1a1b726a..0fa6a97a0e8 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/MultiLogRecordExporter.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/MultiLogRecordExporter.java @@ -36,23 +36,21 @@ private MultiLogRecordExporter(LogRecordExporter[] logRecordExporters) { * @return the aggregate log exporter */ static LogRecordExporter create(List logRecordExporters) { - return new MultiLogRecordExporter(logRecordExporters.toArray(new LogRecordExporter[0])); + return new MultiLogRecordExporter( + logRecordExporters.toArray(new LogRecordExporter[logRecordExporters.size()])); } @Override public CompletableResultCode export(Collection logs) { List results = new ArrayList<>(logRecordExporters.length); for (LogRecordExporter logRecordExporter : logRecordExporters) { - CompletableResultCode exportResult; try { - exportResult = logRecordExporter.export(logs); + results.add(logRecordExporter.export(logs)); } catch (RuntimeException e) { // If an exception was thrown by the exporter logger.log(Level.WARNING, "Exception thrown by the export.", e); - results.add(CompletableResultCode.ofFailure()); - 
continue; + results.add(CompletableResultCode.ofExceptionalFailure(e)); } - results.add(exportResult); } return CompletableResultCode.ofAll(results); } @@ -66,16 +64,13 @@ public CompletableResultCode export(Collection<LogRecordData> logs) { public CompletableResultCode flush() { List<CompletableResultCode> results = new ArrayList<>(logRecordExporters.length); for (LogRecordExporter logRecordExporter : logRecordExporters) { - CompletableResultCode flushResult; try { - flushResult = logRecordExporter.flush(); + results.add(logRecordExporter.flush()); } catch (RuntimeException e) { // If an exception was thrown by the exporter logger.log(Level.WARNING, "Exception thrown by the flush.", e); - results.add(CompletableResultCode.ofFailure()); - continue; + results.add(CompletableResultCode.ofExceptionalFailure(e)); } - results.add(flushResult); } return CompletableResultCode.ofAll(results); } @@ -84,16 +79,13 @@ public CompletableResultCode flush() { public CompletableResultCode shutdown() { List<CompletableResultCode> results = new ArrayList<>(logRecordExporters.length); for (LogRecordExporter logRecordExporter : logRecordExporters) { - CompletableResultCode shutdownResult; try { - shutdownResult = logRecordExporter.shutdown(); + results.add(logRecordExporter.shutdown()); } catch (RuntimeException e) { // If an exception was thrown by the exporter logger.log(Level.WARNING, "Exception thrown by the shutdown.", e); - results.add(CompletableResultCode.ofFailure()); - continue; + results.add(CompletableResultCode.ofExceptionalFailure(e)); } - results.add(shutdownResult); } return CompletableResultCode.ofAll(results); } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/AuditLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/AuditLogRecordProcessorTest.java new file mode 100644 index 00000000000..40cbe48c56f --- /dev/null +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/AuditLogRecordProcessorTest.java @@ -0,0 +1,554 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs.export; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.internal.testing.slf4j.SuppressLogger; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.RetryPolicy; +import io.opentelemetry.sdk.logs.SdkLoggerProvider; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorTest.CompletableLogRecordExporter; +import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessorTest.WaitingLogRecordExporter; +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +@SuppressWarnings("PreferJavaTimeOverload") +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class AuditLogRecordProcessorTest { + + // Simple in-memory implementation for testing + static class InMemoryAuditLogStore implements AuditLogStore { + private final List<LogRecordData> logs = new ArrayList<>(); + + @Override + public Collection<LogRecordData> getAll() { + return logs; + } + + @Override + public void removeAll(Collection<LogRecordData> logs) { + this.logs.removeAll(logs); + } + + @Override + public void save(LogRecordData logRecord) throws IOException { + logs.add(logRecord); + } + } + + private static final String LOG_MESSAGE_1 = "Hello audit world 1!"; + private static final String LOG_MESSAGE_2 = "Hello audit world 2!"; + + private static final long MAX_SCHEDULE_DELAY_MILLIS = 100; + + @Mock private AuditExceptionHandler mockExceptionHandler; + @Mock private LogRecordExporter mockLogRecordExporter; + @Mock private AuditLogStore mockLogStore; + + @Test + void builderDefaults() { + AuditLogRecordProcessorBuilder builder = + AuditLogRecordProcessor.builder(mockLogRecordExporter, mockLogStore); + assertThat(builder.getScheduleDelayNanos()) + .isEqualTo( + TimeUnit.MILLISECONDS.toNanos( + AuditLogRecordProcessorBuilder.DEFAULT_SCHEDULE_DELAY_MILLIS)); + assertThat(builder.getMaxExportBatchSize()) + .isEqualTo(AuditLogRecordProcessorBuilder.DEFAULT_MAX_EXPORT_BATCH_SIZE); + assertThat(builder.getExporterTimeoutNanos()) + .isEqualTo( + TimeUnit.MILLISECONDS.toNanos( + AuditLogRecordProcessorBuilder.DEFAULT_EXPORT_TIMEOUT_MILLIS)); + assertThat(builder.getRetryPolicy()).isEqualTo(RetryPolicy.getDefault()); + } + + @Test + void builderInvalidConfig() { + assertThatThrownBy( + () -> + AuditLogRecordProcessor.builder(mockLogRecordExporter, mockLogStore) + .setScheduleDelay(-1, TimeUnit.MILLISECONDS)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("delay must be non-negative"); + assertThatThrownBy( + () -> + AuditLogRecordProcessor.builder(mockLogRecordExporter, mockLogStore) + .setScheduleDelay(1, null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("unit"); + assertThatThrownBy( + () -> + AuditLogRecordProcessor.builder(mockLogRecordExporter, mockLogStore) + .setExporterTimeout(-1, TimeUnit.MILLISECONDS)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("timeout must be non-negative"); + assertThatThrownBy( + () -> + AuditLogRecordProcessor.builder(mockLogRecordExporter, mockLogStore) + .setExporterTimeout(1, null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("unit"); + assertThatThrownBy( + () -> + AuditLogRecordProcessor.builder(mockLogRecordExporter, mockLogStore) + .setMaxExportBatchSize(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("maxExportBatchSize must be positive."); + assertThatThrownBy( + () -> + AuditLogRecordProcessor.builder(mockLogRecordExporter, mockLogStore) + .setRetryPolicy(null)) + .isInstanceOf(NullPointerException.class) + .hasMessage("retryPolicy"); + } + + private void emitLog(SdkLoggerProvider sdkLoggerProvider, String message) { + sdkLoggerProvider + .loggerBuilder(getClass().getName()) + .build() + .logRecordBuilder() + .setBody(message) + .emit(); + } + + @Test + void emitLogsToMultipleExporters() throws IOException { + WaitingLogRecordExporter waitingLogRecordExporter1
= + new WaitingLogRecordExporter(2, CompletableResultCode.ofSuccess()); + WaitingLogRecordExporter waitingLogRecordExporter2 = + new WaitingLogRecordExporter(2, CompletableResultCode.ofSuccess()); + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder() + .addLogRecordProcessor( + AuditLogRecordProcessor.builder( + LogRecordExporter.composite( + Arrays.asList(waitingLogRecordExporter1, waitingLogRecordExporter2)), + logStore) + .setScheduleDelay(MAX_SCHEDULE_DELAY_MILLIS, TimeUnit.MILLISECONDS) + .build()) + .build(); + + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + emitLog(sdkLoggerProvider, LOG_MESSAGE_2); + List<LogRecordData> exported1 = waitingLogRecordExporter1.waitForExport(); + List<LogRecordData> exported2 = waitingLogRecordExporter2.waitForExport(); + assertThat(exported1) + .hasSize(2) + .satisfiesExactly( + logRecordData -> assertThat(logRecordData).hasBody(LOG_MESSAGE_1), + logRecordData -> assertThat(logRecordData).hasBody(LOG_MESSAGE_2)); + assertThat(exported2) + .hasSize(2) + .satisfiesExactly( + logRecordData -> assertThat(logRecordData).hasBody(LOG_MESSAGE_1), + logRecordData -> assertThat(logRecordData).hasBody(LOG_MESSAGE_2)); + } + + @Test + void emitMoreLogsThanBufferSize() throws IOException { + CompletableLogRecordExporter logRecordExporter = new CompletableLogRecordExporter(); + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder() + .addLogRecordProcessor( + AuditLogRecordProcessor.builder(logRecordExporter, logStore) + .setMaxExportBatchSize(2) + .setScheduleDelay(MAX_SCHEDULE_DELAY_MILLIS, TimeUnit.MILLISECONDS) + .build()) + .build(); + + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + + logRecordExporter.succeed(); + + await() + .untilAsserted( + () -> + assertThat(logRecordExporter.getExported()) + .hasSize(6) + .allSatisfy(logRecordData -> assertThat(logRecordData).hasBody(LOG_MESSAGE_1))); + } + + @Test + void emitMultipleLogs() throws IOException { + WaitingLogRecordExporter waitingLogRecordExporter = + new WaitingLogRecordExporter(2, CompletableResultCode.ofSuccess()); + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + + SdkLoggerProvider loggerProvider = + SdkLoggerProvider.builder() + .addLogRecordProcessor( + AuditLogRecordProcessor.builder(waitingLogRecordExporter, logStore) + .setScheduleDelay(MAX_SCHEDULE_DELAY_MILLIS, TimeUnit.MILLISECONDS) + .build()) + .build(); + + emitLog(loggerProvider, LOG_MESSAGE_1); + emitLog(loggerProvider, LOG_MESSAGE_2); + List<LogRecordData> exported = waitingLogRecordExporter.waitForExport(); + assertThat(exported) + .satisfiesExactly( + logRecordData -> assertThat(logRecordData).hasBody(LOG_MESSAGE_1), + logRecordData -> assertThat(logRecordData).hasBody(LOG_MESSAGE_2)); + } + + @Test + void exceptionHandlerCalledOnStorageFailure() throws IOException { + doThrow(new IOException("Storage failed")).when(mockLogStore).save(any()); + + AuditLogRecordProcessor processor = + AuditLogRecordProcessor.builder(mockLogRecordExporter, mockLogStore) + .setExceptionHandler(mockExceptionHandler) + .build(); + + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder().addLogRecordProcessor(processor).build(); + + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); +
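+ // The verify below runs without an await(), which presumes save() is invoked synchronously from onEmit(), so the handler has already been notified by the time emit() returns.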
+ verify(mockExceptionHandler, times(1)).handle(any(AuditException.class)); + } + + @Test + @SuppressLogger(MultiLogRecordExporter.class) + void exporterThrowsException() throws Exception { + WaitingLogRecordExporter waitingLogRecordExporter = + new WaitingLogRecordExporter(1, CompletableResultCode.ofSuccess(), 10); + doThrow(new IllegalArgumentException("No export for you.")) + .when(mockLogRecordExporter) + .export(anyList()); + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + + AtomicBoolean wasCalled = new AtomicBoolean(false); + AuditExceptionHandler exceptionHandler = + exception -> { + wasCalled.set(true); + assertThat(exception.logRecords).isNotNull(); + assertThat(exception.logRecords).isNotEmpty(); + }; + + AuditLogRecordProcessor processor = + AuditLogRecordProcessor.builder( + LogRecordExporter.composite( + Arrays.asList(mockLogRecordExporter, waitingLogRecordExporter)), + logStore) + .setScheduleDelay(MAX_SCHEDULE_DELAY_MILLIS, TimeUnit.MILLISECONDS) + .setExceptionHandler(exceptionHandler) + .build(); + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder().addLogRecordProcessor(processor).build(); + + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + + List<LogRecordData> exported = waitingLogRecordExporter.waitForExport(); + waitOn(processor); + + assertThat(wasCalled.get()).isTrue(); + + assertThat(exported) + .satisfiesExactly(logRecordData -> assertThat(logRecordData).hasBody(LOG_MESSAGE_1)); + waitingLogRecordExporter.reset(); + wasCalled.set(false); + // Continue to export after the exception was received. + emitLog(sdkLoggerProvider, LOG_MESSAGE_2); + + exported = waitingLogRecordExporter.waitForExport(); + assertThat(exported) + .satisfiesExactly(logRecordData -> assertThat(logRecordData).hasBody(LOG_MESSAGE_2)); + + assertThat(wasCalled.get()).isTrue(); + } + + @Test + void exporterThrowsException2() throws Exception { + doThrow(new IllegalArgumentException("No export for you.")) + .when(mockLogRecordExporter) + .export(anyList()); + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + + AtomicBoolean wasCalled = new AtomicBoolean(false); + AuditExceptionHandler exceptionHandler = + exception -> { + wasCalled.set(true); + assertThat(exception.logRecords).isNotNull(); + assertThat(exception.logRecords).isNotEmpty(); + }; + + AuditLogRecordProcessor processor = + AuditLogRecordProcessor.builder(mockLogRecordExporter, logStore) + .setScheduleDelay(MAX_SCHEDULE_DELAY_MILLIS, TimeUnit.MILLISECONDS) + .setExceptionHandler(exceptionHandler) + .build(); + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder().addLogRecordProcessor(processor).build(); + + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + + waitOn(processor); + + assertThat(wasCalled.get()).isTrue(); + } + + @Test + void testRetry() throws Exception { + when(mockLogRecordExporter.export(anyList())) + .thenThrow(new IllegalArgumentException("No export for you.")) // first call, fails + .thenThrow(new IllegalArgumentException("No export for you again.")) // second call, fails + .thenReturn(CompletableResultCode.ofSuccess()); + + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + + AtomicBoolean wasCalled = new AtomicBoolean(false); + AuditExceptionHandler exceptionHandler = + exception -> { + wasCalled.set(true); + assertThat(exception.logRecords).isNotNull(); + assertThat(exception.logRecords).isNotEmpty(); + }; + + RetryPolicy retryPolicy = + RetryPolicy.builder() + .setMaxAttempts(3) + .setInitialBackoff(Duration.ofMillis(1)) + .setMaxBackoff(Duration.ofMillis(1))
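+ // maxAttempts(3) covers the two stubbed failures above, so the third attempt is the one expected to succeed.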
+ .build(); + + AuditLogRecordProcessor processor = + AuditLogRecordProcessor.builder(mockLogRecordExporter, logStore) + .setScheduleDelay(MAX_SCHEDULE_DELAY_MILLIS, TimeUnit.MILLISECONDS) + .setExceptionHandler(exceptionHandler) + .setWaitOnExport(true) // enable waiting on export to ensure retries are attempted + .setMaxExportBatchSize(1) // ensure each log is exported individually + .setRetryPolicy(retryPolicy) + .build(); + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder().addLogRecordProcessor(processor).build(); + + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + + waitOn(processor); + + assertThat(processor.getLastResultCode().isDone()).isTrue(); + assertThat(processor.getLastResultCode().isSuccess()).isTrue(); + assertThat(wasCalled.get()).isFalse(); // should be false as the third attempt should succeed + } + + @Test + void testRetryFails() throws Exception { + doThrow(new IllegalArgumentException("No export for you.")) + .when(mockLogRecordExporter) + .export(anyList()); + + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + + AtomicBoolean wasCalled = new AtomicBoolean(false); + AuditExceptionHandler exceptionHandler = + exception -> { + wasCalled.set(true); + assertThat(exception.logRecords).isNotNull(); + assertThat(exception.logRecords).isNotEmpty(); + }; + + RetryPolicy retryPolicy = + RetryPolicy.builder() + .setMaxAttempts(2) // Only 1 retry attempt (2 total attempts) + .setInitialBackoff(Duration.ofMillis(1)) + .setMaxBackoff(Duration.ofMillis(1)) + .build(); + + AuditLogRecordProcessor processor = + AuditLogRecordProcessor.builder(mockLogRecordExporter, logStore) + .setScheduleDelay(MAX_SCHEDULE_DELAY_MILLIS, TimeUnit.MILLISECONDS) + .setExceptionHandler(exceptionHandler) + .setWaitOnExport(true) // enable waiting on export to ensure retries are attempted + .setMaxExportBatchSize(1) // ensure each log is exported individually + .setRetryPolicy(retryPolicy) + .build(); + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder().addLogRecordProcessor(processor).build(); + + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + + waitOn(processor); + + assertThat(wasCalled.get()).isTrue(); // should be true as all retry attempts fail + assertThat(processor.getLastResultCode().isDone()).isTrue(); + assertThat(processor.getLastResultCode().isSuccess()).isFalse(); + } + + @Test + void forceFlush() throws IOException { + WaitingLogRecordExporter waitingLogRecordExporter = + new WaitingLogRecordExporter(100, CompletableResultCode.ofSuccess(), 1); + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + + AuditLogRecordProcessor auditLogRecordProcessor = + AuditLogRecordProcessor.builder(waitingLogRecordExporter, logStore) + .setMaxExportBatchSize(49) + .setScheduleDelay(10, TimeUnit.SECONDS) + .build(); + + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder().addLogRecordProcessor(auditLogRecordProcessor).build(); + + for (int i = 0; i < 50; i++) { + emitLog(sdkLoggerProvider, "notExported"); + } + List<LogRecordData> exported = waitingLogRecordExporter.waitForExport(); + assertThat(exported).isNotNull(); + assertThat(exported.size()).isEqualTo(49); + + for (int i = 0; i < 50; i++) { + emitLog(sdkLoggerProvider, "notExported"); + } + exported = waitingLogRecordExporter.waitForExport(); + assertThat(exported).isNotNull(); + assertThat(exported.size()).isEqualTo(49); + + auditLogRecordProcessor.forceFlush().join(10, TimeUnit.SECONDS); + exported = waitingLogRecordExporter.getExported(); + assertThat(exported).isNotNull();
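+ // Two rounds of 50 emits against a batch size of 49 leave exactly 2 records for forceFlush() to drain. +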
assertThat(exported.size()).isEqualTo(2); + } + + @Test + void ignoresNullLogs() throws IOException { + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + AuditLogRecordProcessor processor = + AuditLogRecordProcessor.builder(mockLogRecordExporter, logStore).build(); + try { + assertThatCode(() -> processor.onEmit(null, null)).doesNotThrowAnyException(); + } finally { + processor.shutdown(); + } + } + + @BeforeEach + void setUp() { + when(mockLogRecordExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(mockLogStore.getAll()).thenReturn(new ArrayList<>()); + } + + @Test + void emitAfterShutdownCallsExceptionHandler() throws IOException { + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + AuditLogRecordProcessor processor = + AuditLogRecordProcessor.builder(mockLogRecordExporter, logStore) + .setExceptionHandler(mockExceptionHandler) + .build(); + + processor.shutdown(); + + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder().addLogRecordProcessor(processor).build(); + + emitLog(sdkLoggerProvider, LOG_MESSAGE_1); + + verify(mockExceptionHandler, times(1)).handle(any(AuditException.class)); + } + + @Test + @Timeout(10) + void shutdownFlushes() throws IOException { + WaitingLogRecordExporter waitingLogRecordExporter = + new WaitingLogRecordExporter(1, CompletableResultCode.ofSuccess()); + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder() + .addLogRecordProcessor( + AuditLogRecordProcessor.builder(waitingLogRecordExporter, logStore) + .setScheduleDelay(10, TimeUnit.SECONDS) + .build()) + .build(); + + emitLog(sdkLoggerProvider, LOG_MESSAGE_2); + + // Force a shutdown, which forces processing of all remaining logs.
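+ // With the 10s schedule delay, no timed batch should have fired yet; the shutdown flush alone is expected to export the record.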
+ sdkLoggerProvider.shutdown().join(10, TimeUnit.SECONDS); + + List<LogRecordData> exported = waitingLogRecordExporter.getExported(); + assertThat(exported) + .satisfiesExactly(logRecordData -> assertThat(logRecordData).hasBody(LOG_MESSAGE_2)); + } + + @Test + void shutdownPropagatesFailure() throws IOException { + when(mockLogRecordExporter.shutdown()).thenReturn(CompletableResultCode.ofFailure()); + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + AuditLogRecordProcessor processor = + AuditLogRecordProcessor.builder(mockLogRecordExporter, logStore).build(); + CompletableResultCode result = processor.shutdown(); + result.join(1, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + } + + @Test + void shutdownPropagatesSuccess() throws IOException { + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + AuditLogRecordProcessor processor = + AuditLogRecordProcessor.builder(mockLogRecordExporter, logStore).build(); + CompletableResultCode result = processor.shutdown(); + result.join(1, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isTrue(); + } + + // Helper exporters (WaitingLogRecordExporter, CompletableLogRecordExporter) are reused from BatchLogRecordProcessorTest; see imports above. + + @Test + void toString_Valid() throws IOException { + when(mockLogRecordExporter.toString()).thenReturn("MockLogRecordExporter"); + InMemoryAuditLogStore logStore = new InMemoryAuditLogStore(); + AuditLogRecordProcessor processor = + AuditLogRecordProcessor.builder(mockLogRecordExporter, logStore).build(); + + String result = processor.toString(); + assertThat(result).contains("AuditLogRecordProcessor"); + assertThat(result).contains("MockLogRecordExporter"); + } + + void waitOn(AuditLogRecordProcessor processor) throws InterruptedException { + while (processor.getLastResultCode() == null) { + Thread.sleep(MAX_SCHEDULE_DELAY_MILLIS); + } + processor.getLastResultCode().join(1, TimeUnit.SECONDS); + } +} diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java index 4f388a5ca49..ff0d6969d50 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java @@ -546,7 +546,7 @@ private void unblock() { } } - private static class CompletableLogRecordExporter implements LogRecordExporter { + static class CompletableLogRecordExporter implements LogRecordExporter { private final List<CompletableResultCode> results = new ArrayList<>();