diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/DlqObject.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/DlqObject.java new file mode 100644 index 0000000000..607243c8e0 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/DlqObject.java @@ -0,0 +1,154 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.failures; + +import org.apache.commons.lang3.StringUtils; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A model representing DLQ objects in Data Prepper + * + * @since 2.2 + */ +public class DlqObject { + + private static final String ISO8601_FORMAT_STRING = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern(ISO8601_FORMAT_STRING) + .withZone(ZoneId.systemDefault());; + + private final String pluginId; + + private final String pluginName; + + private final String pipelineName; + + private final Object failedData; + + private final String timestamp; + + private DlqObject(final String pluginId, final String pluginName, final String pipelineName, + final String timestamp, final Object failedData) { + + checkNotNull(pluginId, "pluginId cannot be null"); + checkArgument(!pluginId.isEmpty(), "pluginId cannot be an empty string"); + checkNotNull(pluginName, "pluginName cannot be null"); + checkArgument(!pluginName.isEmpty(), "pluginName cannot be an empty string"); + checkNotNull(pipelineName, "pipelineName cannot be null"); + checkArgument(!pipelineName.isEmpty(), "pipelineName cannot be an empty string"); + checkNotNull(failedData, "failedData cannot be null"); + + this.pluginId = pluginId; + this.pluginName = pluginName; + this.pipelineName = pipelineName; + this.failedData = failedData; + + this.timestamp = StringUtils.isEmpty(timestamp) ? FORMATTER.format(Instant.now()) : timestamp; + } + + public String getPluginId() { + return pluginId; + } + + public String getPluginName() { + return pluginName; + } + + public String getPipelineName() { + return pipelineName; + } + + public Object getFailedData() { + return failedData; + } + + public String getTimestamp() { + return timestamp; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final DlqObject that = (DlqObject) o; + return Objects.equals(failedData, that.getFailedData()) + && Objects.equals(pluginId, that.pluginId) + && Objects.equals(pluginName, that.pluginName) + && Objects.equals(pipelineName, that.pipelineName) + && Objects.equals(timestamp, that.getTimestamp()); + } + + @Override + public int hashCode() { + return Objects.hash(pluginId, pluginName, pipelineName, timestamp, failedData); + } + + @Override + public String toString() { + return "DlqObject{" + + "pluginId='" + pluginId + '\'' + + ", pluginName='" + pluginName + '\'' + + ", pipelineName='" + pipelineName + '\'' + + ", timestamp='" + timestamp + '\'' + + ", failedData=" + failedData + + '}'; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private String pluginId; + private String pluginName; + private String pipelineName; + private Object failedData; + + private String timestamp; + + public Builder withPluginId(final String pluginId) { + this.pluginId = pluginId; + return this; + } + + public Builder withPluginName(final String pluginName) { + this.pluginName = pluginName; + return this; + } + + public Builder withPipelineName(final String pipelineName) { + this.pipelineName = pipelineName; + return this; + } + + public Builder withFailedData(final Object failedData) { + this.failedData = failedData; + return this; + } + + public Builder withTimestamp(final String timestamp) { + this.timestamp = timestamp; + return this; + } + + public Builder withTimestamp(final Instant instant) { + this.timestamp = FORMATTER.format(instant); + return this; + } + + public DlqObject build() { + return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData); + } + + } +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/failures/DlqObjectTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/failures/DlqObjectTest.java new file mode 100644 index 0000000000..4905599c32 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/failures/DlqObjectTest.java @@ -0,0 +1,273 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.failures; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; + +import static java.util.UUID.randomUUID; +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class DlqObjectTest { + + private static final String ISO8601_FORMAT_STRING = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + private String pluginId; + private String pluginName; + private String pipelineName; + private Object failedData; + + @BeforeEach + public void setUp() { + pluginId = randomUUID().toString(); + pluginName = randomUUID().toString(); + pipelineName = randomUUID().toString(); + failedData = randomUUID(); + } + + @Test + public void test_build_with_timestamp() { + + final DlqObject testObject = DlqObject.builder() + .withPluginId(pluginId) + .withPluginName(pluginName) + .withPipelineName(pipelineName) + .withFailedData(failedData) + .withTimestamp(randomUUID().toString()) + .build(); + + assertThat(testObject, is(notNullValue())); + } + + @Test + public void test_build_without_timestamp() { + + final DlqObject testObject = DlqObject.builder() + .withPluginId(pluginId) + .withPluginName(pluginName) + .withPipelineName(pipelineName) + .withFailedData(failedData) + .build(); + + assertThat(testObject, is(notNullValue())); + } + + @Nested + public class InvalidBuildParameters { + + private void createTestObject() { + DlqObject.builder() + .withPluginId(pluginId) + .withPluginName(pluginName) + .withPipelineName(pipelineName) + .withFailedData(failedData) + .build(); + } + @Test + public void test_invalid_pluginId() { + pluginId = null; + assertThrows(NullPointerException.class, this::createTestObject); + pluginId = ""; + assertThrows(IllegalArgumentException.class, this::createTestObject); + } + + @Test + public void test_invalid_pluginName() { + pluginName = null; + assertThrows(NullPointerException.class, this::createTestObject); + pluginName = ""; + assertThrows(IllegalArgumentException.class, this::createTestObject); + } + + @Test + public void test_invalid_pipelineName() { + pipelineName = null; + assertThrows(NullPointerException.class, this::createTestObject); + pipelineName = ""; + assertThrows(IllegalArgumentException.class, this::createTestObject); + } + + @Test + public void test_invalid_failedData() { + failedData = null; + assertThrows(NullPointerException.class, this::createTestObject); + } + } + + @Nested + class Getters { + + private DlqObject testObject; + + @BeforeEach + public void setup() { + + testObject = DlqObject.builder() + .withPluginId(pluginId) + .withPluginName(pluginName) + .withPipelineName(pipelineName) + .withFailedData(failedData) + .build(); + } + + @Test + public void test_get_pluginId() { + final String actualPluginId = testObject.getPluginId(); + assertThat(actualPluginId, is(notNullValue())); + assertThat(actualPluginId, is(pluginId)); + } + + @Test + public void test_get_pluginName() { + final String actualPluginName = testObject.getPluginName(); + assertThat(actualPluginName, is(notNullValue())); + assertThat(actualPluginName, is(pluginName)); + } + + @Test + public void test_get_pipelineName() { + final String actualPipelineName = testObject.getPipelineName(); + assertThat(actualPipelineName, is(notNullValue())); + assertThat(actualPipelineName, is(pipelineName)); + } + + @Test + public void test_get_failedData() { + final Object actualFailedData = testObject.getFailedData(); + assertThat(actualFailedData, is(notNullValue())); + assertThat(actualFailedData, is(failedData)); + } + + @Test + public void test_get_timestamp() { + final String string = testObject.getTimestamp(); + assertThat(string, is(notNullValue())); + + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(ISO8601_FORMAT_STRING); // Specify locale to determine human language and cultural norms used in translating that input string. + Instant actualTimestamp = LocalDateTime.parse(testObject.getTimestamp(), formatter) + .atZone(ZoneId.systemDefault().normalized()) + .toInstant(); + + assertThat(actualTimestamp, is(lessThanOrEqualTo(Instant.now()))); + } + + } + + + @Nested + class EqualsAndHashCodeAndToString { + + private DlqObject testObject; + + @BeforeEach + public void setup() { + testObject = DlqObject.builder() + .withPluginId(pluginId) + .withPluginName(pluginName) + .withPipelineName(pipelineName) + .withFailedData(failedData) + .build(); + } + + @Test + void test_equals_returns_false_for_null() { + assertThat(testObject.equals(null), is(equalTo(false))); + } + + @Test + void test_equals_returns_false_for_other_class() { + assertThat(testObject.equals(randomUUID()), is(equalTo(false))); + } + + @Test + void test_equals_on_same_instance_returns_true() { + assertThat(testObject.equals(testObject), is(equalTo(true))); + } + + @Test + void test_equals_a_clone_of_the_same_instance_returns_true() { + + final DlqObject otherTestObject = DlqObject.builder() + .withPluginId(pluginId) + .withPluginName(pluginName) + .withPipelineName(pipelineName) + .withFailedData(failedData) + .withTimestamp(testObject.getTimestamp()) + .build(); + + assertThat(testObject.equals(otherTestObject), is(equalTo(true))); + } + + @Test + void test_equals_returns_false_for_two_instances_with_different_values() { + + final ZonedDateTime now = Instant.now().atZone(ZoneOffset.UTC); + + final Instant olderInstant = Instant.now().atZone(ZoneOffset.UTC) + .withHour(now.getHour() - 1) + .toInstant(); + + final DlqObject otherTestObject = DlqObject.builder() + .withPluginId(pluginId) + .withPluginName(pluginName) + .withPipelineName(pipelineName) + .withFailedData(failedData) + .withTimestamp(olderInstant) + .build(); + + assertThat(testObject, is(not(equalTo(otherTestObject)))); + } + + @Test + void test_hash_codes_for_two_instances_have_different_values() { + + final ZonedDateTime now = Instant.now().atZone(ZoneOffset.UTC); + + final Instant olderInstant = Instant.now().atZone(ZoneOffset.UTC) + .withHour(now.getHour() - 1) + .toInstant(); + + final DlqObject otherTestObject = DlqObject.builder() + .withPluginId(pluginId) + .withPluginName(pluginName) + .withPipelineName(pipelineName) + .withFailedData(failedData) + .withTimestamp(olderInstant) + .build(); + + assertThat(testObject.hashCode(), is(not(equalTo(otherTestObject.hashCode())))); + } + + @Test + void test_toString_has_all_values() { + final String string = testObject.toString(); + + assertThat(string, notNullValue()); + assertThat(string, allOf( + containsString("DlqObject"), + containsString(pluginId), + containsString(pluginName), + containsString(pipelineName), + containsString(failedData.toString()) + )); + } + } +} diff --git a/data-prepper-plugins/failures-common/build.gradle b/data-prepper-plugins/failures-common/build.gradle new file mode 100644 index 0000000000..5de9fad270 --- /dev/null +++ b/data-prepper-plugins/failures-common/build.gradle @@ -0,0 +1,9 @@ + +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +dependencies { + implementation project(':data-prepper-api') +} diff --git a/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/dlq/DlqProvider.java b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/dlq/DlqProvider.java new file mode 100644 index 0000000000..de86df73c2 --- /dev/null +++ b/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/dlq/DlqProvider.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package main.java.org.opensearch.dataprepper.dlq; + +import java.util.Optional; + +/** + * An interface for providing {@link DlqWriter}s. + *
+ * Plugin authors can use this interface for providing {@link DlqWriter}s
+ *
+ * @since 2.2
+ */
+public interface DlqProvider {
+
+
+ /**
+ * Allows implementors to provide a {@link DlqWriter}. This may be optional, in which case it is not used.
+ * @since 2.2
+ */
+ default Optional