From bbea42c2386385fe2e66820f5ddca85e887a1b86 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 6 Dec 2025 18:56:54 +0000 Subject: [PATCH 1/2] Fix Modbus master connection recovery after network interruption - Fix ensure_connection() to check BOTH is_connected flag AND client.connected - Add mark_disconnected() method to properly signal connection errors - Close dead sockets before reconnecting to avoid broken pipe errors - Update all error handlers to use mark_disconnected() instead of directly setting is_connected = False This ensures the connection is properly torn down and re-established when any communication error occurs (timeout, broken pipe, ModbusIOException, etc.) Co-Authored-By: Thiago Alves --- .../modbus_master/modbus_master_connection.py | 56 ++++++++++++++----- .../modbus_master/modbus_master_plugin.py | 20 +++---- 2 files changed, 52 insertions(+), 24 deletions(-) diff --git a/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py b/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py index 02c746e8..e9c81a5d 100644 --- a/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py +++ b/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py @@ -2,11 +2,11 @@ import time from typing import Optional + from pymodbus.client import ModbusTcpClient -from pymodbus.exceptions import ConnectionException -class ModbusConnectionManager: +class ModbusConnectionManager: # pylint: disable=too-many-instance-attributes """Manages Modbus TCP connections with retry logic.""" def __init__(self, host: str, port: int, timeout_ms: int): @@ -15,11 +15,12 @@ def __init__(self, host: str, port: int, timeout_ms: int): self.timeout = timeout_ms / 1000.0 # Convert to seconds # Retry configuration - self.retry_delay_base = 2.0 # initial delay between attempts (seconds) - self.retry_delay_max = 30.0 # maximum delay between attempts (seconds) + self.retry_delay_base = 2.0 # initial delay between attempts (seconds) + self.retry_delay_max = 30.0 # maximum delay between attempts (seconds) self.retry_delay_current = self.retry_delay_base - # Connection state + # Connection state - is_connected is the authoritative flag for connection health + # It is set to False when any error occurs, forcing reconnection on next cycle self.client: Optional[ModbusTcpClient] = None self.is_connected = False @@ -42,17 +43,17 @@ def connect_with_retry(self, stop_event=None) -> bool: if self.client: try: self.client.close() - except: + except Exception: pass self.client = ModbusTcpClient( - host=self.host, - port=self.port, - timeout=self.timeout + host=self.host, port=self.port, timeout=self.timeout ) # Attempt to connect if self.client.connect(): - print(f"(PASS) Connected to {self.host}:{self.port} (attempt {retry_count + 1})") + print( + f"(PASS) Connected to {self.host}:{self.port} (attempt {retry_count + 1})" + ) self.is_connected = True self.retry_delay_current = self.retry_delay_base # Reset delay return True @@ -88,22 +89,49 @@ def ensure_connection(self, stop_event=None) -> bool: """ Ensures there is a valid connection, reconnecting if necessary. + This method checks BOTH the pymodbus client state AND our is_connected flag. + The is_connected flag is set to False when any communication error occurs, + which forces a full reconnection even if pymodbus thinks the socket is still open. + Args: stop_event: Optional threading.Event to allow early termination Returns: True if connection is available, False if interrupted """ - # Check if already connected - if self.client and self.client.connected: + # Check if already connected - must check BOTH conditions: + # 1. is_connected flag (set to False on any error) + # 2. client.connected (pymodbus internal socket state) + if self.is_connected and self.client and self.client.connected: return True - # Mark as disconnected + # Connection is broken or marked as unhealthy - force full reconnection + # First, close any existing client to clean up dead sockets + if self.client: + try: + self.client.close() + except Exception: + pass # Ignore errors during cleanup + self.client = None + self.is_connected = False - # Try to reconnect + # Try to reconnect with infinite retry return self.connect_with_retry(stop_event) + def mark_disconnected(self): + """ + Mark the connection as broken, forcing reconnection on next ensure_connection() call. + + This should be called when any communication error occurs (timeout, broken pipe, + ModbusIOException, etc.) to ensure the connection is properly re-established. + """ + self.is_connected = False + print( + f"Connection to {self.host}:{self.port} marked as disconnected, " + "will reconnect on next cycle" + ) + def disconnect(self): """Close the connection and clean up resources.""" try: diff --git a/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py b/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py index 5186f6c8..2c24c457 100644 --- a/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py +++ b/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py @@ -172,7 +172,7 @@ def run(self): # pylint: disable=too-many-locals f"(FC {point.fc}, addr {address}): {response}" ) # Mark as disconnected to force reconnection on next cycle - self.connection_manager.is_connected = False + self.connection_manager.mark_disconnected() continue if response.isError(): print( @@ -180,7 +180,7 @@ def run(self): # pylint: disable=too-many-locals f"(FC {point.fc}, addr {address}): {response}" ) # Mark as disconnected to force reconnection on next cycle - self.connection_manager.is_connected = False + self.connection_manager.mark_disconnected() continue # Extract data from response @@ -205,14 +205,14 @@ def run(self): # pylint: disable=too-many-locals f"FC {point.fc}, offset {point.offset}: {ce}" ) # Mark as disconnected to force reconnection - self.connection_manager.is_connected = False + self.connection_manager.mark_disconnected() except Exception as e: print( f"[{self.name}] (FAIL) Error reading " f"FC {point.fc}, offset {point.offset}: {e}" ) # For other errors also mark disconnected as precaution - self.connection_manager.is_connected = False + self.connection_manager.mark_disconnected() # Batch update IEC buffers with single mutex acquisition if read_results_to_update: @@ -316,14 +316,14 @@ def run(self): # pylint: disable=too-many-locals f"(FC {point.fc}, addr {address}): {response}" ) # Mark as disconnected to force reconnection on next cycle - self.connection_manager.is_connected = False + self.connection_manager.mark_disconnected() elif response.isError(): print( f"[{self.name}] (FAIL) Modbus write failed " f"(FC {point.fc}, addr {address}): {response}" ) # Mark as disconnected to force reconnection on next cycle - self.connection_manager.is_connected = False + self.connection_manager.mark_disconnected() except ValueError as ve: print( @@ -336,14 +336,14 @@ def run(self): # pylint: disable=too-many-locals f"FC {point.fc}, offset {point.offset}: {ce}" ) # Mark as disconnected to force reconnection - self.connection_manager.is_connected = False + self.connection_manager.mark_disconnected() except Exception as e: print( f"[{self.name}] (FAIL) Error writing " f"FC {point.fc}, offset {point.offset}: {e}" ) # For other errors also mark disconnected as precaution - self.connection_manager.is_connected = False + self.connection_manager.mark_disconnected() # 3. CYCLE TIMING - Sleep for GCD cycle time cycle_elapsed = time.monotonic() - cycle_start_time @@ -363,8 +363,8 @@ def run(self): # pylint: disable=too-many-locals except ConnectionException as ce: print(f"[{self.name}] (FAIL) Connection failed: {ce}") - # Try to reconnect - self.connection_manager.is_connected = False + # Mark as disconnected to force reconnection + self.connection_manager.mark_disconnected() except Exception as e: print(f"[{self.name}] (FAIL) Unexpected error in thread: {e}") traceback.print_exc() From 5a75438d8aa1bb36e6b5658083d5dc44df3633b5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 7 Dec 2025 05:20:26 +0000 Subject: [PATCH 2/2] Improve Modbus slave plugin error recovery and reduce logging - Add automatic restart logic with exponential backoff (never give up) - Fix cross-thread shutdown: call ServerStop() directly instead of via asyncio.run() - Add proper startup success detection using threading.Event - Remove excessive per-request logging (keep only errors and lifecycle messages) - Use ModbusTcpServer with serve_forever(background=True) for reliable bind detection - Add double-start protection and improved shutdown timeout handling - Add pylint disable comments for pre-existing code style issues Co-Authored-By: Thiago Alves --- .../python/modbus_slave/simple_modbus.py | 506 ++++++++++-------- 1 file changed, 290 insertions(+), 216 deletions(-) diff --git a/core/src/drivers/plugins/python/modbus_slave/simple_modbus.py b/core/src/drivers/plugins/python/modbus_slave/simple_modbus.py index 8f9e2585..de139456 100644 --- a/core/src/drivers/plugins/python/modbus_slave/simple_modbus.py +++ b/core/src/drivers/plugins/python/modbus_slave/simple_modbus.py @@ -1,343 +1,381 @@ +# pylint: disable=C0103,C0301,C0413,W0107,W0602,W0621,C0415 +# C0103: Method/variable naming (getValues/setValues required by pymodbus API) +# C0301: Line too long (some lines exceed 100 chars) +# C0413: Import position (shared module import must be after sys.path modification) +# W0107: Unnecessary pass (used for read-only setValues methods) +# W0602: Global variable not assigned (threading.Event uses methods, not reassignment) +# W0621: Redefining name from outer scope (runtime_args parameter shadows global) +# C0415: Import outside toplevel (traceback imported in exception handlers) + import asyncio -import ctypes -from operator import add -import threading -import time -import sys import os -import json -from pymodbus.server import StartAsyncTcpServer, ServerStop +import sys +import threading + from pymodbus.datastore import ( - ModbusSparseDataBlock, ModbusDeviceContext, ModbusServerContext, + ModbusSparseDataBlock, ) +from pymodbus.server import ServerStop +from pymodbus.server.server import ModbusTcpServer MAX_BITS = 8 # Add the parent directory to Python path to find shared module -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) -# Import the correct type definitions -from shared import ( - PluginRuntimeArgs, - safe_extract_runtime_args_from_capsule, +# Import the correct type definitions (must be after sys.path modification) +from shared import ( # noqa: E402 SafeBufferAccess, - PluginStructureValidator + safe_extract_runtime_args_from_capsule, ) + class OpenPLCCoilsDataBlock(ModbusSparseDataBlock): """Custom Modbus coils data block that mirrors OpenPLC bool_output using SafeBufferAccess""" - + def __init__(self, runtime_args, num_coils=64): self.runtime_args = runtime_args self.num_coils = num_coils - + # Create safe buffer access wrapper self.safe_buffer_access = SafeBufferAccess(runtime_args) if not self.safe_buffer_access.is_valid: - print(f"[MODBUS] Warning: Failed to create safe buffer access for coils: {self.safe_buffer_access.error_msg}") - + print( + f"[MODBUS] Warning: Failed to create safe buffer access for coils: {self.safe_buffer_access.error_msg}" + ) + # Initialize with zeros super().__init__([0] * num_coils) - + def getValues(self, address, count=1): + """Get coil values from OpenPLC bool_output using SafeBufferAccess""" address = address - 1 # Modbus addresses are 0-based - """Get coil values from OpenPLC bool_output using SafeBufferAccess""" - print(f"[MODBUS] Coils getValues called: address={address}, count={count}") - if not self.safe_buffer_access.is_valid: - print(f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}") + print( + f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}" + ) return [0] * count - + # Ensure thread-safe access self.safe_buffer_access.acquire_mutex() values = [] for i in range(count): coil_addr = address + i - + if coil_addr < self.num_coils: # Map coil address to buffer and bit indices buffer_idx = coil_addr // MAX_BITS # 8 bits per buffer - bit_idx = coil_addr % MAX_BITS # bit within buffer + bit_idx = coil_addr % MAX_BITS # bit within buffer - value, error_msg = self.safe_buffer_access.read_bool_output(buffer_idx, bit_idx, thread_safe=False) + value, error_msg = self.safe_buffer_access.read_bool_output( + buffer_idx, bit_idx, thread_safe=False + ) if error_msg == "Success": values.append(1 if value else 0) - print(f"[MODBUS] Read coil {coil_addr} (buf:{buffer_idx}, bit:{bit_idx}): {value}") else: print(f"[MODBUS] Error reading coil {coil_addr}: {error_msg}") values.append(0) else: values.append(0) - + # Release mutex after access self.safe_buffer_access.release_mutex() return values - + def setValues(self, address, values): - address = address - 1 # Modbus addresses are 0-based """Set coil values to OpenPLC bool_output using SafeBufferAccess""" - print(f"[MODBUS] Coils setValues called: address={address}, values={values}") - + address = address - 1 # Modbus addresses are 0-based + if not self.safe_buffer_access.is_valid: - print(f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}") + print( + f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}" + ) return - + # Ensure thread-safe access self.safe_buffer_access.acquire_mutex() - + for i, value in enumerate(values): coil_addr = address + i - + if coil_addr < self.num_coils: # Map coil address to buffer and bit indices buffer_idx = coil_addr // MAX_BITS # 8 bits per buffer - bit_idx = coil_addr % MAX_BITS # bit within buffer + bit_idx = coil_addr % MAX_BITS # bit within buffer - success, error_msg = self.safe_buffer_access.write_bool_output(buffer_idx, bit_idx, bool(value), thread_safe=False) - if error_msg == "Success": - print(f"[MODBUS] Set coil {coil_addr} (buf:{buffer_idx}, bit:{bit_idx}): {bool(value)}") - else: + _, error_msg = self.safe_buffer_access.write_bool_output( + buffer_idx, bit_idx, bool(value), thread_safe=False + ) + if error_msg != "Success": print(f"[MODBUS] Error setting coil {coil_addr}: {error_msg}") - + # Release mutex after access self.safe_buffer_access.release_mutex() class OpenPLCDiscreteInputsDataBlock(ModbusSparseDataBlock): - """Custom Modbus discrete inputs data block that mirrors OpenPLC bool_input using SafeBufferAccess""" - + """Custom Modbus discrete inputs data block that mirrors OpenPLC bool_input.""" + def __init__(self, runtime_args, num_inputs=64): self.runtime_args = runtime_args self.num_inputs = num_inputs - + # Create safe buffer access wrapper self.safe_buffer_access = SafeBufferAccess(runtime_args) if not self.safe_buffer_access.is_valid: - print(f"[MODBUS] Warning: Failed to create safe buffer access for discrete inputs: {self.safe_buffer_access.error_msg}") - + print( + f"[MODBUS] Warning: Failed to create safe buffer access for " + f"discrete inputs: {self.safe_buffer_access.error_msg}" + ) + # Initialize with zeros super().__init__([0] * num_inputs) - + def getValues(self, address, count=1): - address = address - 1 # Modbus addresses are 0-based """Get discrete input values from OpenPLC bool_input using SafeBufferAccess""" - print(f"[MODBUS] Discrete Inputs getValues called: address={address}, count={count}") - + address = address - 1 # Modbus addresses are 0-based + if not self.safe_buffer_access.is_valid: - print(f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}") + print( + f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}" + ) return [0] * count - + # Ensure thread-safe access self.safe_buffer_access.acquire_mutex() - + values = [] for i in range(count): input_addr = address + i - + if input_addr < self.num_inputs: # Map input address to buffer and bit indices buffer_idx = input_addr // MAX_BITS # 8 bits per buffer - bit_idx = input_addr % MAX_BITS # bit within buffer + bit_idx = input_addr % MAX_BITS # bit within buffer - value, error_msg = self.safe_buffer_access.read_bool_input(buffer_idx, bit_idx, thread_safe=False) + value, error_msg = self.safe_buffer_access.read_bool_input( + buffer_idx, bit_idx, thread_safe=False + ) if error_msg == "Success": values.append(1 if value else 0) - print(f"[MODBUS] Read discrete input {input_addr} (buf:{buffer_idx}, bit:{bit_idx}): {value}") else: print(f"[MODBUS] Error reading discrete input {input_addr}: {error_msg}") values.append(0) else: values.append(0) - + # Release mutex after access self.safe_buffer_access.release_mutex() return values - + def setValues(self, address, values): """Discrete inputs are read-only, this method should not be called""" - print(f"[MODBUS] Warning: Attempt to write to read-only discrete inputs at address {address}") + pass # Silently ignore writes to read-only inputs class OpenPLCInputRegistersDataBlock(ModbusSparseDataBlock): - """Custom Modbus input registers data block that mirrors OpenPLC analog inputs using SafeBufferAccess""" - + """Custom Modbus input registers data block that mirrors OpenPLC analog inputs.""" + def __init__(self, runtime_args, num_registers=32): self.runtime_args = runtime_args self.num_registers = num_registers - + # Create safe buffer access wrapper self.safe_buffer_access = SafeBufferAccess(runtime_args) if not self.safe_buffer_access.is_valid: - print(f"[MODBUS] Warning: Failed to create safe buffer access for input registers: {self.safe_buffer_access.error_msg}") - + print( + f"[MODBUS] Warning: Failed to create safe buffer access for " + f"input registers: {self.safe_buffer_access.error_msg}" + ) + # Initialize with zeros super().__init__([0] * num_registers) - + def getValues(self, address, count=1): - address = address - 1 # Modbus addresses are 0-based """Get input register values from OpenPLC int_input using SafeBufferAccess""" - print(f"[MODBUS] Input Registers getValues called: address={address}, count={count}") - + address = address - 1 # Modbus addresses are 0-based + if not self.safe_buffer_access.is_valid: - print(f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}") + print( + f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}" + ) return [0] * count - + # Ensure buffer mutex self.safe_buffer_access.acquire_mutex() values = [] for i in range(count): reg_addr = address + i - + if reg_addr < self.num_registers: - value, error_msg = self.safe_buffer_access.read_int_input(reg_addr, thread_safe=False) + value, error_msg = self.safe_buffer_access.read_int_input( + reg_addr, thread_safe=False + ) if error_msg == "Success": values.append(value) - print(f"[MODBUS] Read input register {reg_addr}: {value}") else: print(f"[MODBUS] Error reading input register {reg_addr}: {error_msg}") values.append(0) else: values.append(0) - + # Release mutex after access self.safe_buffer_access.release_mutex() return values - + def setValues(self, address, values): """Input registers are read-only, this method should not be called""" - print(f"[MODBUS] Warning: Attempt to write to read-only input registers at address {address}") + pass # Silently ignore writes to read-only registers class OpenPLCHoldingRegistersDataBlock(ModbusSparseDataBlock): - """Custom Modbus holding registers data block that mirrors OpenPLC analog outputs using SafeBufferAccess""" - + """Custom Modbus holding registers data block that mirrors OpenPLC analog outputs.""" + def __init__(self, runtime_args, num_registers=32): self.runtime_args = runtime_args self.num_registers = num_registers - + # Create safe buffer access wrapper self.safe_buffer_access = SafeBufferAccess(runtime_args) if not self.safe_buffer_access.is_valid: - print(f"[MODBUS] Warning: Failed to create safe buffer access for holding registers: {self.safe_buffer_access.error_msg}") - + print( + f"[MODBUS] Warning: Failed to create safe buffer access for " + f"holding registers: {self.safe_buffer_access.error_msg}" + ) + # Initialize with zeros super().__init__([0] * num_registers) - + def getValues(self, address, count=1): - address = address - 1 # Modbus addresses are 0-based """Get holding register values from OpenPLC int_output using SafeBufferAccess""" - print(f"[MODBUS] Holding Registers getValues called: address={address}, count={count}") - + address = address - 1 # Modbus addresses are 0-based + if not self.safe_buffer_access.is_valid: - print(f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}") + print( + f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}" + ) return [0] * count - + # Ensure buffer mutex - self.safe_buffer_access.acquire_mutex() + self.safe_buffer_access.acquire_mutex() values = [] for i in range(count): reg_addr = address + i - + if reg_addr < self.num_registers: - value, error_msg = self.safe_buffer_access.read_int_output(reg_addr, thread_safe=False) + value, error_msg = self.safe_buffer_access.read_int_output( + reg_addr, thread_safe=False + ) if error_msg == "Success": values.append(value) - print(f"[MODBUS] Read holding register {reg_addr}: {value}") else: print(f"[MODBUS] Error reading holding register {reg_addr}: {error_msg}") values.append(0) else: values.append(0) - + # Release mutex after access self.safe_buffer_access.release_mutex() return values - + def setValues(self, address, values): - address = address - 1 # Modbus addresses are 0-based """Set holding register values to OpenPLC int_output using SafeBufferAccess""" - print(f"[MODBUS] Holding Registers setValues called: address={address}, values={values}") - + address = address - 1 # Modbus addresses are 0-based + if not self.safe_buffer_access.is_valid: - print(f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}") + print( + f"[MODBUS] Error: Safe buffer access not valid: {self.safe_buffer_access.error_msg}" + ) return - + # Ensure buffer mutex self.safe_buffer_access.acquire_mutex() - + for i, value in enumerate(values): reg_addr = address + i - + if reg_addr < self.num_registers: - success, error_msg = self.safe_buffer_access.write_int_output(reg_addr, value, thread_safe=False) - if error_msg == "Success": - print(f"[MODBUS] Set holding register {reg_addr}: {value}") - else: + _, error_msg = self.safe_buffer_access.write_int_output( + reg_addr, value, thread_safe=False + ) + if error_msg != "Success": print(f"[MODBUS] Error setting holding register {reg_addr}: {error_msg}") # Release mutex after access self.safe_buffer_access.release_mutex() + # Global variables for plugin lifecycle server_task = None server_context = None runtime_args = None running = False +server_loop = None # Reference to the server's event loop for cross-thread operations +server_started_event = threading.Event() # Signals successful server startup +server_error = None # Stores any startup error message gIp = "172.29.65.104" # Default values gPort = 5020 +# Retry configuration for server restart +RETRY_DELAY_BASE = 2.0 # Initial delay between restart attempts (seconds) +RETRY_DELAY_MAX = 30.0 # Maximum delay between restart attempts (seconds) + + def init(args_capsule): """Initialize the Modbus plugin""" global runtime_args, server_context, gIp, gPort print("[MODBUS] Python plugin 'simple_modbus' initializing...") - + try: # Print structure validation info for debugging print("[MODBUS] Validating plugin structure alignment...") # PluginStructureValidator.print_structure_info() - + # Extract runtime args from capsule using safe method - if hasattr(args_capsule, '__class__') and 'PyCapsule' in str(type(args_capsule)): + if hasattr(args_capsule, "__class__") and "PyCapsule" in str(type(args_capsule)): # This is a PyCapsule from C - use safe extraction runtime_args, error_msg = safe_extract_runtime_args_from_capsule(args_capsule) if runtime_args is None: print(f"[MODBUS] Failed to extract runtime args: {error_msg}") return False - - print(f"[MODBUS] Runtime arguments extracted successfully") + + print("[MODBUS] Runtime arguments extracted successfully") else: # This is a direct object (for testing) runtime_args = args_capsule - print(f"[MODBUS] Using direct runtime args for testing") - + print("[MODBUS] Using direct runtime args for testing") + # Try to load configuration from plugin_specific_config_file_path try: config_map, status = SafeBufferAccess(runtime_args).get_config_file_args_as_map() if status == "Success" and config_map: # Try to extract network configuration - network_config = config_map.get('network_configuration', {}) - if network_config and 'host' in network_config and 'port' in network_config: - gIp = str(network_config['host']) - gPort = int(network_config['port']) + network_config = config_map.get("network_configuration", {}) + if network_config and "host" in network_config and "port" in network_config: + gIp = str(network_config["host"]) + gPort = int(network_config["port"]) print(f"[MODBUS] Configuration loaded - Host: {gIp}, Port: {gPort}") else: - print(f"[MODBUS] Config file loaded but network_configuration section missing or incomplete - using defaults") + print( + "[MODBUS] Config file loaded but network_configuration section missing or incomplete - using defaults" + ) print(f"[MODBUS] Available config sections: {list(config_map.keys())}") else: print(f"[MODBUS] Failed to load configuration file: {status} - using defaults") except Exception as config_error: print(f"[MODBUS] Exception while loading config: {config_error} - using defaults") import traceback + traceback.print_exc() # Safely access buffer size using validation @@ -345,11 +383,11 @@ def init(args_capsule): if buffer_size == -1: print(f"[MODBUS] Failed to access buffer size: {size_error}") return False - + # print(f"[MODBUS] Buffer size: {buffer_size}") # print(f"[MODBUS] Bits per buffer: {runtime_args.bits_per_buffer}") # print(f"[MODBUS] Structure details: {runtime_args}") - + # Create OpenPLC-connected data blocks for all Modbus types coils_block = OpenPLCCoilsDataBlock(runtime_args, num_coils=64) discrete_inputs_block = OpenPLCDiscreteInputsDataBlock(runtime_args, num_inputs=64) @@ -362,139 +400,172 @@ def init(args_capsule): # print(f"[MODBUS] - Discrete Inputs (bool_input): {discrete_inputs_block.num_inputs} inputs") # print(f"[MODBUS] - Input Registers (int_input): {input_registers_block.num_registers} registers") # print(f"[MODBUS] - Holding Registers (int_output): {holding_registers_block.num_registers} registers") - + device = ModbusDeviceContext( - di=discrete_inputs_block, # Discrete Inputs -> bool_input - co=coils_block, # Coils -> bool_output - ir=input_registers_block, # Input Registers -> int_input - hr=holding_registers_block # Holding Registers -> int_output + di=discrete_inputs_block, # Discrete Inputs -> bool_input + co=coils_block, # Coils -> bool_output + ir=input_registers_block, # Input Registers -> int_input + hr=holding_registers_block, # Holding Registers -> int_output ) server_context = ModbusServerContext(devices={1: device}, single=False) - + print(f"[MODBUS] Plugin initialized successfully - Host: {gIp}, Port: {gPort}") return True - + except Exception as e: print(f"[MODBUS] Plugin initialization failed: {e}") import traceback + traceback.print_exc() return False + def start_loop(): - """Start the Modbus server""" - global server_task, running, gIp, gPort - + """Start the Modbus server with automatic restart on failure.""" + global server_task, running, server_loop, server_started_event, server_error + if server_context is None: print("[MODBUS] Error: Plugin not initialized") return False - - print("[MODBUS] Server context is valid, proceeding with startup...") - print(f"[MODBUS] Server context created successfully") - + + # Prevent double-start + if server_task is not None and server_task.is_alive(): + print("[MODBUS] Warning: Server already running") + return True + running = True - - # Start server in separate thread with proper asyncio handling + server_started_event.clear() + server_error = None + def run_server(): - try: - print("[MODBUS] Creating new event loop for server thread...") - # Create new event loop for this thread - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - print("[MODBUS] Event loop created successfully") - - # Start the server and keep it running - async def start_server(): + """Server thread with automatic restart on failure (never give up).""" + global server_loop, server_error + backoff = RETRY_DELAY_BASE + first_attempt = True + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + server_loop = loop + + async def server_runner(): + """Main server coroutine with restart logic.""" + global server_error + nonlocal backoff, first_attempt + + while running: + # Check if cleanup has been called + if server_context is None: + break + try: - print(f"[MODBUS] Attempting to start TCP server on {gIp}:{gPort}...") - try: - server = await StartAsyncTcpServer( - context=server_context, - address=(gIp, gPort) - ) - print(f"[MODBUS] Server successfully bound to {gIp}:{gPort}") - except Exception as bind_error: - print(f"[MODBUS] Failed to bind to {gIp}:{gPort}: {bind_error}") - print(f"[MODBUS] Attempting to bind to 0.0.0.0:{gPort} as fallback...") - server = await StartAsyncTcpServer( - context=server_context, - address=("0.0.0.0", gPort) - ) - print(f"[MODBUS] Server successfully bound to 0.0.0.0:{gPort} (fallback)") - - # Keep the server running - try: - print("[MODBUS] Server is now running and accepting connections") - while running: - await asyncio.sleep(1) - except asyncio.CancelledError: - print("[MODBUS] Server cancelled") - finally: - print("[MODBUS] Shutting down server...") - if hasattr(server, 'close'): - server.close() - if hasattr(server, 'wait_closed'): - await server.wait_closed() - print("[MODBUS] Server shutdown complete") - - except Exception as server_error: - print(f"[MODBUS] Error in start_server async function: {server_error}") - import traceback - print(f"[MODBUS] Traceback: {traceback.format_exc()}") - raise - - # Run the server - print("[MODBUS] Running server event loop...") - loop.run_until_complete(start_server()) - + # Create and start the server + server = ModbusTcpServer(context=server_context, address=(gIp, gPort)) + + # serve_forever with background=True returns after successful bind + await server.serve_forever(background=True) + + # If we get here, server is listening + if first_attempt: + print(f"[MODBUS] Server listening on {gIp}:{gPort}") + server_started_event.set() + first_attempt = False + + backoff = RETRY_DELAY_BASE # Reset backoff on success + + # Keep server running until stop is requested + while running and server_context is not None: + await asyncio.sleep(1) + + # Graceful shutdown + await server.shutdown() + break + + except Exception as e: + error_msg = str(e) + server_error = error_msg + + if first_attempt: + # Signal startup failure on first attempt + print(f"[MODBUS] Failed to start server on {gIp}:{gPort}: {error_msg}") + server_started_event.set() # Unblock start_loop + + if not running: + break # Stop requested, don't retry + + print(f"[MODBUS] Server error, will retry in {backoff:.1f}s: {error_msg}") + + # Wait before retry (check running flag periodically) + wait_time = 0 + while wait_time < backoff and running: + await asyncio.sleep(0.5) + wait_time += 0.5 + + # Increase backoff for next attempt (capped at max) + backoff = min(backoff * 1.5, RETRY_DELAY_MAX) + first_attempt = False + + try: + loop.run_until_complete(server_runner()) except Exception as e: - print(f"[MODBUS] Error in run_server thread: {e}") - import traceback - print(f"[MODBUS] Full traceback: {traceback.format_exc()}") + print(f"[MODBUS] Fatal error in server thread: {e}") finally: - print("[MODBUS] Closing event loop...") + server_loop = None loop.close() - print("[MODBUS] Event loop closed") - + server_task = threading.Thread(target=run_server, daemon=False) server_task.start() - print(f"[MODBUS] Server thread started on {gIp}:{gPort}") - return True + # Wait for server to start (or fail) with timeout + startup_timeout = 5.0 + if server_started_event.wait(timeout=startup_timeout): + if server_error is not None: + print(f"[MODBUS] Server startup failed: {server_error}") + return False + return True + else: + print(f"[MODBUS] Timeout waiting for server to start on {gIp}:{gPort}") + return False + def stop_loop(): - """Stop the Modbus server""" + """Stop the Modbus server gracefully.""" global server_task, running - + running = False - + if server_task: - # Stop the asyncio server + # Call ServerStop() directly - it's designed for cross-thread use + # (uses asyncio.run_coroutine_threadsafe internally) try: - asyncio.run(ServerStop()) - except: - pass - - server_task.join(timeout=2.0) + ServerStop() + except RuntimeError as e: + # Server may not be running or already stopped + print(f"[MODBUS] ServerStop warning: {e}") + + server_task.join(timeout=5.0) + if server_task.is_alive(): + print("[MODBUS] Warning: Server thread did not stop within timeout") server_task = None - + print("[MODBUS] Server stopped") return True + def cleanup(): """Cleanup plugin resources""" global server_context, runtime_args - + server_context = None runtime_args = None - + print("[MODBUS] Plugin cleaned up") return True + async def main(): """Standalone server for testing""" # Create a proper mock runtime args that inherits from PluginRuntimeArgs - import ctypes - + # Create a mock that has the required methods class MockArgs: def __init__(self): @@ -506,26 +577,28 @@ def __init__(self): self.mutex_take = None self.mutex_give = None self.buffer_mutex = None - + def safe_access_buffer_size(self): """Mock implementation of safe_access_buffer_size""" return self.buffer_size, "Success" - + def validate_pointers(self): """Mock implementation of validate_pointers""" return True, "Mock validation passed" - + def __str__(self): - return f"MockArgs(buffer_size={self.buffer_size}, bits_per_buffer={self.bits_per_buffer})" - + return ( + f"MockArgs(buffer_size={self.buffer_size}, bits_per_buffer={self.bits_per_buffer})" + ) + mock_args = MockArgs() - + # Initialize and start if init(mock_args): if start_loop(): print(f"Modbus server running on {gIp}:{gPort}") print("Press Ctrl+C to stop...") - + try: # Keep server running while True: @@ -539,5 +612,6 @@ def __str__(self): else: print("Failed to initialize plugin") + if __name__ == "__main__": asyncio.run(main())