diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..108e0cc --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +# Eclipse +.classpath +.project +.settings/ + +# Intellij +.idea/ +*.iml +*.iws + +# Mac +.DS_Store + +# Maven +log/ +target/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..13d6a8e --- /dev/null +++ b/README.md @@ -0,0 +1,26 @@ + +# Nifi OPC-UA Bundle + +This is a bundle of OPC UA controller service and processors for Nifi. The bundle is an improvement built on top of the OPC UA bundle made by `HashmapInc.`(source code here: [https://github.com/hashmapinc/nifi-opcua-bundle](https://github.com/hashmapinc/nifi-opcua-bundle) ) +A couple of differences between the new bundle and the `HashmapInc.` one: + + 1. The `HashmapInc.` is based on [OPC UA-Java Stack](https://github.com/OPCFoundation/UA-Java), which provides more bottom-level APIs. The new bundle is based on [Apache Milo](https://github.com/eclipse/milo), which is built on top of [OPC UA-Java Stack](https://github.com/OPCFoundation/UA-Java) but provides more high-level APIs and more advanced functionalities such as *subscription*. + 2. The new bundle adds a `SubscribeOPCNodes` processor, which allows the user to specify a list of OPC tags to subscribe to. The processor will produce flowfiles, when value changes on subscribed tags are detected. + 3. Adds an option to the `GetOPCData` processor so that the user can specify the source of tag list as a local file. The processor will get values of all tags listed in the file from the OPC. + 4. Adds an option to the `ListOPCNodes` processor, so that user may choose to not get nodes which are not leaves of the tree. This could come in handy, since in most cases, only leaf nodes contain value. + 5. Minor tweaks to improve performance as well as to adapt to our use case. + +## Build Instructions +Build it with Maven: +``` +mvn clean install +``` +Find the built nar file here: +``` +/nifi-custom-listen-tcp-nar/target/nifi-opcua-nar-.nar +``` +and copy it to the following directory of the running Nifi instance: +``` +/opt/nifi/nifi-1.4.0/lib +``` +Restart Nifi, then you can find the new processors available. \ No newline at end of file diff --git a/nifi-opcua-nar/pom.xml b/nifi-opcua-nar/pom.xml new file mode 100644 index 0000000..dad6aab --- /dev/null +++ b/nifi-opcua-nar/pom.xml @@ -0,0 +1,51 @@ + + + + 4.0.0 + + + de.fraunhofer.fit + nifi-opcua-bundle + 1.0 + + + nifi-opcua-nar + 1.0 + nar + + true + true + + + + + de.fraunhofer.fit + nifi-opcua-processors + 1.0 + + + de.fraunhofer.fit + nifi-opcua-service-api + 1.0 + + + + + diff --git a/nifi-opcua-processors/pom.xml b/nifi-opcua-processors/pom.xml new file mode 100644 index 0000000..376045c --- /dev/null +++ b/nifi-opcua-processors/pom.xml @@ -0,0 +1,63 @@ + + + + 4.0.0 + + + de.fraunhofer.fit + nifi-opcua-bundle + 1.0 + + + nifi-opcua-processors + jar + + + + org.apache.nifi + nifi-api + + + de.fraunhofer.fit + nifi-opcua-service-api + 1.0 + + + org.apache.nifi + nifi-utils + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + de.fraunhofer.fit + nifi-opcua-service + 1.0 + + + diff --git a/nifi-opcua-processors/src/main/java/de/fraunhofer/fit/processors/opcua/GetOPCData.java b/nifi-opcua-processors/src/main/java/de/fraunhofer/fit/processors/opcua/GetOPCData.java new file mode 100644 index 0000000..a0866fe --- /dev/null +++ b/nifi-opcua-processors/src/main/java/de/fraunhofer/fit/processors/opcua/GetOPCData.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package de.fraunhofer.fit.processors.opcua; + +import de.fraunhofer.fit.opcua.OPCUAService; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.*; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +@Tags({"opc"}) +@CapabilityDescription("Get the data of specified nodes from a OPC UA server.") +public class GetOPCData extends AbstractProcessor { + + private final AtomicReference timestamp = new AtomicReference<>(); + private final AtomicBoolean excludeNullValue = new AtomicBoolean(); + private String nullValueString = ""; + + private List tagList; + + public static final PropertyDescriptor OPCUA_SERVICE = new PropertyDescriptor.Builder() + .name("OPC UA Service") + .description("Specifies the OPC UA Service that can be used to access data") + .required(true) + .identifiesControllerService(OPCUAService.class) + .sensitive(false) + .build(); + + public static final PropertyDescriptor RETURN_TIMESTAMP = new PropertyDescriptor + .Builder().name("Return Timestamp") + .description("Allows to select the source, server, or both timestamps") + .required(true) + .sensitive(false) + .allowableValues("SourceTimestamp", "ServerTimestamp", "Both") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TAG_LIST_SOURCE = new PropertyDescriptor + .Builder().name("Tag List Source") + .description("Either get the tag list from the flow file, or from a dynamic property") + .required(true) + .allowableValues("Flowfile", "Local File") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(false) + .build(); + + public static final PropertyDescriptor TAG_LIST_FILE = new PropertyDescriptor + .Builder().name("Default Tag List Name") + .description("The location of the tag list file") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .sensitive(false) + .build(); + + public static final PropertyDescriptor TAG_LIST_FILE_CACHE = new PropertyDescriptor + .Builder().name("Tag List Cache Expiry Time") + .description("The number of seconds between tag list re-reads") + .required(false) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .sensitive(false) + .build(); + + public static final PropertyDescriptor EXCLUDE_NULL_VALUE = new PropertyDescriptor + .Builder().name("Exclude Null Value") + .description("Return data only for non null values") + .required(true) + .sensitive(false) + .allowableValues("No", "Yes") + .defaultValue("No") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor NULL_VALUE_STRING = new PropertyDescriptor + .Builder().name("Null Value String") + .description("If removing null values, what string is used for null") + .required(false) + .sensitive(false) + .addValidator(Validator.VALID) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder() + .name("Success") + .description("Successful OPC read") + .build(); + + public static final Relationship FAILURE = new Relationship.Builder() + .name("Failure") + .description("Failed OPC read") + .build(); + + private List descriptors; + + private Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(OPCUA_SERVICE); + descriptors.add(RETURN_TIMESTAMP); + descriptors.add(EXCLUDE_NULL_VALUE); + descriptors.add(NULL_VALUE_STRING); + descriptors.add(TAG_LIST_FILE); + descriptors.add(TAG_LIST_SOURCE); + descriptors.add(TAG_LIST_FILE_CACHE); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet(); + relationships.add(SUCCESS); + relationships.add(FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + timestamp.set(context.getProperty(RETURN_TIMESTAMP).getValue()); + excludeNullValue.set(context.getProperty(EXCLUDE_NULL_VALUE).getValue().equals("Yes")); + if (context.getProperty(NULL_VALUE_STRING).isSet()) { + nullValueString = context.getProperty(NULL_VALUE_STRING).getValue(); + } + + // Now every time onSchedule is triggered, data will be read from file anew + if (context.getProperty(TAG_LIST_SOURCE).toString().equals("Local File")) { + try { + tagList = parseFile(Paths.get(context.getProperty(TAG_LIST_FILE).toString())); + } catch (IOException e) { + getLogger().error("Error reading tag list from local file."); + } + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + // Initialize response variable + final AtomicReference> requestedTagnames = new AtomicReference<>(); + // Submit to getValue + OPCUAService opcUAService; + + try { + opcUAService = context.getProperty(OPCUA_SERVICE) + .asControllerService(OPCUAService.class); + } catch (Exception ex) { + getLogger().error(ex.getMessage()); + return; + } + + if (context.getProperty(TAG_LIST_SOURCE).toString().equals("Flowfile")) { + + // get FlowFile + FlowFile flowFile = session.get(); + if (flowFile == null) + return; + + // Read tag name from flow file content + session.read(flowFile, in -> { + try { + // TODO: combine this with parseFile + List tagname = new BufferedReader(new InputStreamReader(in)) + .lines().collect(Collectors.toList()); + + requestedTagnames.set(tagname); + + } catch (Exception e) { + getLogger().error("Failed to read flowfile " + e.getMessage()); + } + }); + } else { + + if(tagList == null) + return; + + try { + requestedTagnames.set(tagList); + } catch (Exception ex) { + getLogger().error(ex.getMessage()); + return; + } + } + + byte[] values = opcUAService.getValue(requestedTagnames.get(), timestamp.get(), + excludeNullValue.get(), nullValueString); + + FlowFile flowFile; + flowFile = session.get(); + + if (flowFile == null) + flowFile = session.create(); + + // Write the results back out to flow file + try { + flowFile = session.write(flowFile, out -> out.write(values)); + session.transfer(flowFile, SUCCESS); + } catch (ProcessException ex) { + getLogger().error("Unable to process", ex); + session.transfer(flowFile, FAILURE); + } + + } + + + private List parseFile(Path filePath) throws IOException { + byte[] encoded; + encoded = Files.readAllBytes(filePath); + String fileContent = new String(encoded, Charset.defaultCharset()); + return new BufferedReader(new StringReader(fileContent)).lines().collect(Collectors.toList()); + } +} diff --git a/nifi-opcua-processors/src/main/java/de/fraunhofer/fit/processors/opcua/ListOPCNodes.java b/nifi-opcua-processors/src/main/java/de/fraunhofer/fit/processors/opcua/ListOPCNodes.java new file mode 100644 index 0000000..2ccb97e --- /dev/null +++ b/nifi-opcua-processors/src/main/java/de/fraunhofer/fit/processors/opcua/ListOPCNodes.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package de.fraunhofer.fit.processors.opcua; + +import de.fraunhofer.fit.opcua.OPCUAService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Tags({"example"}) +@CapabilityDescription("Provide a description") +@SeeAlso({}) +@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")}) +@WritesAttributes({@WritesAttribute(attribute = "", description = "")}) +public class ListOPCNodes extends AbstractProcessor { + + private static String starting_node = null; + private static String print_indentation = "No"; + private static Integer max_recursiveDepth; + private static Integer max_reference_per_node; + private static boolean print_non_leaf_nodes; + + public static final PropertyDescriptor OPCUA_SERVICE = new PropertyDescriptor.Builder() + .name("OPC UA Service") + .description("Specifies the OPC UA Service that can be used to access data") + .required(true) + .identifiesControllerService(OPCUAService.class) + .build(); + + public static final PropertyDescriptor STARTING_NODE = new PropertyDescriptor + .Builder().name("Starting Nodes") + .description("From what node should Nifi begin browsing the node tree. Default is the root node. Seperate multiple nodes with a comma (,)") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECURSIVE_DEPTH = new PropertyDescriptor + .Builder().name("Recursive Depth") + .description("Maximum depth from the starting node to read, Default is 0") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor PRINT_INDENTATION = new PropertyDescriptor + .Builder().name("Print Indentation") + .description("Should Nifi add indentation to the output text") + .required(true) + .defaultValue("") + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor MAX_REFERENCE_PER_NODE = new PropertyDescriptor + .Builder().name("Max References Per Node") + .description("The number of Reference Descriptions to pull per node query.") + .required(true) + .defaultValue("1000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor PRINT_NON_LEAF_NODES = new PropertyDescriptor + .Builder().name("Print Non Leaf Nodes") + .description("Whether or not to print the nodes which are not leaves.") + .required(true) + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder() + .name("Success") + .description("Successful OPC read") + .build(); + + public static final Relationship FAILURE = new Relationship.Builder() + .name("Failure") + .description("Failed OPC read") + .build(); + + public List descriptors; + + public Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(OPCUA_SERVICE); + descriptors.add(RECURSIVE_DEPTH); + descriptors.add(STARTING_NODE); + descriptors.add(PRINT_INDENTATION); + descriptors.add(MAX_REFERENCE_PER_NODE); + descriptors.add(PRINT_NON_LEAF_NODES); + + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(SUCCESS); + relationships.add(FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + print_indentation = context.getProperty(PRINT_INDENTATION).getValue(); + max_recursiveDepth = Integer.valueOf(context.getProperty(RECURSIVE_DEPTH).getValue()); + starting_node = context.getProperty(STARTING_NODE).getValue(); + max_reference_per_node = Integer.valueOf(context.getProperty(MAX_REFERENCE_PER_NODE).getValue()); + print_non_leaf_nodes = Boolean.valueOf(context.getProperty(PRINT_NON_LEAF_NODES).getValue()); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + final OPCUAService opcUAService = context.getProperty(OPCUA_SERVICE) + .asControllerService(OPCUAService.class); + + byte[] nodes = opcUAService.getNodes(print_indentation, max_recursiveDepth, + max_reference_per_node, print_non_leaf_nodes, starting_node); + + // Write the results back out to a flow file + FlowFile flowFile = session.create(); + + if (flowFile != null) { + try { + flowFile = session.write(flowFile, (OutputStream out) -> { + out.write(nodes); + }); + + // Transfer data to flow file + session.transfer(flowFile, SUCCESS); + } catch (ProcessException ex) { + getLogger().error("Unable to process", ex); + session.transfer(flowFile, FAILURE); + } + } + + } +} diff --git a/nifi-opcua-processors/src/main/java/de/fraunhofer/fit/processors/opcua/SubscribeOPCNodes.java b/nifi-opcua-processors/src/main/java/de/fraunhofer/fit/processors/opcua/SubscribeOPCNodes.java new file mode 100644 index 0000000..de9475d --- /dev/null +++ b/nifi-opcua-processors/src/main/java/de/fraunhofer/fit/processors/opcua/SubscribeOPCNodes.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package de.fraunhofer.fit.processors.opcua; + +import de.fraunhofer.fit.opcua.OPCUAService; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.*; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.OutputStream; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +@Tags({"opc"}) +@CapabilityDescription("Subscribe to a list of nodes and output flowfiles when changes are detected.") +public class SubscribeOPCNodes extends AbstractProcessor { + + private OPCUAService opcUaService; + private BlockingQueue msgQueue; + private String subscriberUid; + + public static final PropertyDescriptor OPCUA_SERVICE = new PropertyDescriptor.Builder() + .name("OPC UA Service") + .description("Specifies the OPC UA Service that can be used to access data") + .required(true) + .identifiesControllerService(OPCUAService.class) + .build(); + + public static final PropertyDescriptor TAG_FILE_LOCATION = new PropertyDescriptor + .Builder().name("Location of the Tag List File") + .description("The location of the tag list file") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .sensitive(false) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder() + .name("Success") + .description("Successful OPC read") + .build(); + + public static final Relationship FAILURE = new Relationship.Builder() + .name("Failure") + .description("Failed OPC read") + .build(); + + public List descriptors; + + public Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(OPCUA_SERVICE); + descriptors.add(TAG_FILE_LOCATION); + + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(SUCCESS); + relationships.add(FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + + msgQueue = new LinkedBlockingQueue<>(); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + + opcUaService = context.getProperty(OPCUA_SERVICE) + .asControllerService(OPCUAService.class); + + List tagNames; + try { + tagNames = parseFile(Paths.get(context.getProperty(TAG_FILE_LOCATION).toString())); + } catch (IOException e) { + getLogger().error("Error reading tag list from local file."); + return; + } + + subscriberUid = opcUaService.subscribe(tagNames, msgQueue); + + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + String msg; + System.out.println("OnTrigger called! there are so many messages here: " + msgQueue.size()); + while ((msg = msgQueue.poll()) != null) { + // Write the results back out to a flow file + FlowFile flowFile = session.create(); + + byte[] outputMsgBytes = msg.getBytes(); + if (flowFile != null) { + try { + flowFile = session.write(flowFile, (OutputStream out) -> out.write(outputMsgBytes)); + + // Transfer data to flow file + session.transfer(flowFile, SUCCESS); + } catch (ProcessException ex) { + getLogger().error("Unable to process", ex); + session.transfer(flowFile, FAILURE); + } + } + } + } + + @OnStopped + public void onStopped(final ProcessContext context) throws Exception { + + getLogger().debug("Unsubscribing from OPC Server..."); + opcUaService.unsubscribe(subscriberUid); + + } + + private List parseFile(Path filePath) throws IOException { + byte[] encoded; + encoded = Files.readAllBytes(filePath); + String fileContent = new String(encoded, Charset.defaultCharset()); + return new BufferedReader(new StringReader(fileContent)).lines().collect(Collectors.toList()); + } +} diff --git a/nifi-opcua-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-opcua-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..6a12827 --- /dev/null +++ b/nifi-opcua-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +de.fraunhofer.fit.processors.opcua.ListOPCNodes +de.fraunhofer.fit.processors.opcua.GetOPCData +de.fraunhofer.fit.processors.opcua.SubscribeOPCNodes \ No newline at end of file diff --git a/nifi-opcua-processors/src/test/java/de/fraunhofer/fit/processors/opcua/GetOPCDataTest.java b/nifi-opcua-processors/src/test/java/de/fraunhofer/fit/processors/opcua/GetOPCDataTest.java new file mode 100644 index 0000000..c927458 --- /dev/null +++ b/nifi-opcua-processors/src/test/java/de/fraunhofer/fit/processors/opcua/GetOPCDataTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package de.fraunhofer.fit.processors.opcua; + +import de.fraunhofer.fit.opcua.StandardOPCUAService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Paths; +import java.util.List; + + +public class GetOPCDataTest { + + private TestRunner testRunner; + private final String endpoint = "opc.tcp://10.223.104.20:48010"; + private StandardOPCUAService service; + + @Before + public void init() throws InitializationException { + testRunner = TestRunners.newTestRunner(GetOPCData.class); + service = new StandardOPCUAService(); + testRunner.addControllerService("controller", service); + + testRunner.setProperty(service, StandardOPCUAService.ENDPOINT, endpoint); + testRunner.assertValid(service); + + testRunner.enableControllerService(service); + } + + @Test + public void testProcessor() { + + String tagFilePath = (new File("src\\test\\resources\\tags.txt")).getAbsolutePath(); + + testRunner.setProperty(GetOPCData.OPCUA_SERVICE, "controller"); + testRunner.setProperty(GetOPCData.RETURN_TIMESTAMP, "Both"); + testRunner.setProperty(GetOPCData.EXCLUDE_NULL_VALUE, "Yes"); + testRunner.setProperty(GetOPCData.TAG_LIST_SOURCE, "Local File"); + testRunner.setProperty(GetOPCData.TAG_LIST_FILE, tagFilePath); + + testRunner.run(); + + List results = testRunner.getFlowFilesForRelationship(GetOPCData.SUCCESS); + System.out.println(new String(testRunner.getContentAsByteArray(results.get(0)))); + + } + + @After + public void shutdown() { + testRunner.disableControllerService(service); + } + +} diff --git a/nifi-opcua-processors/src/test/java/de/fraunhofer/fit/processors/opcua/ListOPCNodesTest.java b/nifi-opcua-processors/src/test/java/de/fraunhofer/fit/processors/opcua/ListOPCNodesTest.java new file mode 100644 index 0000000..f685ba3 --- /dev/null +++ b/nifi-opcua-processors/src/test/java/de/fraunhofer/fit/processors/opcua/ListOPCNodesTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package de.fraunhofer.fit.processors.opcua; + +import de.fraunhofer.fit.opcua.StandardOPCUAService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + + +public class ListOPCNodesTest { + + private TestRunner testRunner; + private final String endpoint = "opc.tcp://10.223.104.20:48010"; + private StandardOPCUAService service; + + @Before + public void init() throws InitializationException { + testRunner = TestRunners.newTestRunner(ListOPCNodes.class); + service = new StandardOPCUAService(); + testRunner.addControllerService("controller", service); + + testRunner.setProperty(service, StandardOPCUAService.ENDPOINT, endpoint); + testRunner.assertValid(service); + + testRunner.enableControllerService(service); + } + + @Test + public void testProcessor() { + testRunner.setProperty(ListOPCNodes.OPCUA_SERVICE, "controller"); + testRunner.setProperty(ListOPCNodes.MAX_REFERENCE_PER_NODE, "10"); + testRunner.setProperty(ListOPCNodes.PRINT_INDENTATION, ""); + testRunner.setProperty(ListOPCNodes.STARTING_NODE, "ns=4;s=S71500/ET200MP-Station_2.PLC_1"); + testRunner.setProperty(ListOPCNodes.RECURSIVE_DEPTH, "4"); + testRunner.setProperty(ListOPCNodes.PRINT_NON_LEAF_NODES, "false"); + + testRunner.run(); + + List results = testRunner.getFlowFilesForRelationship(GetOPCData.SUCCESS); + System.out.println(new String(testRunner.getContentAsByteArray(results.get(0)))); + } + +} diff --git a/nifi-opcua-processors/src/test/java/de/fraunhofer/fit/processors/opcua/SubscribeOPCNodesTest.java b/nifi-opcua-processors/src/test/java/de/fraunhofer/fit/processors/opcua/SubscribeOPCNodesTest.java new file mode 100644 index 0000000..7c1ffa3 --- /dev/null +++ b/nifi-opcua-processors/src/test/java/de/fraunhofer/fit/processors/opcua/SubscribeOPCNodesTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package de.fraunhofer.fit.processors.opcua; + +import de.fraunhofer.fit.opcua.StandardOPCUAService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.List; + + +public class SubscribeOPCNodesTest { + + private TestRunner testRunner; + private final String endpoint = "opc.tcp://10.223.104.20:48010"; + private StandardOPCUAService service; + + @Before + public void init() throws InitializationException { + testRunner = TestRunners.newTestRunner(SubscribeOPCNodes.class); + service = new StandardOPCUAService(); + testRunner.addControllerService("controller", service); + + testRunner.setProperty(service, StandardOPCUAService.ENDPOINT, endpoint); + testRunner.assertValid(service); + + testRunner.enableControllerService(service); + } + + @Test + public void testProcessor() throws Exception { + + String tagFilePath = (new File("src\\test\\resources\\subscribeTags.txt")).getAbsolutePath(); + + testRunner.setProperty(SubscribeOPCNodes.OPCUA_SERVICE, "controller"); + testRunner.setProperty(SubscribeOPCNodes.TAG_FILE_LOCATION, tagFilePath); + + testRunner.run(1, false, true); + + Thread.sleep(10000); + + testRunner.run(1, true, false); + + + List results = testRunner.getFlowFilesForRelationship(GetOPCData.SUCCESS); + results.forEach((result)->System.out.println(new String(testRunner.getContentAsByteArray(result)))); + + + } + + @After + public void shutdown() { + testRunner.disableControllerService(service); + } + +} diff --git a/nifi-opcua-processors/src/test/resources/subscribeTags.txt b/nifi-opcua-processors/src/test/resources/subscribeTags.txt new file mode 100644 index 0000000..6ec9763 --- /dev/null +++ b/nifi-opcua-processors/src/test/resources/subscribeTags.txt @@ -0,0 +1,2 @@ +ns=4;s=S71500/ET200MP-Station_2.PLC_1.Programs.MAS_Storage_unit_2_DB.c_dispatch +ns=4;s=S71500/ET200MP-Station_2.PLC_1.Programs.MAS_Storage_unit_2_DB.Magazine_Unit_4.c_dispatch \ No newline at end of file diff --git a/nifi-opcua-processors/src/test/resources/tags.txt b/nifi-opcua-processors/src/test/resources/tags.txt new file mode 100644 index 0000000..69dfed0 --- /dev/null +++ b/nifi-opcua-processors/src/test/resources/tags.txt @@ -0,0 +1,2 @@ +ns=4;s=S71500/ET200MP-Station_2.PLC_1.GlobalVars.I_MAG1_EXT +ns=4;s=S71500/ET200MP-Station_2.PLC_1.GlobalVars.I_MAG1_RET \ No newline at end of file diff --git a/nifi-opcua-service-api/pom.xml b/nifi-opcua-service-api/pom.xml new file mode 100644 index 0000000..f27e042 --- /dev/null +++ b/nifi-opcua-service-api/pom.xml @@ -0,0 +1,35 @@ + + + + 4.0.0 + + + de.fraunhofer.fit + nifi-opcua-bundle + 1.0 + + + nifi-opcua-service-api + jar + + + + org.apache.nifi + nifi-api + provided + + + diff --git a/nifi-opcua-service-api/src/main/java/de/fraunhofer/fit/opcua/OPCUAService.java b/nifi-opcua-service-api/src/main/java/de/fraunhofer/fit/opcua/OPCUAService.java new file mode 100644 index 0000000..82ffae8 --- /dev/null +++ b/nifi-opcua-service-api/src/main/java/de/fraunhofer/fit/opcua/OPCUAService.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package de.fraunhofer.fit.opcua; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; +import java.util.concurrent.BlockingQueue; + +@Tags({"example"}) +@CapabilityDescription("Example Service API.") +public interface OPCUAService extends ControllerService { + + byte[] getValue(List reqTagNames, String returnTimestamp, boolean excludeNullValue, + String nullValueString) throws ProcessException; + + byte[] getNodes(String printIndent, int maxRecursiveDepth, int maxReferencePerNode, + boolean printNonLeafNode, String rootNodeId) + throws ProcessException; + + String subscribe(List reqTagNames, BlockingQueue queue) throws ProcessException; + + void unsubscribe(String subscriberUid) throws ProcessException; +} diff --git a/nifi-opcua-service/pom.xml b/nifi-opcua-service/pom.xml new file mode 100644 index 0000000..96a4afd --- /dev/null +++ b/nifi-opcua-service/pom.xml @@ -0,0 +1,64 @@ + + + + 4.0.0 + + + de.fraunhofer.fit + nifi-opcua-bundle + 1.0 + + + nifi-opcua-service + jar + + + + de.fraunhofer.fit + nifi-opcua-service-api + 1.0 + + + org.apache.nifi + nifi-api + provided + + + org.apache.nifi + nifi-processor-utils + + + org.eclipse.milo + sdk-client + 0.2.1 + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + diff --git a/nifi-opcua-service/src/main/java/de/fraunhofer/fit/opcua/StandardOPCUAService.java b/nifi-opcua-service/src/main/java/de/fraunhofer/fit/opcua/StandardOPCUAService.java new file mode 100644 index 0000000..c7a71db --- /dev/null +++ b/nifi-opcua-service/src/main/java/de/fraunhofer/fit/opcua/StandardOPCUAService.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package de.fraunhofer.fit.opcua; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.eclipse.milo.opcua.sdk.client.OpcUaClient; +import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder; +import org.eclipse.milo.opcua.sdk.client.api.nodes.Node; +import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem; +import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription; +import org.eclipse.milo.opcua.stack.client.UaTcpStackClient; +import org.eclipse.milo.opcua.stack.core.AttributeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; +import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName; +import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; +import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode; +import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn; +import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription; +import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest; +import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters; +import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId; + +import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint; + +@Tags({"opc"}) +@CapabilityDescription("ControllerService implementation of OPCUAService.") +public class StandardOPCUAService extends AbstractControllerService implements OPCUAService { + + public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor + .Builder().name("Endpoint URL") + .description("the opc.tcp address of the opc ua server") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SECURITY_POLICY = new PropertyDescriptor + .Builder().name("Security Policy") + .description("How should Nifi create the connection with the UA server") + .required(true) + .allowableValues("None", "Basic128Rsa15", "Basic256", "Basic256Rsa256") + .defaultValue("None") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final List properties; + + private final String SECURITY_POLICY_PREFIX = "http://opcfoundation.org/UA/SecurityPolicy#"; + + private String endpoint; + private String securityPolicy; + private OpcUaClient opcClient; + private UaSubscription uaSubscription; + private Map> subscriberMap; + + private final AtomicLong clientHandles = new AtomicLong(1L); + + static { + final List props = new ArrayList<>(); + props.add(ENDPOINT); + props.add(SECURITY_POLICY); + properties = Collections.unmodifiableList(props); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + /** + * @param context the configuration context + * @throws InitializationException if unable to create a database connection + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + + endpoint = context.getProperty(ENDPOINT).getValue(); + securityPolicy = SECURITY_POLICY_PREFIX + context.getProperty(SECURITY_POLICY).getValue(); + + try { + EndpointDescription[] endpoints = + UaTcpStackClient.getEndpoints(endpoint).get(); + + EndpointDescription endpointDescription = null; + for (EndpointDescription ed : endpoints) { + + if (ed.getSecurityPolicyUri().equals(securityPolicy)) { + endpointDescription = ed; + getLogger().debug("Connecting to endpoint " + ed.getEndpointUrl() + + " with security policy " + ed.getSecurityPolicyUri()); + } + } + + if (endpointDescription == null) { + getLogger().error("No endpoint with the specified security policy was found."); + throw new RuntimeException("No endpoint with the specified security policy was found."); + } + + OpcUaClientConfigBuilder cfg = new OpcUaClientConfigBuilder(); + cfg.setEndpoint(endpoints[0]); + opcClient = new OpcUaClient(cfg.build()); + opcClient.connect().get(); + + + } catch (Exception e) { + throw new InitializationException(e); + } + + + } + + @OnDisabled + public void shutdown() { + try { + if (opcClient != null) { + getLogger().debug("Disconnecting from OPC server..."); + opcClient.disconnect().get(); + } + } catch (InterruptedException | ExecutionException e) { + getLogger().warn(e.getMessage()); + } + } + + + @Override + public byte[] getValue(List tagNames, String returnTimestamp, boolean excludeNullValue, + String nullValueString) throws ProcessException { + try { + if (opcClient == null) { + throw new Exception("OPC Client is null. OPC UA service was not enabled properly."); + } + + // TODO: Throw more descriptive exception when parsing fails + ArrayList nodeIdList = new ArrayList<>(); + tagNames.forEach((tagName) -> nodeIdList.add(NodeId.parse(tagName))); + + + List rvList = opcClient.readValues(0, TimestampsToReturn.Both, nodeIdList).get(); + + StringBuilder serverResponse = new StringBuilder(); + + for (int i = 0; i < tagNames.size(); i++) { + String valueLine = ""; + try { + if (excludeNullValue && rvList.get(i).getValue().getValue().toString().equals(nullValueString)) { + getLogger().debug("Null value returned for " + rvList.get(i).getValue().getValue().toString() + + " -- Skipping because property is set"); + continue; + } + + valueLine = writeCsv(tagNames.get(i), returnTimestamp, rvList.get(i)); + + } catch (Exception ex) { + getLogger().error("Error parsing result for " + tagNames.get(i)); + valueLine = ""; + } + if (valueLine.isEmpty()) + continue; + + serverResponse.append(valueLine); + + } + + return serverResponse.toString().trim().getBytes(); + + } catch (Exception e) { + throw new ProcessException(e); + } + + } + + + private String writeCsv(String tagName, String returnTimestamp, DataValue value) { + + // TODO: maybe use StringBuilder for better performance + String valueLine = ""; + + valueLine += tagName + ','; + + if (("ServerTimestamp").equals(returnTimestamp) || ("Both").equals(returnTimestamp)) { + valueLine += value.getServerTime().getJavaTime() + ","; + } + if (("SourceTimestamp").equals(returnTimestamp) || ("Both").equals(returnTimestamp)) { + valueLine += value.getSourceTime().getJavaTime() + ","; + } + + valueLine += value.getValue().getValue().toString() + "," + + value.getStatusCode().getValue() + + System.getProperty("line.separator"); + + return valueLine; + } + + + + + @Override + public byte[] getNodes(String indentString, int maxRecursiveDepth, int maxReferencePerNode, + boolean printNonLeafNode, String rootNodeId) + throws ProcessException { + + try { + if (opcClient == null) { + throw new Exception("OPC Client is null. OPC UA service was not enabled properly."); + } + + StringBuilder builder = new StringBuilder(); + browseNodeIteratively("", indentString, maxRecursiveDepth, maxReferencePerNode, printNonLeafNode, + opcClient, NodeId.parse(rootNodeId), builder); + + return builder.toString().getBytes(); + + } catch (Exception e) { + throw new ProcessException(e.getMessage()); + } + + } + + @Override + public String subscribe(List tagNames, BlockingQueue queue) throws ProcessException { + + if (subscriberMap == null) { + subscriberMap = new HashMap<>(); + } + + try { + if (opcClient == null) { + throw new Exception("OPC Client is null. OPC UA service was not enabled properly."); + } + + uaSubscription = opcClient.getSubscriptionManager().createSubscription(1000.0).get(); + + // Create a list of MonitoredItemCreateRequest + ArrayList micrList = new ArrayList<>(); + tagNames.forEach((tagName) -> { + + ReadValueId readValueId = new ReadValueId( + NodeId.parse(tagName), + AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE); + + Long clientHandleLong = clientHandles.getAndIncrement(); + UInteger clientHandle = uint(clientHandleLong); + + MonitoringParameters parameters = new MonitoringParameters( + clientHandle, + 1000.0, // sampling interval + null, // filter, null means use default + uint(10), // queue size + true // discard oldest + ); + + micrList.add(new MonitoredItemCreateRequest( + readValueId, MonitoringMode.Reporting, parameters)); + + }); + + // This is the callback when the MonitoredItem is created. In this callback, we set the consumer for incoming values + BiConsumer onItemCreated = + (item, id) -> item.setValueConsumer((it, value) -> { + getLogger().debug("subscription value received: item=" + it.getReadValueId().getNodeId() + + " value=" + value.getValue()); + String valueLine = writeCsv(getFullName(it.getReadValueId().getNodeId()), + "Both", value); + + queue.offer(valueLine); + }); + + List items = uaSubscription.createMonitoredItems( + TimestampsToReturn.Both, + micrList, + onItemCreated + ).get(); + + for (UaMonitoredItem item : items) { + if (item.getStatusCode().isGood()) { + getLogger().debug("item created for nodeId=" + item.getReadValueId().getNodeId()); + } else { + getLogger().error("failed to create item for nodeId=" + item.getReadValueId().getNodeId() + + " (status=" + item.getStatusCode() + ")"); + } + } + + String subscriberUid; + do { + subscriberUid = generateRandomChars(10); + } while (subscriberMap.containsKey(subscriberUid)); + + subscriberMap.put(subscriberUid, items); + + return subscriberUid; + + } catch (Exception e) { + throw new ProcessException(e.getMessage()); + } + } + + @Override + public void unsubscribe(String subscriberUid) throws ProcessException { + + if (opcClient == null) { + throw new ProcessException("OPC Client is null. OPC UA service was not enabled properly."); + } + + List listToDelete; + if ((listToDelete = subscriberMap.get(subscriberUid)) != null) { + try { + uaSubscription.deleteMonitoredItems(listToDelete).get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + + // TODO: maybe return boolean as result? + } + + // remainDepth = 0 means only print out the current node + // StringBuilder is passed into the recursive method to reduce generating strings and improve performance + private void browseNodeIteratively(String currentIndent, String indentString, int remainDepth, int maxRefPerNode, + boolean printNonLeafNode, OpcUaClient client, NodeId browseRoot, StringBuilder builder) { + + //getLogger().info(indent + " Node=" + node.getNodeId().get().getIdentifier().toString()); + + try { + List nodes = client.getAddressSpace().browse(browseRoot).get(); + + if (printNonLeafNode || nodes.size() == 0) { + builder.append(currentIndent) + .append(getFullName(browseRoot)) + .append("\n"); + } + + if (remainDepth > 0) { + + String newIndent = currentIndent + indentString; + remainDepth--; + + int currNodeCount = 0; + + for (Node node : nodes) { + if (currNodeCount == maxRefPerNode) + break; + + // recursively browse to children + browseNodeIteratively(newIndent, indentString, remainDepth, maxRefPerNode, printNonLeafNode, + client, node.getNodeId().get(), builder); + + currNodeCount++; + } + } + + } catch (InterruptedException | ExecutionException e) { + //getLogger().error("Browsing nodeId=" + browseRoot + " failed: " + e.getMessage()); + } + + } + + private String getFullName(NodeId nodeId) { + + String identifierType; + + switch (nodeId.getType()) { + case Numeric: + identifierType = "i"; + break; + case Opaque: + identifierType = "b"; + break; + case Guid: + identifierType = "g"; + break; + default: + identifierType = "s"; + } + + return String.format("ns=%s;%s=%s", nodeId.getNamespaceIndex().toString(), + identifierType, nodeId.getIdentifier().toString()); + } + + private String generateRandomChars(int length) { + String candidateChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + StringBuilder sb = new StringBuilder(); + Random random = new Random(); + for (int i = 0; i < length; i++) { + sb.append(candidateChars.charAt(random.nextInt(candidateChars + .length()))); + } + return sb.toString(); + } + +} diff --git a/nifi-opcua-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-opcua-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..f1bf56b --- /dev/null +++ b/nifi-opcua-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +de.fraunhofer.fit.opcua.StandardOPCUAService \ No newline at end of file diff --git a/nifi-opcua-service/src/test/java/de/fraunhofer/fit/opcua/TestProcessor.java b/nifi-opcua-service/src/test/java/de/fraunhofer/fit/opcua/TestProcessor.java new file mode 100644 index 0000000..7936342 --- /dev/null +++ b/nifi-opcua-service/src/test/java/de/fraunhofer/fit/opcua/TestProcessor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package de.fraunhofer.fit.opcua; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +public class TestProcessor extends AbstractProcessor { + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List getSupportedPropertyDescriptors() { + List propDescs = new ArrayList<>(); + propDescs.add(new PropertyDescriptor.Builder() + .name("OPCUAService test processor") + .description("OPCUAService test processor") + .identifiesControllerService(OPCUAService.class) + .required(true) + .build()); + return propDescs; + } +} diff --git a/nifi-opcua-service/src/test/java/de/fraunhofer/fit/opcua/TestStandardOPCUAService.java b/nifi-opcua-service/src/test/java/de/fraunhofer/fit/opcua/TestStandardOPCUAService.java new file mode 100644 index 0000000..25419ef --- /dev/null +++ b/nifi-opcua-service/src/test/java/de/fraunhofer/fit/opcua/TestStandardOPCUAService.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package de.fraunhofer.fit.opcua; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TestStandardOPCUAService { + + private final String endpoint = "opc.tcp://10.223.104.20:48010"; + + @Before + public void init() { + + } + + @Test + public void testServiceInitialization() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final StandardOPCUAService service = new StandardOPCUAService(); + runner.addControllerService("test-good", service); + + runner.setProperty(service, StandardOPCUAService.ENDPOINT, endpoint); + runner.assertValid(service); + + runner.enableControllerService(service); + + runner.disableControllerService(service); + + } + + @Test + public void testServiceGetNodes() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final StandardOPCUAService service = new StandardOPCUAService(); + runner.addControllerService("test-good", service); + + runner.setProperty(service, StandardOPCUAService.ENDPOINT, endpoint); + runner.assertValid(service); + + runner.enableControllerService(service); + + System.out.println(service.getNodes("--", 3, 10, false, + "ns=4;s=S71500/ET200MP-Station_2.PLC_1.GlobalVars")); + + runner.disableControllerService(service); + } + + @Test + public void testServiceGetValues() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final StandardOPCUAService service = new StandardOPCUAService(); + runner.addControllerService("test-good", service); + + runner.setProperty(service, StandardOPCUAService.ENDPOINT, endpoint); + runner.assertValid(service); + + runner.enableControllerService(service); + + List tagList = Arrays.asList("ns=4;s=S71500/ET200MP-Station_2.PLC_1.GlobalVars.I_MAG1_EXT", + "ns=4;s=S71500/ET200MP-Station_2.PLC_1.GlobalVars.I_MAG2_EXT"); + + byte[] bytes = service.getValue(tagList, "Both", true, ""); + System.out.println (new String(bytes)); + + runner.disableControllerService(service); + } + +} diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..d3f9896 --- /dev/null +++ b/pom.xml @@ -0,0 +1,37 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.3.0 + + + de.fraunhofer.fit + nifi-opcua-bundle + 1.0 + pom + + + nifi-opcua-processors + nifi-opcua-service + nifi-opcua-service-api + nifi-opcua-nar + + +