diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
index 34121fdfd167..43f1d63a2254 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
@@ -32,6 +32,11 @@
nifi-couchbase-processors
2.7.0-SNAPSHOT
+
+ org.apache.nifi
+ nifi-couchbase-services
+ 2.7.0-SNAPSHOT
+
org.apache.nifi
nifi-couchbase-services-api-nar
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
index a2bc22816dd9..1ffe1045f1cb 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
@@ -28,7 +28,6 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.services.couchbase.CouchbaseClient;
-import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
@@ -69,7 +68,6 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}
final long startNanos = System.nanoTime();
- final CouchbaseConnectionService connectionService = context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class);
final String documentId = context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
final CouchbaseContext couchbaseContext = getCouchbaseContext(context, flowFile);
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java
new file mode 100644
index 000000000000..5cbf02439214
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import org.apache.nifi.services.couchbase.CouchbaseClient;
+import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractCouchbaseProcessorTest {
+
+ protected static final String SERVICE_ID = "couchbaseConnectionService";
+ protected static final String TEST_DOCUMENT_ID = "test-document-id";
+ protected static final String TEST_DOCUMENT_CONTENT = "{\"key\":\"value\"}";
+ protected static final String TEST_SERVICE_LOCATION = "couchbase://test-location";
+ protected static final long TEST_CAS = 1L;
+
+ protected static CouchbaseConnectionService mockConnectionService(CouchbaseClient client) {
+ final CouchbaseConnectionService connectionService = mock(CouchbaseConnectionService.class);
+ when(connectionService.getIdentifier()).thenReturn(SERVICE_ID);
+ when(connectionService.getClient(any())).thenReturn(client);
+ when(connectionService.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+ return connectionService;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
index 97595b7d1d4b..4f535e34f9f0 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
@@ -31,8 +31,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -60,12 +60,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-public class GetCouchbaseTest {
-
- private static final String SERVICE_ID = "couchbaseConnectionService";
- private static final String TEST_DOCUMENT_ID = "test-document-id";
- private static final String TEST_SERVICE_LOCATION = "couchbase://test-location";
- private static final long TEST_CAS = 1L;
+public class GetCouchbaseTest extends AbstractCouchbaseProcessorTest {
private TestRunner runner;
@@ -76,20 +71,15 @@ public void init() {
@Test
public void testOnTriggerWithProvidedDocumentId() throws CouchbaseException, InitializationException {
- final String content = "{\"key\":\"value\"}";
-
- final CouchbaseGetResult result = new CouchbaseGetResult(content.getBytes(), TEST_CAS);
+ final CouchbaseGetResult result = new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.getDocument(anyString())).thenReturn(result);
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
- when(service.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -101,7 +91,7 @@ public void testOnTriggerWithProvidedDocumentId() throws CouchbaseException, Ini
runner.assertTransferCount(REL_FAILURE, 0);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
- outFile.assertContentEquals(content);
+ outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, DEFAULT_BUCKET);
outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, DEFAULT_SCOPE);
outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, DEFAULT_COLLECTION);
@@ -116,24 +106,18 @@ public void testOnTriggerWithProvidedDocumentId() throws CouchbaseException, Ini
@Test
public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, InitializationException {
- final String content = "{\"key\":\"value\"}";
-
- final CouchbaseGetResult result = new CouchbaseGetResult(content.getBytes(), TEST_CAS);
+ final CouchbaseGetResult result = new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.getDocument(anyString())).thenReturn(result);
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
final MockFlowFile flowFile = new MockFlowFile(0);
- final Map attributes = new HashMap<>();
- attributes.put("flowfile_document_id", TEST_DOCUMENT_ID);
- flowFile.putAttributes(attributes);
+ flowFile.putAttributes(Collections.singletonMap("flowfile_document_id", TEST_DOCUMENT_ID));
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, "${flowfile_document_id}");
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(flowFile);
@@ -145,7 +129,7 @@ public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, I
runner.assertTransferCount(REL_FAILURE, 0);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
- outFile.assertContentEquals(content);
+ outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, DEFAULT_BUCKET);
outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, DEFAULT_SCOPE);
outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, DEFAULT_COLLECTION);
@@ -154,24 +138,20 @@ public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, I
@Test
public void testWithFlowFileAttributes() throws CouchbaseException, InitializationException {
- final String documentId = "test-document-id";
- final String content = "{\"key\":\"value\"}";
final String testBucket = "test-bucket";
final String testScope = "test-scope";
final String testCollection = "test-collection";
- final CouchbaseGetResult result = new CouchbaseGetResult(content.getBytes(), TEST_CAS);
+ final CouchbaseGetResult result = new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.getDocument(anyString())).thenReturn(result);
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
- runner.setProperty(DOCUMENT_ID, documentId);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
+ runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(BUCKET_NAME, "${bucket.attribute}");
runner.setProperty(SCOPE_NAME, "${scope.attribute}");
runner.setProperty(COLLECTION_NAME, "${collection.attribute}");
@@ -181,17 +161,16 @@ public void testWithFlowFileAttributes() throws CouchbaseException, Initializati
attributes.put("bucket.attribute", testBucket);
attributes.put("scope.attribute", testScope);
attributes.put("collection.attribute", testCollection);
- final byte[] input = documentId.getBytes(StandardCharsets.UTF_8);
- runner.enqueue(input, attributes);
+ runner.enqueue(new byte[0], attributes);
runner.run();
- verify(client, times(1)).getDocument(eq(documentId));
+ verify(client, times(1)).getDocument(eq(TEST_DOCUMENT_ID));
runner.assertTransferCount(REL_SUCCESS, 1);
runner.assertTransferCount(REL_FAILURE, 0);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
- outFile.assertContentEquals(content);
+ outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, testBucket);
outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, testScope);
outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, testCollection);
@@ -204,13 +183,11 @@ public void testWithFailure() throws CouchbaseException, InitializationException
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
when(client.getDocument(anyString())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
runner.run();
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
index 60cd3a114a26..320beb848ebc 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
@@ -57,12 +57,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-public class PutCouchbaseTest {
-
- private static final String SERVICE_ID = "couchbaseConnectionService";
- private static final String TEST_DOCUMENT_ID = "test-document-id";
- private static final String TEST_SERVICE_LOCATION = "couchbase://test-location";
- private static final long TEST_CAS = 1L;
+public class PutCouchbaseTest extends AbstractCouchbaseProcessorTest {
private TestRunner runner;
@@ -76,18 +71,15 @@ public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, I
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.upsertDocument(anyString(), any())).thenReturn(new CouchbaseUpsertResult(TEST_CAS));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
- when(service.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
final MockFlowFile flowFile = new MockFlowFile(0);
final Map attributes = new HashMap<>();
attributes.put("flowfile_document_id", TEST_DOCUMENT_ID);
flowFile.putAttributes(attributes);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, "${flowfile_document_id}");
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.setValidateExpressionUsage(false);
@@ -120,12 +112,10 @@ public void testWithFailure() throws CouchbaseException, InitializationException
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -143,12 +133,10 @@ public void testWithRetry() throws CouchbaseException, InitializationException {
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.RETRY);
when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -166,12 +154,10 @@ public void testWithRollback() throws CouchbaseException, InitializationExceptio
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.ROLLBACK);
when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -189,12 +175,10 @@ public void testWithUnknownException() throws CouchbaseException, Initialization
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
index 9ad04fbe01df..4cc354f26b7d 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
@@ -19,6 +19,7 @@
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.exception.ExceptionCategory;
import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
public interface CouchbaseClient {
@@ -27,5 +28,15 @@ public interface CouchbaseClient {
CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) throws CouchbaseException;
+ boolean documentExists(String documentId) throws CouchbaseException;
+
+ void insertDocument(String documentId, byte[] content) throws CouchbaseException;
+
+ void removeDocument(String documentId) throws CouchbaseException;
+
+ void replaceDocument(String documentId, byte[] content) throws CouchbaseException;
+
+ CouchbaseLookupInResult lookUpIn(String documentId, String subDocPath) throws CouchbaseException;
+
ExceptionCategory getExceptionCategory(Throwable throwable);
}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
index ea2436e621fa..600e38dc1c9d 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
@@ -23,6 +23,6 @@ public CouchbaseException(final String message) {
}
public CouchbaseException(final String message, final Throwable cause) {
- super(cause);
+ super(message, cause);
}
}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java
new file mode 100644
index 000000000000..c916e29f3435
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase.utils;
+
+public record CouchbaseLookupInResult(Object resultContent, long cas) {
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml
new file mode 100644
index 000000000000..e918d055e62f
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml
@@ -0,0 +1,68 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-couchbase-bundle
+ 2.7.0-SNAPSHOT
+
+
+ nifi-couchbase-services
+ jar
+
+
+
+
+ org.apache.nifi
+ nifi-couchbase-services-api
+ 2.7.0-SNAPSHOT
+ provided
+
+
+ org.apache.nifi
+ nifi-lookup-service-api
+
+
+ org.apache.nifi
+ nifi-record
+
+
+ org.apache.nifi
+ nifi-record-serialization-service-api
+
+
+ org.apache.nifi
+ nifi-json-record-utils
+ 2.7.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-avro-record-utils
+ 2.7.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-distributed-cache-client-service-api
+ compile
+
+
+
+
+
+
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java
new file mode 100644
index 000000000000..47fdc4ac7657
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
+import org.apache.nifi.services.couchbase.utils.DocumentType;
+
+import java.util.Set;
+
+public class AbstractCouchbaseService extends AbstractControllerService {
+
+ protected static final String KEY = "key";
+ protected static final Set REQUIRED_KEYS = Set.of(KEY);
+
+ private static final String DEFAULT_BUCKET = "default";
+ private static final String DEFAULT_SCOPE = "_default";
+ private static final String DEFAULT_COLLECTION = "_default";
+
+ protected CouchbaseClient couchbaseClient;
+
+ public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder()
+ .name("Couchbase Connection Service")
+ .description("A Couchbase Connection Service which manages connections to a Couchbase cluster.")
+ .required(true)
+ .identifiesControllerService(CouchbaseConnectionService.class)
+ .build();
+
+ public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder()
+ .name("Bucket Name")
+ .description("The name of the bucket where documents will be stored. Each bucket contains a hierarchy of scopes and collections to group keys and values logically.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(DEFAULT_BUCKET)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ public static final PropertyDescriptor SCOPE_NAME = new PropertyDescriptor.Builder()
+ .name("Scope Name")
+ .description("The name of the scope which is a logical namespace within a bucket, serving to categorize and organize related collections.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(DEFAULT_SCOPE)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ public static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
+ .name("Collection Name")
+ .description("The name of collection which is a logical container within a scope, used to hold documents.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(DEFAULT_COLLECTION)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final CouchbaseConnectionService connectionService = context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class);
+ final CouchbaseContext couchbaseContext = getCouchbaseContext(context);
+ couchbaseClient = connectionService.getClient(couchbaseContext);
+ }
+
+ public Set getRequiredKeys() {
+ return REQUIRED_KEYS;
+ }
+
+ private CouchbaseContext getCouchbaseContext(ConfigurationContext context) {
+ final String bucketName = context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue();
+ final String scopeName = context.getProperty(SCOPE_NAME).evaluateAttributeExpressions().getValue();
+ final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue();
+
+ return new CouchbaseContext(bucketName, scopeName, collectionName, DocumentType.JSON);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java
new file mode 100644
index 000000000000..fdc5fd024939
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@Tags({"lookup", "enrich", "key", "value", "couchbase"})
+@CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.")
+public class CouchbaseKeyValueLookupService extends AbstractCouchbaseService implements StringLookupService {
+
+ private volatile String subDocPath;
+
+ public static final PropertyDescriptor LOOKUP_SUB_DOC_PATH = new PropertyDescriptor.Builder()
+ .name("Lookup Sub-Document Path")
+ .description("The Sub-Document lookup path within the target JSON document.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ private static final List PROPERTIES = List.of(
+ COUCHBASE_CONNECTION_SERVICE,
+ BUCKET_NAME,
+ SCOPE_NAME,
+ COLLECTION_NAME,
+ LOOKUP_SUB_DOC_PATH
+ );
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ super.onEnabled(context);
+ subDocPath = context.getProperty(LOOKUP_SUB_DOC_PATH).evaluateAttributeExpressions().getValue();
+ }
+
+ @Override
+ public Optional lookup(Map coordinates) throws LookupFailureException {
+ final Object documentId = coordinates.get(KEY);
+
+ if (documentId == null) {
+ return Optional.empty();
+ }
+ try {
+ final CouchbaseLookupInResult result = couchbaseClient.lookUpIn(documentId.toString(), subDocPath);
+ return Optional.ofNullable(result.resultContent().toString());
+ } catch (Exception e) {
+ throw new LookupFailureException("Key-value lookup from Couchbase failed", e);
+ }
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java
new file mode 100644
index 000000000000..28d2215c3685
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+@Tags({"distributed", "cache", "map", "cluster", "couchbase"})
+@CapabilityDescription("Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer." +
+ " This can be used in order to share a Map between nodes in a NiFi cluster." +
+ " Couchbase Server cluster can provide a high available and persistent cache storage.")
+public class CouchbaseMapCacheClient extends AbstractCouchbaseService implements AtomicDistributedMapCacheClient {
+
+ public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder()
+ .name("Couchbase Connection Service")
+ .description("A Couchbase Connection Service which manages connections to a Couchbase cluster.")
+ .required(true)
+ .identifiesControllerService(CouchbaseConnectionService.class)
+ .build();
+
+ private static final List PROPERTIES = List.of(
+ COUCHBASE_CONNECTION_SERVICE,
+ BUCKET_NAME,
+ SCOPE_NAME,
+ COLLECTION_NAME
+ );
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+ try {
+ final CouchbaseGetResult result = couchbaseClient.getDocument(documentId);
+ return new AtomicCacheEntry<>(key, deserializeDocument(valueDeserializer, result.resultContent()), result.cas());
+ } catch (CouchbaseException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException {
+ final String documentId = serializeDocumentKey(entry.getKey(), keySerializer);
+ final byte[] document = serializeDocument(entry.getValue(), valueSerializer);
+
+ try {
+ couchbaseClient.replaceDocument(documentId, document);
+ return true;
+ } catch (CouchbaseException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean putIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+ final byte[] document = serializeDocument(value, valueSerializer);
+
+ try {
+ couchbaseClient.insertDocument(documentId, document);
+ return true;
+ } catch (CouchbaseException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) throws IOException {
+ if (containsKey(key, keySerializer)) {
+ return get(key, keySerializer, valueDeserializer);
+ }
+
+ put(key, value, keySerializer, valueSerializer);
+ return value;
+ }
+
+ @Override
+ public boolean containsKey(K key, Serializer keySerializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+
+ try {
+ return couchbaseClient.documentExists(documentId);
+ } catch (CouchbaseException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void put(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+ final byte[] document = serializeDocument(value, valueSerializer);
+
+ try {
+ couchbaseClient.upsertDocument(documentId, document);
+ } catch (CouchbaseException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public V get(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+
+ try {
+ final CouchbaseGetResult result = couchbaseClient.getDocument(documentId);
+ return deserializeDocument(valueDeserializer, result.resultContent());
+ } catch (CouchbaseException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean remove(K key, Serializer serializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, serializer);
+
+ try {
+ couchbaseClient.removeDocument(documentId);
+ return true;
+ } catch (CouchbaseException e) {
+ return false;
+ }
+ }
+
+ private String serializeDocumentKey(final S key, final Serializer serializer) throws IOException {
+ final String result;
+
+ if (key instanceof String) {
+ result = (String) key;
+ } else {
+ final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ serializer.serialize(key, stream);
+ result = stream.toString(StandardCharsets.UTF_8);
+ }
+
+ if (result.isEmpty()) {
+ throw new IOException("Cache record key cannot be empty!");
+ }
+
+ return result;
+ }
+
+ private byte[] serializeDocument(final S value, final Serializer serializer) throws IOException {
+ final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ serializer.serialize(value, stream);
+ return stream.toByteArray();
+ }
+
+ private static V deserializeDocument(final Deserializer deserializer, final byte[] value) throws IOException {
+ return deserializer.deserialize(value);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java
new file mode 100644
index 000000000000..3ffae09b2c2b
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.json.JsonParserFactory;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.schema.access.InferenceSchemaStrategy;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@Tags({"lookup", "enrich", "couchbase"})
+@CapabilityDescription("Lookup a record from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.")
+public class CouchbaseRecordLookupService extends AbstractCouchbaseService implements RecordLookupService {
+
+ private static final String DATE_FORMAT = "yyyy-MM-dd";
+ private static final String TIME_FORMAT = "HH:mm:ss.SSSZ";
+ private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+ private static final JsonParserFactory jsonParserFactory = new JsonParserFactory();
+
+ private static final List PROPERTIES = List.of(
+ COUCHBASE_CONNECTION_SERVICE,
+ BUCKET_NAME,
+ SCOPE_NAME,
+ COLLECTION_NAME
+ );
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public Optional lookup(Map coordinates) throws LookupFailureException {
+ final Object documentId = coordinates.get(KEY);
+
+ if (documentId == null) {
+ return Optional.empty();
+ }
+
+ try {
+ final CouchbaseGetResult result = couchbaseClient.getDocument(documentId.toString());
+ final RecordSchema schema = new InferenceSchemaStrategy().getSchema(null, new ByteArrayInputStream(result.resultContent()), null);
+
+ final JsonTreeRowRecordReader recordReader = createJsonReader(new ByteArrayInputStream(result.resultContent()), schema);
+
+ return Optional.ofNullable(recordReader.nextRecord());
+ } catch (Exception e) {
+ throw new LookupFailureException("Record lookup from Couchbase failed", e);
+ }
+ }
+
+ private JsonTreeRowRecordReader createJsonReader(InputStream inputStream, RecordSchema recordSchema) throws IOException, MalformedRecordException {
+ return new JsonTreeRowRecordReader(
+ inputStream,
+ getLogger(),
+ recordSchema,
+ DATE_FORMAT,
+ TIME_FORMAT,
+ DATE_TIME_FORMAT,
+ StartingFieldStrategy.ROOT_NODE,
+ null,
+ SchemaApplicationStrategy.SELECTED_PART,
+ null,
+ jsonParserFactory
+ );
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 000000000000..772920f46fea
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.couchbase.CouchbaseKeyValueLookupService
+org.apache.nifi.services.couchbase.CouchbaseRecordLookupService
+org.apache.nifi.services.couchbase.CouchbaseMapCacheClient
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java
new file mode 100644
index 000000000000..daab9c887845
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractCouchbaseServiceTest {
+
+ protected static final String SERVICE_ID = "couchbaseConnectionService";
+ protected static final String TEST_DOCUMENT_ID = "test-document-id";
+ protected static final String TEST_DOCUMENT_CONTENT = "{\"key\":\"value\"}";
+ protected static final long TEST_CAS = 1L;
+
+ protected static CouchbaseConnectionService mockConnectionService(CouchbaseClient client) {
+ final CouchbaseConnectionService connectionService = mock(CouchbaseConnectionService.class);
+ when(connectionService.getIdentifier()).thenReturn(SERVICE_ID);
+ when(connectionService.getClient(any())).thenReturn(client);
+ return connectionService;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java
new file mode 100644
index 000000000000..0531c1110630
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CouchbaseKeyValueLookupServiceTest extends AbstractCouchbaseServiceTest {
+
+ private CouchbaseKeyValueLookupService lookupService;
+
+ @BeforeEach
+ public void init() {
+ lookupService = new CouchbaseKeyValueLookupService();
+ }
+
+ @Test
+ public void testSuccessfulLookUp() throws CouchbaseException, LookupFailureException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.lookUpIn(anyString(), any())).thenReturn(new CouchbaseLookupInResult("test result", TEST_CAS));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ lookupService.onEnabled(context);
+
+ final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID);
+ final Optional result = lookupService.lookup(coordinates);
+
+ assertTrue(result.isPresent());
+ assertEquals("test result", result.get());
+ }
+
+ @Test
+ public void testLookUpFailure() throws CouchbaseException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.lookUpIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception"));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ lookupService.onEnabled(context);
+
+ final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID);
+
+ assertThrows(LookupFailureException.class, () -> lookupService.lookup(coordinates));
+ }
+
+ @Test
+ public void testMissingKey() throws LookupFailureException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+ lookupService.onEnabled(context);
+
+ final Optional result = lookupService.lookup(Collections.emptyMap());
+
+ assertTrue(result.isEmpty());
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java
new file mode 100644
index 000000000000..36181411791c
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CouchbaseMapCacheClientTest extends AbstractCouchbaseServiceTest {
+
+ private final Serializer stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
+ private final Deserializer stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
+ private CouchbaseMapCacheClient mapCacheClient;
+
+ @BeforeEach
+ public void init() {
+ mapCacheClient = new CouchbaseMapCacheClient();
+ }
+
+ @Test
+ public void testCacheGet() throws CouchbaseException, IOException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.getDocument(anyString())).thenReturn(new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+
+ final String result = mapCacheClient.get(TEST_DOCUMENT_ID, stringSerializer, stringDeserializer);
+
+ assertEquals(TEST_DOCUMENT_CONTENT, result);
+ }
+
+ @Test
+ public void testCacheGetFailure() throws CouchbaseException, IOException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.getDocument(anyString())).thenThrow(new CouchbaseException("Test exception"));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+
+ final String result = mapCacheClient.get(TEST_DOCUMENT_ID, stringSerializer, stringDeserializer);
+
+ assertNull(result);
+ }
+
+ @Test
+ public void testCachePut() throws CouchbaseException, IOException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.upsertDocument(anyString(), any())).thenReturn(new CouchbaseUpsertResult(TEST_CAS));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+
+ mapCacheClient.put(TEST_DOCUMENT_ID, TEST_DOCUMENT_CONTENT, stringSerializer, stringSerializer);
+ verify(client, times(1)).upsertDocument(eq(TEST_DOCUMENT_ID), eq(TEST_DOCUMENT_CONTENT.getBytes()));
+ }
+
+ @Test
+ public void testCachePutFailure() throws CouchbaseException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("Test exception"));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+
+ assertThrows(IOException.class, () -> mapCacheClient.put(TEST_DOCUMENT_ID, TEST_DOCUMENT_CONTENT, stringSerializer, stringSerializer));
+ }
+
+ @Test
+ public void testCacheRemove() throws CouchbaseException, IOException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+
+ mapCacheClient.remove(TEST_DOCUMENT_ID, stringSerializer);
+ verify(client, times(1)).removeDocument(eq(TEST_DOCUMENT_ID));
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java
new file mode 100644
index 000000000000..4082cf7bcbd4
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CouchbaseRecordLookupServiceTest extends AbstractCouchbaseServiceTest {
+
+ private CouchbaseRecordLookupService lookupService;
+
+ @BeforeEach
+ public void init() {
+ lookupService = new CouchbaseRecordLookupService();
+ }
+
+ @Test
+ public void testSuccessfulLookUp() throws CouchbaseException, LookupFailureException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.getDocument(anyString())).thenReturn(new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ lookupService.onEnabled(context);
+
+ final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID);
+ final Optional result = lookupService.lookup(coordinates);
+
+ assertTrue(result.isPresent());
+
+ final List fields = Collections.singletonList(new RecordField("key", RecordFieldType.STRING.getDataType()));
+ final Record expectedRecord = new MapRecord(new SimpleRecordSchema(fields), Collections.singletonMap("key", "value"));
+
+ assertEquals(expectedRecord, result.get());
+ }
+
+ @Test
+ public void testLookUpFailure() throws CouchbaseException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.lookUpIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception"));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ lookupService.onEnabled(context);
+
+ final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID);
+
+ assertThrows(LookupFailureException.class, () -> lookupService.lookup(coordinates));
+ }
+
+ @Test
+ public void testMissingKey() throws LookupFailureException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+ lookupService.onEnabled(context);
+
+ final Optional result = lookupService.lookup(Collections.emptyMap());
+
+ assertTrue(result.isEmpty());
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
index dce98be91963..9a5fcbdbd197 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
@@ -45,22 +45,33 @@
import com.couchbase.client.java.codec.RawBinaryTranscoder;
import com.couchbase.client.java.codec.RawJsonTranscoder;
import com.couchbase.client.java.codec.Transcoder;
+import com.couchbase.client.java.json.JsonObject;
+import com.couchbase.client.java.kv.ExistsResult;
import com.couchbase.client.java.kv.GetOptions;
import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.InsertOptions;
+import com.couchbase.client.java.kv.LookupInResult;
+import com.couchbase.client.java.kv.LookupInSpec;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.java.kv.PersistTo;
+import com.couchbase.client.java.kv.ReplaceOptions;
import com.couchbase.client.java.kv.ReplicateTo;
import com.couchbase.client.java.kv.UpsertOptions;
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.exception.ExceptionCategory;
import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
import org.apache.nifi.services.couchbase.utils.DocumentType;
import org.apache.nifi.services.couchbase.utils.JsonValidator;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import static java.util.Map.entry;
import static org.apache.nifi.services.couchbase.exception.ExceptionCategory.FAILURE;
@@ -124,11 +135,11 @@ public CouchbaseGetResult getDocument(String documentId) throws CouchbaseExcepti
@Override
public CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) throws CouchbaseException {
- try {
- if (!getInputValidator(documentType).test(content)) {
- throw new CouchbaseException("The provided input is invalid");
- }
+ if (!getInputValidator(documentType).test(content)) {
+ throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId));
+ }
+ try {
final MutationResult result = collection.upsert(documentId, content,
UpsertOptions.upsertOptions()
.durability(persistTo, replicateTo)
@@ -141,6 +152,76 @@ public CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) t
}
}
+ @Override
+ public boolean documentExists(String documentId) throws CouchbaseException {
+ try {
+ final ExistsResult result = collection.exists(documentId);
+ return result.exists();
+ } catch (Exception e) {
+ throw new CouchbaseException("Failed to check document [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void insertDocument(String documentId, byte[] content) throws CouchbaseException {
+ if (!getInputValidator(documentType).test(content)) {
+ throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId));
+ }
+
+ try {
+ collection.insert(documentId, content,
+ InsertOptions.insertOptions()
+ .durability(persistTo, replicateTo)
+ .transcoder(getTranscoder(documentType))
+ .clientContext(new HashMap<>()));
+ } catch (Exception e) {
+ throw new CouchbaseException("Failed to insert document [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void removeDocument(String documentId) throws CouchbaseException {
+ try {
+ collection.remove(documentId);
+ } catch (Exception e) {
+ throw new CouchbaseException("Failed to remove document [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void replaceDocument(String documentId, byte[] content) throws CouchbaseException {
+ if (!getInputValidator(documentType).test(content)) {
+ throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId));
+ }
+
+ try {
+ collection.replace(documentId, content,
+ ReplaceOptions.replaceOptions()
+ .durability(persistTo, replicateTo)
+ .transcoder(getTranscoder(documentType))
+ .clientContext(new HashMap<>()));
+ } catch (Exception e) {
+ throw new CouchbaseException("Failed to replace document [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public CouchbaseLookupInResult lookUpIn(String documentId, String subDocPath) throws CouchbaseException {
+ try {
+ final String documentPath = subDocPath == null ? "" : subDocPath;
+ final LookupInResult result = collection.lookupIn(documentId, Collections.singletonList(LookupInSpec.get(documentPath)));
+
+ if (!result.exists(0)) {
+ throw new CouchbaseException("No value found on the requested path [%s] in Couchbase".formatted(subDocPath));
+ }
+
+ final Object lookUpInResult = result.contentAs(0, Object.class);
+ return new CouchbaseLookupInResult(deserializeLookupInResult(lookUpInResult), result.cas());
+ } catch (Exception e) {
+ throw new CouchbaseException("Failed to look up in document [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
@Override
public ExceptionCategory getExceptionCategory(Throwable throwable) {
return exceptionMapping.getOrDefault(throwable.getClass(), FAILURE);
@@ -159,4 +240,20 @@ private Predicate getInputValidator(DocumentType documentType) {
case BINARY -> v -> true;
};
}
+
+ private String deserializeLookupInResult(Object result) {
+ if (result instanceof String) {
+ return (String) result;
+ } else if (result instanceof Map) {
+ return JsonObject.from((Map) result).toString();
+ } else if (result instanceof List) {
+ return ((List>) result).stream()
+ .map(this::deserializeLookupInResult)
+ .collect(Collectors.joining(",", "[", "]"));
+ } else if (result instanceof byte[]) {
+ return new String((byte[]) result, StandardCharsets.UTF_8);
+ }
+
+ return result.toString();
+ }
}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
similarity index 50%
rename from nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java
rename to nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
index ddea9075df3a..ccca7fb96eab 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
@@ -19,16 +19,23 @@
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.LookupInResult;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.java.kv.PersistTo;
import com.couchbase.client.java.kv.ReplicateTo;
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static org.apache.nifi.services.couchbase.utils.DocumentType.JSON;
@@ -37,11 +44,12 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestCouchbaseClient {
+public class CouchbaseClientTest {
private static final String TEST_DOCUMENT_ID = "test-document-id";
private static final long TEST_CAS = 1L;
@@ -73,7 +81,24 @@ void testPutJsonDocumentValidationFailure() {
final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
final Exception exception = assertThrows(CouchbaseException.class, () -> client.upsertDocument(TEST_DOCUMENT_ID, content.getBytes()));
+ assertTrue(exception.getMessage().contains("The provided input is invalid"));
+ }
+
+ @Test
+ void testInsertJsonDocumentValidationFailure() {
+ final String content = "{invalid-json}";
+ final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+ final Exception exception = assertThrows(CouchbaseException.class, () -> client.insertDocument(TEST_DOCUMENT_ID, content.getBytes()));
+ assertTrue(exception.getMessage().contains("The provided input is invalid"));
+ }
+
+ @Test
+ void testReplaceJsonDocumentValidationFailure() {
+ final String content = "{invalid-json}";
+ final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+ final Exception exception = assertThrows(CouchbaseException.class, () -> client.replaceDocument(TEST_DOCUMENT_ID, content.getBytes()));
assertTrue(exception.getMessage().contains("The provided input is invalid"));
}
@@ -95,4 +120,61 @@ void testGetDocument() throws CouchbaseException {
assertEquals(TEST_CAS, getResult.cas());
assertArrayEquals(content.getBytes(), getResult.resultContent());
}
+
+ @Test
+ void testLookupInWithMapResult() throws CouchbaseException {
+ final String expectedResult = "{\"name\":\"John\",\"age\":\"20\"}";
+ final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+ Map lookupInContent = new HashMap<>();
+ lookupInContent.put("name", "John");
+ lookupInContent.put("age", "20");
+
+ final LookupInResult result = mock(LookupInResult.class);
+ when(result.contentAs(anyInt(), any(Class.class))).thenReturn(lookupInContent);
+ when(result.exists(anyInt())).thenReturn(true);
+ when(result.cas()).thenReturn(TEST_CAS);
+
+ when(collection.lookupIn(anyString(), any())).thenReturn(result);
+
+ final CouchbaseLookupInResult lookupInResult = client.lookUpIn(TEST_DOCUMENT_ID, "");
+
+ assertEquals(expectedResult, lookupInResult.resultContent());
+ assertEquals(TEST_CAS, lookupInResult.cas());
+ }
+
+ @Test
+ void testLookupInWithArrayResult() throws CouchbaseException {
+ final String expectedResult = "[{\"name\":\"John\"},{\"name\":\"Jack\"}]";
+ final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+ List