Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.nifi.graph;

import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.util.Tuple;

import java.util.List;
import java.util.Map;

public interface GraphClientService extends ControllerService {
Expand All @@ -30,6 +32,28 @@ public interface GraphClientService extends ControllerService {
String PROPERTIES_SET = "graph.properties.set";
String ROWS_RETURNED = "graph.rows.returned";

Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback handler);
// Component-type markers accepted by generateSetPropertiesStatement (see the mock
// implementation's switch on componentType).
String NODES_TYPE = "Nodes";
String EDGES_TYPE = "Edges";

/**
 * Executes the given query using the parameters provided and returns a map of results.
 * @param query The query to execute
 * @param parameters The parameter values to be used in the query
 * @param handler A callback handler to process the results of the query
 * @return A map containing the results of the query execution, where the keys are column names and the values are the corresponding data.
 */
Map<String, String> executeQuery(final String query, final Map<String, Object> parameters, final GraphQueryResultCallback handler);

/**
 * Returns the URL of the graph database endpoint this service sends queries to,
 * for use in transit-URL reporting. (Mock implementations return a placeholder URL.)
 */
String getTransitUrl();

/**
 * Generates a query/statement for setting properties on matched node(s) in the language associated with the Graph Database
 * @param componentType The type of component that is executing the query, e.g. {@link #NODES_TYPE} or {@link #EDGES_TYPE}
 * @param identifiersAndValues A list of tuples, each pairing the name of a property to match on with its expected value
 * @param nodeType The type of node to match on, e.g. "Person", "Organization", etc.
 * @param propertyMap A map of key/value pairs of property names and values to set on the matched node(s)
 * @return A query/statement that can be executed against the Graph Database to set the properties on the matched node(s)
 * @throws UnsupportedOperationException by default, for implementations that do not support property-setting statements
 */
default String generateSetPropertiesStatement(final String componentType, final List<Tuple<String, String>> identifiersAndValues, final String nodeType, final Map<String, Object> propertyMap) {
throw new UnsupportedOperationException("This capability is not implemented for this GraphClientService");
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.graph.EnrichGraphRecord
org.apache.nifi.processors.graph.ExecuteGraphQuery
org.apache.nifi.processors.graph.ExecuteGraphQueryRecord
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.graph.GraphClientService;
import org.apache.nifi.graph.GraphQueryResultCallback;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.Tuple;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MockCypherClientService extends AbstractControllerService implements GraphClientService {

@Override
public Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback handler) {
handler.process(Map.of("name", "John Smith", "age", 40), true);
handler.process(Map.of("name", "John Smith", "age", 40), false);
handler.process(Map.of("name", "John Smith", "age", 40, "relationship", "ASSOCIATED_WITH"), true);
handler.process(Map.of("name", "John Smith", "age", 40, "relationship", "ASSOCIATED_WITH"), false);

Map<String, String> resultAttributes = new HashMap<>();
resultAttributes.put(NODES_CREATED, String.valueOf(1));
Expand All @@ -47,4 +51,85 @@ public Map<String, String> executeQuery(String query, Map<String, Object> parame
public String getTransitUrl() {
return "mock://localhost:12345/fake_database";
}

@Override
public String generateSetPropertiesStatement(final String componentType,
                                             final List<Tuple<String, String>> identifiersAndValues,
                                             final String nodeType,
                                             final Map<String, Object> propertyMap) {

    // Start the MERGE pattern for the requested component type; unknown types fail fast.
    final StringBuilder queryBuilder = switch (componentType) {
        case GraphClientService.NODES_TYPE -> getNodeQueryBuilder(identifiersAndValues, nodeType);
        case GraphClientService.EDGES_TYPE -> getEdgeQueryBuilder(identifiersAndValues, nodeType);
        default -> throw new ProcessException("Unsupported component type: " + componentType);
    };

    queryBuilder.append(")\n")
            .append("ON MATCH SET ");

    // Render each property as "n.key = 'value'"; a null value becomes "n.key = NULL".
    // (Previously the null branch produced a double space: "n.key =  NULL".)
    final List<String> setClauses = new ArrayList<>();
    for (final Map.Entry<String, Object> entry : propertyMap.entrySet()) {
        final String renderedValue = entry.getValue() == null
                ? "NULL"
                : "'" + entry.getValue() + "'";
        setClauses.add("n." + entry.getKey() + " = " + renderedValue);
    }

    // The same SET clause applies whether the MERGE matched or created the component.
    final String setClauseString = String.join(", ", setClauses);
    queryBuilder.append(setClauseString)
            .append("\nON CREATE SET ")
            .append(setClauseString);

    return queryBuilder.toString();
}

/**
 * Begins a MERGE statement for a node, appending an optional label and the
 * identifier match clause. The opening parenthesis is left unclosed for the caller.
 */
private static StringBuilder getNodeQueryBuilder(List<Tuple<String, String>> identifiersAndValues, String nodeType) {
    final StringBuilder builder = new StringBuilder("MERGE (n");

    // Only attach a label when a non-empty node type was supplied.
    final boolean hasLabel = nodeType != null && !nodeType.isEmpty();
    if (hasLabel) {
        builder.append(':').append(nodeType);
    }

    buildMatchClause(identifiersAndValues, builder);
    return builder;
}

// Begins a MERGE statement for a relationship of the given type, with the identifier
// match clause embedded inside the brackets. The caller appends the closing ")".
// NOTE(review): the finished pattern has arrowheads on both ends ("(n)<-[...]-> (x)"),
// which does not look like valid Cypher for MERGE — acceptable for a mock, but confirm
// that no test actually executes the generated statement.
private static StringBuilder getEdgeQueryBuilder(List<Tuple<String, String>> identifiersAndValues, String edgeType) {
StringBuilder queryBuilder = new StringBuilder("MERGE (n)<-[e:");

// An edge must always carry a type; fail fast before building the rest of the statement.
if (edgeType == null || edgeType.isEmpty()) {
throw new ProcessException("Edge type must not be null or empty");
}
queryBuilder.append(edgeType);

buildMatchClause(identifiersAndValues, queryBuilder);
queryBuilder.append("]-> (x)");
return queryBuilder;
}

/**
 * Appends a Cypher-style property match clause ("{key: 'value', ...}") built from the
 * identifier tuples. Appends nothing when the list is empty.
 *
 * @throws ProcessException if any tuple, key, or value is null
 */
private static void buildMatchClause(final List<Tuple<String, String>> identifiersAndValues, final StringBuilder queryBuilder) {
    if (identifiersAndValues.isEmpty()) {
        return;
    }

    queryBuilder.append(" {");

    final List<String> identifierNamesAndValues = new ArrayList<>();
    for (final Tuple<String, String> identifierAndValue : identifiersAndValues) {
        if (identifierAndValue == null || identifierAndValue.getKey() == null || identifierAndValue.getValue() == null) {
            throw new ProcessException("Identifiers and values must not be null");
        }

        // The value is guaranteed non-null here (checked above), so the entry is always added;
        // the original code carried a redundant second null check.
        identifierNamesAndValues.add(identifierAndValue.getKey() + ": '" + identifierAndValue.getValue() + "'");
    }

    queryBuilder.append(String.join(", ", identifierNamesAndValues))
            .append("}");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.graph;

import org.apache.nifi.graph.GraphClientService;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.apache.nifi.processors.graph.EnrichGraphRecord.CLIENT_SERVICE;
import static org.apache.nifi.processors.graph.EnrichGraphRecord.IDENTIFIER_FIELD;
import static org.apache.nifi.processors.graph.EnrichGraphRecord.NODE_TYPE;
import static org.apache.nifi.processors.graph.EnrichGraphRecord.READER_SERVICE;
import static org.apache.nifi.processors.graph.EnrichGraphRecord.WRITER_SERVICE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

/**
 * Unit tests for EnrichGraphRecord using MockCypherClientService as the graph client.
 * The mock always emits two result rows per query (name/age/relationship), which is
 * what the record-content assertions below rely on.
 */
public class TestEnrichGraphRecord {

    private TestRunner testRunner;
    private JsonTreeReader reader;
    private Processor processor;

    @BeforeEach
    public void setup() throws Exception {
        processor = new EnrichGraphRecord();
        testRunner = TestRunners.newTestRunner(processor);

        GraphClientService mockGraphClientService = new MockCypherClientService();
        MockRecordWriter writer = new MockRecordWriter();
        reader = new JsonTreeReader();

        testRunner.setProperty(CLIENT_SERVICE, "graphClient");
        testRunner.addControllerService("graphClient", mockGraphClientService);
        testRunner.addControllerService("reader", reader);
        testRunner.addControllerService("writer", writer);
        testRunner.setProperty(READER_SERVICE, "reader");
        testRunner.setProperty(WRITER_SERVICE, "writer");
        testRunner.enableControllerService(writer);
        testRunner.enableControllerService(reader);
        testRunner.enableControllerService(mockGraphClientService);
    }

    /**
     * Reads the first record from the given flow file, closing the reader afterwards.
     * The previous version leaked the RecordReader and swallowed any exception with
     * fail("Should not reach here"), which discarded the cause; letting the exception
     * propagate keeps the real failure in the test report.
     */
    private Record readFirstRecord(final MockFlowFile flowFile) throws Exception {
        try (RecordReader recordReader = reader.createRecordReader(flowFile, flowFile.getContentStream(), new MockComponentLog("1", processor))) {
            return recordReader.nextRecord();
        }
    }

    @Test
    public void testSuccessfulNodeProcessing() throws Exception {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("id", "123");

        String inputContent = "[{\"id\": \"123\", \"name\": \"Node1\"},{\"id\": \"789\", \"name\": \"Node2\"}]";
        testRunner.setProperty(IDENTIFIER_FIELD, "//id");
        testRunner.setProperty("name", "//name");
        testRunner.enqueue(inputContent.getBytes(), attributes);

        testRunner.run();

        // Two input records -> two graph result flow files; the original passes through once.
        testRunner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 1);
        testRunner.assertTransferCount(EnrichGraphRecord.FAILURE, 0);
        testRunner.assertTransferCount(EnrichGraphRecord.GRAPH, 2);

        MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(EnrichGraphRecord.ORIGINAL).get(0);
        assertEquals("123", originalFlowFile.getAttribute("id"));

        // The graph output should contain the values produced by the mock client.
        MockFlowFile successFlowFile = testRunner.getFlowFilesForRelationship(EnrichGraphRecord.GRAPH).get(0);
        Record record = readFirstRecord(successFlowFile);
        assertEquals("John Smith", record.getValue("name"));
        assertEquals(40, record.getAsInt("age"));
    }

    @Test
    public void testSuccessfulEdgeProcessing() throws Exception {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("id", "123");

        String inputContent = "[{\"id\": \"123\", \"name\": \"Node1\", \"relationship\": \"ASSOCIATED_WITH\"}," +
                "{\"id\": \"789\", \"name\": \"Node2\",\"relationship\": \"ASSOCIATED_WITH\"}]";
        testRunner.setProperty(IDENTIFIER_FIELD, "//relationship");
        testRunner.setProperty("name", "//name");
        testRunner.setProperty(NODE_TYPE, GraphClientService.EDGES_TYPE);
        testRunner.enqueue(inputContent.getBytes(), attributes);

        testRunner.run();

        testRunner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 1);
        testRunner.assertTransferCount(EnrichGraphRecord.FAILURE, 0);
        testRunner.assertTransferCount(EnrichGraphRecord.GRAPH, 2);

        MockFlowFile successFlowFile = testRunner.getFlowFilesForRelationship(EnrichGraphRecord.GRAPH).get(0);
        Record record = readFirstRecord(successFlowFile);
        assertEquals("John Smith", record.getValue("name"));
        assertEquals(40, record.getAsInt("age"));
        assertEquals("ASSOCIATED_WITH", record.getValue("relationship"));
    }

    @Test
    public void testNullIdentifierValue() {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("id", "123");

        // Two records with null identifiers, one good record.
        String inputContent = "[{\"id\": null, \"name\": \"Node1\"},{\"id\": null, \"name\": \"Node2\"},{\"id\": \"123\", \"name\": \"Node3\"}]";
        testRunner.setProperty(IDENTIFIER_FIELD, "//id");
        testRunner.setProperty("name", "//name");
        testRunner.enqueue(inputContent.getBytes(), attributes);

        testRunner.run();

        testRunner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 0);
        testRunner.assertTransferCount(EnrichGraphRecord.FAILURE, 1);
        testRunner.assertTransferCount(EnrichGraphRecord.GRAPH, 1);

        // The failure flow file should contain the two bad records.
        MockFlowFile failedFlowFile = testRunner.getFlowFilesForRelationship(EnrichGraphRecord.FAILURE).get(0);
        assertEquals("2", failedFlowFile.getAttribute("record.count"));
    }

    @Test
    public void testFailedProcessing() {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("id", "null");

        // A single record whose identifier is null should route entirely to failure.
        String inputContent = "[{\"id\": null, \"name\": \"Node1\"}]";
        testRunner.setProperty(IDENTIFIER_FIELD, "//id");
        testRunner.setProperty("name", "//name");

        testRunner.enqueue(inputContent.getBytes(), attributes);

        testRunner.run();

        testRunner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 0);
        testRunner.assertTransferCount(EnrichGraphRecord.FAILURE, 1);
        testRunner.assertTransferCount(EnrichGraphRecord.GRAPH, 0);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ private void testExecute(int success, int failure, int original) throws Exceptio
assertNotNull(parsed);
assertEquals(2, parsed.size());
for (Map<String, Object> result : parsed) {
assertEquals(2, result.size());
assertEquals(3, result.size());
assertTrue(result.containsKey("name"));
assertTrue(result.containsKey("age"));
assertTrue(result.containsKey("relationship"));
}
}
}
Loading
Loading