Skip to content

Commit df52f88

Browse files
committed
NIFI-14861: Add EnrichGraphRecord processor
1 parent 23c327f commit df52f88

File tree

7 files changed

+735
-5
lines changed

7 files changed

+735
-5
lines changed

nifi-extension-bundles/nifi-graph-bundle/nifi-graph-client-service-api/src/main/java/org/apache/nifi/graph/GraphClientService.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.nifi.graph;
1919

2020
import org.apache.nifi.controller.ControllerService;
21+
import org.apache.nifi.util.Tuple;
2122

23+
import java.util.List;
2224
import java.util.Map;
2325

2426
public interface GraphClientService extends ControllerService {
@@ -30,6 +32,28 @@ public interface GraphClientService extends ControllerService {
3032
String PROPERTIES_SET = "graph.properties.set";
3133
String ROWS_RETURNED = "graph.rows.returned";
3234

33-
Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback handler);
35+
String NODES_TYPE = "Nodes";
36+
String EDGES_TYPE = "Edges";
37+
38+
/**
39+
* Executes the given query using the parameters provided and returns a map of results.
40+
* @param query The query to execute
41+
* @param parameters The parameter values to be used in the query
42+
* @param handler A callback handler to process the results of the query
43+
* @return A map containing the results of the query execution, where the keys are column names and the values are the corresponding data.
44+
*/
45+
Map<String, String> executeQuery(final String query, final Map<String, Object> parameters, final GraphQueryResultCallback handler);
3446
String getTransitUrl();
47+
48+
/**
49+
* Generates a query/statement for setting properties on matched node(s) in the language associated with the Graph Database
50+
* @param componentType The type of component that is executing the query, e.g. "Nodes", "Edges", etc.
51+
* @param identifiersAndValues A tuple containing the name of and value for the property to match on,
52+
* @param nodeType The type of node to match on, e.g. "Person", "Organization", etc.
53+
* @param propertyMap A map of key/value pairs of property names and values to set on the matched node(s)
54+
* @return A query/statement that can be executed against the Graph Database to set the properties on the matched node(s)
55+
*/
56+
default String generateSetPropertiesStatement(final String componentType, final List<Tuple<String, String>> identifiersAndValues, final String nodeType, final Map<String, Object> propertyMap) {
57+
throw new UnsupportedOperationException("This capability is not implemented for this GraphClientService");
58+
}
3559
}

nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/java/org/apache/nifi/processors/graph/EnrichGraphRecord.java

Lines changed: 356 additions & 0 deletions
Large diffs are not rendered by default.

nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@
1212
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
15+
org.apache.nifi.processors.graph.EnrichGraphRecord
1516
org.apache.nifi.processors.graph.ExecuteGraphQuery
1617
org.apache.nifi.processors.graph.ExecuteGraphQueryRecord

nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/MockCypherClientService.java

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,20 @@
2020
import org.apache.nifi.controller.AbstractControllerService;
2121
import org.apache.nifi.graph.GraphClientService;
2222
import org.apache.nifi.graph.GraphQueryResultCallback;
23+
import org.apache.nifi.processor.exception.ProcessException;
24+
import org.apache.nifi.util.Tuple;
2325

26+
import java.util.ArrayList;
2427
import java.util.HashMap;
28+
import java.util.List;
2529
import java.util.Map;
2630

2731
public class MockCypherClientService extends AbstractControllerService implements GraphClientService {
2832

2933
@Override
3034
public Map<String, String> executeQuery(String query, Map<String, Object> parameters, GraphQueryResultCallback handler) {
31-
handler.process(Map.of("name", "John Smith", "age", 40), true);
32-
handler.process(Map.of("name", "John Smith", "age", 40), false);
35+
handler.process(Map.of("name", "John Smith", "age", 40, "relationship", "ASSOCIATED_WITH"), true);
36+
handler.process(Map.of("name", "John Smith", "age", 40, "relationship", "ASSOCIATED_WITH"), false);
3337

3438
Map<String, String> resultAttributes = new HashMap<>();
3539
resultAttributes.put(NODES_CREATED, String.valueOf(1));
@@ -47,4 +51,85 @@ public Map<String, String> executeQuery(String query, Map<String, Object> parame
4751
public String getTransitUrl() {
4852
return "mock://localhost:12345/fake_database";
4953
}
54+
55+
@Override
56+
public String generateSetPropertiesStatement(final String componentType,
57+
final List<Tuple<String, String>> identifiersAndValues,
58+
final String nodeType,
59+
final Map<String, Object> propertyMap) {
60+
61+
StringBuilder queryBuilder = switch (componentType) {
62+
case GraphClientService.NODES_TYPE -> getNodeQueryBuilder(identifiersAndValues, nodeType);
63+
case GraphClientService.EDGES_TYPE -> getEdgeQueryBuilder(identifiersAndValues, nodeType);
64+
default -> throw new ProcessException("Unsupported component type: " + componentType);
65+
};
66+
67+
queryBuilder.append(")\n")
68+
.append("ON MATCH SET ");
69+
70+
List<String> setClauses = new ArrayList<>();
71+
for (Map.Entry<String, Object> entry : propertyMap.entrySet()) {
72+
StringBuilder setClause = new StringBuilder("n.")
73+
.append(entry.getKey())
74+
.append(" = ");
75+
if (entry.getValue() == null) {
76+
setClause.append(" NULL");
77+
} else {
78+
setClause.append("'")
79+
.append(entry.getValue())
80+
.append("'");
81+
}
82+
setClauses.add(setClause.toString());
83+
}
84+
String setClauseString = String.join(", ", setClauses);
85+
queryBuilder.append(setClauseString)
86+
.append("\nON CREATE SET ")
87+
.append(setClauseString);
88+
89+
return queryBuilder.toString();
90+
}
91+
92+
private static StringBuilder getNodeQueryBuilder(List<Tuple<String, String>> identifiersAndValues, String nodeType) {
93+
StringBuilder queryBuilder = new StringBuilder("MERGE (n");
94+
if (nodeType != null && !nodeType.isEmpty()) {
95+
queryBuilder.append(":").append(nodeType);
96+
}
97+
98+
buildMatchClause(identifiersAndValues, queryBuilder);
99+
return queryBuilder;
100+
}
101+
102+
private static StringBuilder getEdgeQueryBuilder(List<Tuple<String, String>> identifiersAndValues, String edgeType) {
103+
StringBuilder queryBuilder = new StringBuilder("MERGE (n)<-[e:");
104+
105+
if (edgeType == null || edgeType.isEmpty()) {
106+
throw new ProcessException("Edge type must not be null or empty");
107+
}
108+
queryBuilder.append(edgeType);
109+
110+
buildMatchClause(identifiersAndValues, queryBuilder);
111+
queryBuilder.append("]-> (x)");
112+
return queryBuilder;
113+
}
114+
115+
private static void buildMatchClause(List<Tuple<String, String>> identifiersAndValues, StringBuilder queryBuilder) {
116+
if (!identifiersAndValues.isEmpty()) {
117+
queryBuilder.append(" {");
118+
119+
List<String> identifierNamesAndValues = new ArrayList<>();
120+
for (Tuple<String, String> identifierAndValue : identifiersAndValues) {
121+
if (identifierAndValue == null || identifierAndValue.getKey() == null || identifierAndValue.getValue() == null) {
122+
throw new ProcessException("Identifiers and values must not be null");
123+
}
124+
125+
final String identifierName = identifierAndValue.getKey();
126+
final Object identifierObject = identifierAndValue.getValue();
127+
if (identifierObject != null) {
128+
identifierNamesAndValues.add(identifierName + ": '" + identifierObject + "'");
129+
}
130+
}
131+
queryBuilder.append(String.join(", ", identifierNamesAndValues))
132+
.append("}");
133+
}
134+
}
50135
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.processors.graph;
18+
19+
import org.apache.nifi.graph.GraphClientService;
20+
import org.apache.nifi.json.JsonTreeReader;
21+
import org.apache.nifi.processor.Processor;
22+
import org.apache.nifi.serialization.record.Record;
23+
import org.apache.nifi.serialization.RecordReader;
24+
import org.apache.nifi.serialization.record.MockRecordWriter;
25+
import org.apache.nifi.util.MockComponentLog;
26+
import org.apache.nifi.util.MockFlowFile;
27+
import org.apache.nifi.util.TestRunner;
28+
import org.apache.nifi.util.TestRunners;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
35+
import static org.apache.nifi.processors.graph.EnrichGraphRecord.CLIENT_SERVICE;
36+
import static org.apache.nifi.processors.graph.EnrichGraphRecord.IDENTIFIER_FIELD;
37+
import static org.apache.nifi.processors.graph.EnrichGraphRecord.NODE_TYPE;
38+
import static org.apache.nifi.processors.graph.EnrichGraphRecord.READER_SERVICE;
39+
import static org.apache.nifi.processors.graph.EnrichGraphRecord.WRITER_SERVICE;
40+
import static org.junit.jupiter.api.Assertions.assertEquals;
41+
import static org.junit.jupiter.api.Assertions.fail;
42+
43+
public class TestEnrichGraphRecord {
44+
45+
private TestRunner testRunner;
46+
private JsonTreeReader reader;
47+
private Processor processor;
48+
49+
@BeforeEach
50+
public void setup() throws Exception {
51+
processor = new EnrichGraphRecord();
52+
testRunner = TestRunners.newTestRunner(processor);
53+
54+
GraphClientService mockGraphClientService = new MockCypherClientService();
55+
MockRecordWriter writer = new MockRecordWriter();
56+
reader = new JsonTreeReader();
57+
58+
testRunner.setProperty(CLIENT_SERVICE, "graphClient");
59+
testRunner.addControllerService("graphClient", mockGraphClientService);
60+
testRunner.addControllerService("reader", reader);
61+
testRunner.addControllerService("writer", writer);
62+
testRunner.setProperty(READER_SERVICE, "reader");
63+
testRunner.setProperty(WRITER_SERVICE, "writer");
64+
testRunner.enableControllerService(writer);
65+
testRunner.enableControllerService(reader);
66+
testRunner.enableControllerService(mockGraphClientService);
67+
}
68+
69+
@Test
70+
public void testSuccessfulNodeProcessing() {
71+
Map<String, String> attributes = new HashMap<>();
72+
attributes.put("id", "123");
73+
74+
String inputContent = "[{\"id\": \"123\", \"name\": \"Node1\"},{\"id\": \"789\", \"name\": \"Node2\"}]";
75+
testRunner.setProperty(IDENTIFIER_FIELD, "//id");
76+
testRunner.setProperty("name", "//name");
77+
testRunner.enqueue(inputContent.getBytes(), attributes);
78+
79+
testRunner.run();
80+
81+
testRunner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 1);
82+
testRunner.assertTransferCount(EnrichGraphRecord.FAILURE, 0);
83+
testRunner.assertTransferCount(EnrichGraphRecord.GRAPH, 2);
84+
85+
MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(EnrichGraphRecord.ORIGINAL).get(0);
86+
assertEquals("123", originalFlowFile.getAttribute("id"));
87+
MockFlowFile successFlowFile = testRunner.getFlowFilesForRelationship(EnrichGraphRecord.GRAPH).get(0);
88+
89+
try {
90+
RecordReader recordReader = reader.createRecordReader(successFlowFile, successFlowFile.getContentStream(), new MockComponentLog("1", processor));
91+
Record record = recordReader.nextRecord();
92+
assertEquals("John Smith", record.getValue("name"));
93+
assertEquals(40, record.getAsInt("age"));
94+
} catch (Exception e) {
95+
fail("Should not reach here");
96+
}
97+
}
98+
99+
@Test
100+
public void testSuccessfulEdgeProcessing() {
101+
Map<String, String> attributes = new HashMap<>();
102+
attributes.put("id", "123");
103+
104+
String inputContent = "[{\"id\": \"123\", \"name\": \"Node1\", \"relationship\": \"ASSOCIATED_WITH\"}," +
105+
"{\"id\": \"789\", \"name\": \"Node2\",\"relationship\": \"ASSOCIATED_WITH\"}]";
106+
testRunner.setProperty(IDENTIFIER_FIELD, "//relationship");
107+
testRunner.setProperty("name", "//name");
108+
testRunner.setProperty(NODE_TYPE, GraphClientService.EDGES_TYPE);
109+
testRunner.enqueue(inputContent.getBytes(), attributes);
110+
111+
testRunner.run();
112+
113+
testRunner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 1);
114+
testRunner.assertTransferCount(EnrichGraphRecord.FAILURE, 0);
115+
testRunner.assertTransferCount(EnrichGraphRecord.GRAPH, 2);
116+
117+
MockFlowFile successFlowFile = testRunner.getFlowFilesForRelationship(EnrichGraphRecord.GRAPH).get(0);
118+
119+
try {
120+
RecordReader recordReader = reader.createRecordReader(successFlowFile, successFlowFile.getContentStream(), new MockComponentLog("1", processor));
121+
Record record = recordReader.nextRecord();
122+
assertEquals("John Smith", record.getValue("name"));
123+
assertEquals(40, record.getAsInt("age"));
124+
assertEquals("ASSOCIATED_WITH", record.getValue("relationship"));
125+
} catch (Exception e) {
126+
fail("Should not reach here");
127+
}
128+
}
129+
130+
@Test
131+
public void testNullIdentifierValue() {
132+
Map<String, String> attributes = new HashMap<>();
133+
attributes.put("id", "123");
134+
135+
// Two bad identifiers, one good
136+
String inputContent = "[{\"id\": null, \"name\": \"Node1\"},{\"id\": null, \"name\": \"Node2\"},{\"id\": \"123\", \"name\": \"Node3\"}]";
137+
testRunner.setProperty(IDENTIFIER_FIELD, "//id");
138+
testRunner.setProperty("name", "//name");
139+
testRunner.enqueue(inputContent.getBytes(), attributes);
140+
141+
testRunner.run();
142+
143+
testRunner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 0);
144+
testRunner.assertTransferCount(EnrichGraphRecord.FAILURE, 1);
145+
testRunner.assertTransferCount(EnrichGraphRecord.GRAPH, 1);
146+
147+
// Verify 2 failed records
148+
MockFlowFile failedFlowFile = testRunner.getFlowFilesForRelationship(EnrichGraphRecord.FAILURE).get(0);
149+
assertEquals("2", failedFlowFile.getAttribute("record.count"));
150+
}
151+
152+
@Test
153+
public void testFailedProcessing() {
154+
Map<String, String> attributes = new HashMap<>();
155+
attributes.put("id", "null");
156+
157+
String inputContent = "[{\"id\": null, \"name\": \"Node1\"}]";
158+
testRunner.setProperty(IDENTIFIER_FIELD, "//id");
159+
testRunner.setProperty("name", "//name");
160+
161+
testRunner.enqueue(inputContent.getBytes(), attributes);
162+
163+
testRunner.run();
164+
165+
testRunner.assertTransferCount(EnrichGraphRecord.ORIGINAL, 0);
166+
testRunner.assertTransferCount(EnrichGraphRecord.FAILURE, 1);
167+
testRunner.assertTransferCount(EnrichGraphRecord.GRAPH, 0);
168+
}
169+
}

nifi-extension-bundles/nifi-graph-bundle/nifi-graph-processors/src/test/java/org/apache/nifi/processors/graph/TestExecuteGraphQuery.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,10 @@ private void testExecute(int success, int failure, int original) throws Exceptio
8585
assertNotNull(parsed);
8686
assertEquals(2, parsed.size());
8787
for (Map<String, Object> result : parsed) {
88-
assertEquals(2, result.size());
88+
assertEquals(3, result.size());
8989
assertTrue(result.containsKey("name"));
9090
assertTrue(result.containsKey("age"));
91+
assertTrue(result.containsKey("relationship"));
9192
}
9293
}
9394
}

0 commit comments

Comments
 (0)