From e4cb5a36159165c6c0aea2150b2c251519ecb11f Mon Sep 17 00:00:00 2001 From: POliveNokia Date: Sat, 14 Dec 2024 14:18:53 +0000 Subject: [PATCH] NIFI-14013 Add disconnected Relationship to ConnectWebSocket (#9533) Signed-off-by: David Handermann --- .../AbstractWebSocketGatewayProcessor.java | 17 ++++++ .../websocket/TestConnectWebSocket.java | 53 ++++++++++++++++++- .../nifi/websocket/ConnectedListener.java | 1 + .../WebSocketDisconnectedMessage.java | 23 ++++++++ .../websocket/WebSocketMessageRouter.java | 3 ++ 5 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketDisconnectedMessage.java diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java index 784745465c28..55921de2b391 100644 --- a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java +++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/main/java/org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.java @@ -35,6 +35,7 @@ import org.apache.nifi.websocket.WebSocketClientService; import org.apache.nifi.websocket.WebSocketConfigurationException; import org.apache.nifi.websocket.WebSocketConnectedMessage; +import org.apache.nifi.websocket.WebSocketDisconnectedMessage; import org.apache.nifi.websocket.WebSocketMessage; import org.apache.nifi.websocket.WebSocketService; import org.apache.nifi.websocket.WebSocketSessionInfo; @@ -67,6 +68,12 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF .description("The WebSocket session is established") .build(); + public static final Relationship REL_DISCONNECTED = new Relationship.Builder() + .name("disconnected") + .description("The WebSocket session is disconnected") + .autoTerminateDefault(true) + .build(); + public static final Relationship REL_MESSAGE_TEXT = new Relationship.Builder() .name("text message") .description("The WebSocket text message output") @@ -92,6 +99,7 @@ public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionF static Set getAbstractRelationships() { final Set relationships = new HashSet<>(); relationships.add(REL_CONNECTED); + relationships.add(REL_DISCONNECTED); relationships.add(REL_MESSAGE_TEXT); relationships.add(REL_MESSAGE_BINARY); return relationships; @@ -114,6 +122,13 @@ public void connected(WebSocketSessionInfo sessionInfo) { enqueueMessage(message); } + @Override + public void disconnected(WebSocketSessionInfo sessionInfo) { + final WebSocketMessage message = new WebSocketDisconnectedMessage(sessionInfo); + sessionInfo.setTransitUri(getTransitUri(sessionInfo)); + enqueueMessage(message); + } + @Override public void consume(WebSocketSessionInfo sessionInfo, String messageStr) { final WebSocketMessage message = new WebSocketMessage(sessionInfo); @@ -252,6 +267,8 @@ private void enqueueMessage(final WebSocketMessage incomingMessage) { if (incomingMessage instanceof WebSocketConnectedMessage) { session.transfer(messageFlowFile, REL_CONNECTED); + } else if (incomingMessage instanceof WebSocketDisconnectedMessage) { + session.transfer(messageFlowFile, REL_DISCONNECTED); } else { switch (Objects.requireNonNull(messageType)) { case TEXT: diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java index 3cc237c6147c..170baa7082a8 100644 --- a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java +++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-processors/src/test/java/org/apache/nifi/processors/websocket/TestConnectWebSocket.java @@ -91,6 +91,7 @@ public void testSuccess() throws Exception { processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length); processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length); processor.consume(webSocketSession, binaryMessage, 0, binaryMessage.length); + processor.disconnected(webSocketSession); return null; }).when(service).connect(endpointId); runner.addControllerService(serviceId, service); @@ -119,7 +120,7 @@ public void testSuccess() throws Exception { binaryFlowFiles.forEach(ff -> assertFlowFile(webSocketSession, serviceId, endpointId, ff, WebSocketMessage.Type.BINARY)); final List provenanceEvents = sharedSessionState.getProvenanceEvents(); - assertEquals(6, provenanceEvents.size()); + assertEquals(7, provenanceEvents.size()); assertTrue(provenanceEvents.stream().allMatch(event -> ProvenanceEventType.RECEIVE.equals(event.getEventType()))); } @@ -201,6 +202,56 @@ void testDynamicUrlsParsedFromFlowFileButNotAbleToConnect() throws Initializatio runner.stop(); } + @Test + void testDynamicUrlsParsedFromFlowFileAndAbleToConnectAndDisconnect() throws InitializationException { + // Start websocket server + final TestRunner webSocketListener = TestRunners.newTestRunner(ListenWebSocket.class); + + final String serverId = "ws-server-service"; + JettyWebSocketServer server = new JettyWebSocketServer(); + webSocketListener.addControllerService(serverId, server); + webSocketListener.setProperty(server, JettyWebSocketServer.LISTEN_PORT, "0"); + webSocketListener.enableControllerService(server); + + webSocketListener.setProperty(ListenWebSocket.PROP_WEBSOCKET_SERVER_SERVICE, serverId); + webSocketListener.setProperty(ListenWebSocket.PROP_SERVER_URL_PATH, "/test"); + + webSocketListener.run(1, false); + final int listeningPort = server.getListeningPort(); + + final TestRunner runner = TestRunners.newTestRunner(ConnectWebSocket.class); + + final String clientId = "ws-service"; + final String endpointId = "client-1"; + + MockFlowFile flowFile = getFlowFile(); + runner.enqueue(flowFile); + + JettyWebSocketClient client = new JettyWebSocketClient(); + + + runner.addControllerService(clientId, client); + runner.setProperty(client, JettyWebSocketClient.WS_URI, String.format("ws://localhost:%s/${dynamicUrlPart}", listeningPort)); + runner.enableControllerService(client); + + runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_SERVICE, clientId); + runner.setProperty(ConnectWebSocket.PROP_WEBSOCKET_CLIENT_ID, endpointId); + + runner.run(1, false); + + final List flowFilesForConnectedRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_CONNECTED); + assertEquals(1, flowFilesForConnectedRelationship.size()); + + final List flowFilesForSuccess = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_SUCCESS); + assertEquals(1, flowFilesForSuccess.size()); + + webSocketListener.disableControllerService(server); + + final List flowFilesForDisconnectedRelationship = runner.getFlowFilesForRelationship(ConnectWebSocket.REL_DISCONNECTED); + assertEquals(1, flowFilesForDisconnectedRelationship.size()); + + runner.stop(); + } private MockFlowFile getFlowFile() { Map attributes = new HashMap<>(); diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java index b682094da6f4..124606c3731b 100644 --- a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java +++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/ConnectedListener.java @@ -21,4 +21,5 @@ */ public interface ConnectedListener { void connected(final WebSocketSessionInfo sessionInfo); + void disconnected(final WebSocketSessionInfo sessionInfo); } diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketDisconnectedMessage.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketDisconnectedMessage.java new file mode 100644 index 000000000000..00abf1cd9ad1 --- /dev/null +++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketDisconnectedMessage.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.websocket; + +public class WebSocketDisconnectedMessage extends WebSocketMessage { + public WebSocketDisconnectedMessage(final WebSocketSessionInfo sessionInfo) { + super(sessionInfo); + } +} diff --git a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java index 79171de88ce2..bcce446cd150 100644 --- a/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java +++ b/nifi-extension-bundles/nifi-websocket-bundle/nifi-websocket-services-api/src/main/java/org/apache/nifi/websocket/WebSocketMessageRouter.java @@ -77,6 +77,9 @@ public void captureSession(final WebSocketSession session) { } public void onWebSocketClose(final String sessionId, final int statusCode, final String reason) { + if (processor instanceof ConnectedListener connectedListener) { + connectedListener.disconnected(getSessionOrFail(sessionId)); + } sessions.remove(sessionId); }