diff --git a/README.md b/README.md index e8cd1f4..69a7f1f 100644 --- a/README.md +++ b/README.md @@ -43,8 +43,8 @@ from p2pnetwork.node import Node class MyOwnPeer2PeerNode (Node): # Python class constructor - def __init__(self, host, port): - super(MyOwnPeer2PeerNode, self).__init__(host, port, None) + def __init__(self, host, port, id=None, callback=None, max_connections=0): + super(MyOwnPeer2PeerNode, self).__init__(host, port, id, callback, max_connections) def outbound_node_connected(self, connected_node): print("outbound_node_connected: " + connected_node.id) @@ -75,15 +75,15 @@ class MyOwnPeer2PeerNode (Node): return MyOwnNodeConnection(self, connection, id, host, port) ```` ### Extend class NodeConnection -The NodeConnection class only hold the TCP/IP connection with the other node, to manage the different connection to and from the main node. It does not implement application specific elements. Mostly, you will only need to extend the Node class. However, when you would like to create your own NodeConnection class you can do this. Make sure that you override ````create_new_connection(self, connection, id, host, port)```` in the class Node, to make sure you initiate your own NodeConnection class. The example below shows some example. +The NodeConnection class only hold the TCP/IP connection with the other node, to manage the different connections to and from the main node. It does not implement application specific elements. Mostly, you will only need to extend the Node class. However, when you would like to create your own NodeConnection class you can do this. Make sure that you override ````create_new_connection(self, connection, id, host, port)```` in the class Node, to make sure you initiate your own NodeConnection class. The example below shows some example. ````python from p2pnetwork.node import Node class MyOwnPeer2PeerNode (Node): # Python class constructor - def __init__(self, host, port): - super(MyOwnPeer2PeerNode, self).__init__(host, port, None) + def __init__(self, host, port, id=None, callback=None, max_connections=0): + super(MyOwnPeer2PeerNode, self).__init__(host, port, id, callback, max_connections) # Override event functions... @@ -101,7 +101,7 @@ class MyOwnNodeConnection (NodeConnection): super(MyOwnNodeConnection, self).__init__(main_node, sock, id, host, port) # Check yourself what you would like to change and override! See the - # documentation + # documentation and code of the nodeconnection class. ```` ### Using your new classes diff --git a/examples/MyOwnPeer2PeerNode.py b/examples/MyOwnPeer2PeerNode.py index 1576ec6..0c8cc9f 100644 --- a/examples/MyOwnPeer2PeerNode.py +++ b/examples/MyOwnPeer2PeerNode.py @@ -1,39 +1,40 @@ ####################################################################################################################### # Author: Maurice Snoeren # -# Version: 0.1 beta (use at your own risk) # +# Version: 0.2 beta (use at your own risk) # # # # MyOwnPeer2PeerNode is an example how to use the p2pnet.Node to implement your own peer-to-peer network node. # +# 28/06/2021: Added the new developments on id and max_connections ####################################################################################################################### from p2pnetwork.node import Node class MyOwnPeer2PeerNode (Node): # Python class constructor - def __init__(self, host, port): - super(MyOwnPeer2PeerNode, self).__init__(host, port, None) + def __init__(self, host, port, id=None, callback=None, max_connections=0): + super(MyOwnPeer2PeerNode, self).__init__(host, port, id, callback, max_connections) print("MyPeer2PeerNode: Started") # all the methods below are called when things happen in the network. # implement your network node behavior to create the required functionality. def outbound_node_connected(self, node): - print("outbound_node_connected: " + node.id) + print("outbound_node_connected (" + self.id + "): " + node.id) def inbound_node_connected(self, node): - print("inbound_node_connected: " + node.id) + print("inbound_node_connected: (" + self.id + "): " + node.id) def inbound_node_disconnected(self, node): - print("inbound_node_disconnected: " + node.id) + print("inbound_node_disconnected: (" + self.id + "): " + node.id) def outbound_node_disconnected(self, node): - print("outbound_node_disconnected: " + node.id) + print("outbound_node_disconnected: (" + self.id + "): " + node.id) def node_message(self, node, data): - print("node_message from " + node.id + ": " + str(data)) + print("node_message (" + self.id + ") from " + node.id + ": " + str(data)) def node_disconnect_with_outbound_node(self, node): - print("node wants to disconnect with other outbound node: " + node.id) + print("node wants to disconnect with oher outbound node: (" + self.id + "): " + node.id) def node_request_to_stop(self): - print("node is requested to stop!") + print("node is requested to stop (" + self.id + "): ") diff --git a/examples/my_own_p2p_application.py b/examples/my_own_p2p_application.py index e428fb3..95e0dc9 100644 --- a/examples/my_own_p2p_application.py +++ b/examples/my_own_p2p_application.py @@ -13,9 +13,9 @@ from MyOwnPeer2PeerNode import MyOwnPeer2PeerNode -node_1 = MyOwnPeer2PeerNode("127.0.0.1", 8001) -node_2 = MyOwnPeer2PeerNode("127.0.0.1", 8002) -node_3 = MyOwnPeer2PeerNode("127.0.0.1", 8003) +node_1 = MyOwnPeer2PeerNode("127.0.0.1", 8001, 1) +node_2 = MyOwnPeer2PeerNode("127.0.0.1", 8002, 2) +node_3 = MyOwnPeer2PeerNode("127.0.0.1", 8003, 3) time.sleep(1) @@ -27,12 +27,28 @@ node_1.connect_with_node('127.0.0.1', 8002) node_2.connect_with_node('127.0.0.1', 8003) -node_3.connect_with_node('127.0.0.1', 8002) +node_3.connect_with_node('127.0.0.1', 8001) time.sleep(2) node_1.send_to_nodes("message: Hi there!") +time.sleep(2) + +print("node 1 is stopping..") +node_1.stop() + +time.sleep(20) + +node_2.send_to_nodes("message: Hi there node 2!") +node_2.send_to_nodes("message: Hi there node 2!") +node_2.send_to_nodes("message: Hi there node 2!") +node_3.send_to_nodes("message: Hi there node 2!") +node_3.send_to_nodes("message: Hi there node 2!") +node_3.send_to_nodes("message: Hi there node 2!") + +time.sleep(10) + time.sleep(5) node_1.stop() diff --git a/examples/my_own_p2p_application_callback.py b/examples/my_own_p2p_application_callback.py index 34e7bcb..0617944 100644 --- a/examples/my_own_p2p_application_callback.py +++ b/examples/my_own_p2p_application_callback.py @@ -28,9 +28,9 @@ def node_callback(event, main_node, connected_node, data): # Just for test we spin off multiple nodes, however it is more likely that these nodes are running # on computers on the Internet! Otherwise we do not have any peer2peer application. -node_1 = Node("127.0.0.1", 8001, node_callback) -node_2 = Node("127.0.0.1", 8002, node_callback) -node_3 = Node("127.0.0.1", 8003, node_callback) +node_1 = Node("127.0.0.1", 8001, callback=node_callback) +node_2 = Node("127.0.0.1", 8002, callback=node_callback) +node_3 = Node("127.0.0.1", 8003, callback=node_callback) time.sleep(1) #node_1.debug = True diff --git a/examples/my_own_p2p_application_using_dict.py b/examples/my_own_p2p_application_using_dict.py index fdae9ac..9888481 100644 --- a/examples/my_own_p2p_application_using_dict.py +++ b/examples/my_own_p2p_application_using_dict.py @@ -27,7 +27,7 @@ node_1.connect_with_node('127.0.0.1', 8002) node_2.connect_with_node('127.0.0.1', 8003) -node_3.connect_with_node('127.0.0.1', 8002) +node_3.connect_with_node('127.0.0.1', 8001) time.sleep(2) diff --git a/p2pnetwork/node.py b/p2pnetwork/node.py index 55f4058..9c56619 100644 --- a/p2pnetwork/node.py +++ b/p2pnetwork/node.py @@ -1,5 +1,4 @@ import socket -import sys import time import threading import random @@ -14,7 +13,6 @@ Python package p2pnet for implementing decentralized peer-to-peer network applications -TODO: Variabele to limit the number of connected nodes. TODO: Also create events when things go wrong, like a connection with a node has failed. """ @@ -33,12 +31,14 @@ def node_callback(event, main_node, connected_node, data): connected_node: Which connected node caused the event. data: The data that is send by the connected node.""" - def __init__(self, host, port, callback=None): + def __init__(self, host, port, id=None, callback=None, max_connections=0): """Create instance of a Node. If you want to implement the Node functionality with a callback, you should provide a callback method. It is preferred to implement a new node by extending this Node class. host: The host name or ip address that is used to bind the TCP/IP server to. port: The port number that is used to bind the TCP/IP server to. - callback: (optional) The callback that is invokes when events happen inside the network.""" + id: (optional) This id will be assiocated with the node. When not given a unique ID will be created. + callback: (optional) The callback that is invokes when events happen inside the network. + max_connections: (optional) limiting the maximum nodes that are able to connect to this node.""" super(Node, self).__init__() # When this flag is set, the node will stop and close @@ -52,17 +52,23 @@ def __init__(self, host, port, callback=None): self.callback = callback # Nodes that have established a connection with this node - self.nodes_inbound = [] # Nodes that are connect with us N->(US)->N + self.nodes_inbound = [] # Nodes that are connect with us N->(US) # Nodes that this nodes is connected to self.nodes_outbound = [] # Nodes that we are connected to (US)->N - # Create a unique ID for each node. - # TODO: A fixed unique ID is required for each node, node some random is created, need to think of it. - id = hashlib.sha512() - t = self.host + str(self.port) + str(random.randint(1, 99999999)) - id.update(t.encode('ascii')) - self.id = id.hexdigest() + # A list of nodes that should be reconnected to whenever the connection was lost + self.reconnect_to_nodes = [] + + # Create a unique ID for each node if the ID is not given. + if id == None: + id = hashlib.sha512() + t = self.host + str(self.port) + str(random.randint(1, 99999999)) + id.update(t.encode('ascii')) + self.id = id.hexdigest() + + else: + self.id = str(id) # Make sure the ID is a string! # Start the TCP/IP server self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -72,6 +78,9 @@ def __init__(self, host, port, callback=None): self.message_count_send = 0 self.message_count_recv = 0 self.message_count_rerr = 0 + + # Connection limit of inbound nodes (nodes that connect to us) + self.max_connections = max_connections # Debugging on or off! self.debug = False @@ -84,7 +93,7 @@ def all_nodes(self): def debug_print(self, message): """When the debug flag is set to True, all debug messages are printed in the console.""" if self.debug: - print("DEBUG: " + message) + print("DEBUG (" + self.id + "): " + message) def init_server(self): """Initialization of the TCP/IP server to receive connections. It binds to the given host and port.""" @@ -100,22 +109,6 @@ def print_connections(self): print("- Total nodes connected with us: %d" % len(self.nodes_inbound)) print("- Total nodes connected to : %d" % len(self.nodes_outbound)) - def delete_closed_connections(self): - """Misleading function name, while this function checks whether the connected nodes have been terminated - by the other host. If so, clean the array list of the nodes. When a connection is closed, an event is - send node_message or outbound_node_disconnected.""" - for n in self.nodes_inbound: - if n.terminate_flag.is_set(): - self.inbound_node_disconnected(n) - n.join() - del self.nodes_inbound[self.nodes_inbound.index(n)] - - for n in self.nodes_outbound: - if n.terminate_flag.is_set(): - self.outbound_node_disconnected(n) - n.join() - del self.nodes_outbound[self.nodes_inbound.index(n)] - def send_to_nodes(self, data, exclude=[]): """ Send a message to all the nodes that are connected with this node. data is a python variable which is converted to JSON that is send over to the other node. exclude list gives all the nodes to which this @@ -136,22 +129,19 @@ def send_to_nodes(self, data, exclude=[]): def send_to_node(self, n, data): """ Send the data to the node n if it exists.""" self.message_count_send = self.message_count_send + 1 - self.delete_closed_connections() if n in self.nodes_inbound or n in self.nodes_outbound: - try: - n.send(data) + n.send(data) - except Exception as e: - self.debug_print("Node send_to_node: Error while sending data to the node (" + str(e) + ")") else: self.debug_print("Node send_to_node: Could not send the data, node is not found!") - def connect_with_node(self, host, port): + def connect_with_node(self, host, port, reconnect=False): """ Make a connection with another node that is running on host with port. When the connection is made, an event is triggered outbound_node_connected. When the connection is made with the node, it exchanges the id's of the node. First we send our id and then we receive the id of the node we are connected to. - When the connection is made the method outbound_node_connected is invoked. - TODO: think wheter we need an error event to trigger when the connection has failed!""" + When the connection is made the method outbound_node_connected is invoked. If reconnect is True, the + node will try to reconnect to the code whenever the node connection was closed.""" + if host == self.host and port == self.port: print("connect_with_node: Cannot connect with yourself!!") return False @@ -159,7 +149,7 @@ def connect_with_node(self, host, port): # Check if node is already connected with this node! for node in self.nodes_outbound: if node.host == host and node.port == port: - print("connect_with_node: Already connected with this node.") + print("connect_with_node: Already connected with this node (" + node.id + ").") return True try: @@ -171,12 +161,25 @@ def connect_with_node(self, host, port): sock.send(self.id.encode('utf-8')) # Send my id to the connected node! connected_node_id = sock.recv(4096).decode('utf-8') # When a node is connected, it sends it id! + # Fix bug: Cannot connect with nodes that are already connected with us! + for node in self.nodes_inbound: + if node.host == host and node.id == connected_node_id: + print("connect_with_node: This node (" + node.id + ") is already connected with us.") + return True + thread_client = self.create_new_connection(sock, connected_node_id, host, port) thread_client.start() self.nodes_outbound.append(thread_client) self.outbound_node_connected(thread_client) + # If reconnection to this host is required, it will be added to the list! + if reconnect: + self.debug_print("connect_with_node: Reconnection check is enabled on node " + host + ":" + str(port)) + self.reconnect_to_nodes.append({ + "host": host, "port": port, "tries": 0 + }) + except Exception as e: self.debug_print("TcpServer.connect_with_node: Could not connect with node. (" + str(e) + ")") @@ -187,11 +190,9 @@ def disconnect_with_node(self, node): if node in self.nodes_outbound: self.node_disconnect_with_outbound_node(node) node.stop() - node.join() # When this is here, the application is waiting and waiting - del self.nodes_outbound[self.nodes_outbound.index(node)] else: - print("Node disconnect_with_node: cannot disconnect with a node with which we are not connected.") + self.debug_print("Node disconnect_with_node: cannot disconnect with a node with which we are not connected.") def stop(self): """Stop this node and terminate all the connected nodes.""" @@ -206,6 +207,28 @@ def create_new_connection(self, connection, id, host, port): the node connection.""" return NodeConnection(self, connection, id, host, port) + def reconnect_nodes(self): + """This method checks whether nodes that have the reconnection status are still connected. If not + connected these nodes are started again.""" + for node_to_check in self.reconnect_to_nodes: + found_node = False + self.debug_print("reconnect_nodes: Checking node " + node_to_check["host"] + ":" + str(node_to_check["port"])) + + for node in self.nodes_outbound: + if node.host == node_to_check["host"] and node.port == node_to_check["port"]: + found_node = True + node_to_check["trials"] = 0 # Reset the trials + self.debug_print("reconnect_nodes: Node " + node_to_check["host"] + ":" + str(node_to_check["port"]) + " still running!") + + if not found_node: # Reconnect with node + node_to_check["trials"] += 1 + if self.node_reconnection_error(node_to_check["host"], node_to_check["port"], node_to_check["trials"]): + self.connect_with_node(node_to_check["host"], node_to_check["port"]) # Perform the actual connection + + else: + self.debug_print("reconnect_nodes: Removing node (" + node_to_check["host"] + ":" + str(node_to_check["port"]) + ") from the reconnection list!") + self.reconnect_to_nodes.remove(node_to_check) + def run(self): """The main loop of the thread that deals with connections from other nodes on the network. When a node is connected it will exchange the node id's. First we receive the id of the connected node @@ -215,24 +238,33 @@ def run(self): try: self.debug_print("Node: Wait for incoming connection") connection, client_address = self.sock.accept() - - # Basic information exchange (not secure) of the id's of the nodes! - connected_node_id = connection.recv(4096).decode('utf-8') # When a node is connected, it sends it id! - connection.send(self.id.encode('utf-8')) # Send my id to the connected node! - thread_client = self.create_new_connection(connection, connected_node_id, client_address[0], client_address[1]) - thread_client.start() + self.debug_print("Total inbound connections:" + str(len(self.nodes_inbound))) + # When the maximum connections is reached, it disconnects the connection + if self.max_connections == 0 or len(self.nodes_inbound) < self.max_connections: + + # Basic information exchange (not secure) of the id's of the nodes! + connected_node_id = connection.recv(4096).decode('utf-8') # When a node is connected, it sends it id! + connection.send(self.id.encode('utf-8')) # Send my id to the connected node! + + thread_client = self.create_new_connection(connection, connected_node_id, client_address[0], client_address[1]) + thread_client.start() - self.nodes_inbound.append(thread_client) + self.nodes_inbound.append(thread_client) + self.inbound_node_connected(thread_client) - self.inbound_node_connected(thread_client) - + else: + self.debug_print("New connection is closed. You have reached the maximum connection limit!") + connection.close() + except socket.timeout: self.debug_print('Node: Connection timeout!') except Exception as e: raise e + self.reconnect_nodes() + time.sleep(0.01) print("Node stopping...") @@ -267,6 +299,20 @@ def inbound_node_connected(self, node): if self.callback is not None: self.callback("inbound_node_connected", self, node, {}) + def node_disconnected(self, node): + """While the same nodeconnection class is used, the class itself is not able to + determine if it is a inbound or outbound connection. This function is making + sure the correct method is used.""" + self.debug_print("node_disconnected: " + node.id) + + if node in self.nodes_inbound: + del self.nodes_inbound[self.nodes_inbound.index(node)] + self.inbound_node_disconnected(node) + + if node in self.nodes_outbound: + del self.nodes_outbound[self.nodes_outbound.index(node)] + self.outbound_node_disconnected(node) + def inbound_node_disconnected(self, node): """This method is invoked when a node, that was previously connected with us, is in a disconnected state.""" @@ -300,6 +346,15 @@ def node_request_to_stop(self): if self.callback is not None: self.callback("node_request_to_stop", self, {}, {}) + def node_reconnection_error(self, host, port, trials): + """This method is invoked when a reconnection error occurred. The node connection is disconnected and the + flag for reconnection is set to True for this node. This function can be overidden to implement your + specific logic to take action when a lot of trials have been done. If the method returns True, the + node will try to perform the reconnection. If the method returns False, the node will stop reconnecting + to this node. The node will forever tries to perform the reconnection.""" + self.debug_print("node_reconnection_error: Reconnecting to node " + host + ":" + str(port) + " (trials: " + str(trials) + ")") + return True + def __str__(self): return 'Node: {}:{}'.format(self.host, self.port) diff --git a/p2pnetwork/nodeconnection.py b/p2pnetwork/nodeconnection.py index 039f020..ff8fc2a 100644 --- a/p2pnetwork/nodeconnection.py +++ b/p2pnetwork/nodeconnection.py @@ -1,9 +1,6 @@ import socket -import sys import time import threading -import random -import hashlib import json """ @@ -45,7 +42,7 @@ def __init__(self, main_node, sock, id, host, port): self.terminate_flag = threading.Event() # The id of the connected node - self.id = id + self.id = str(id) # Make sure the ID is a string # End of transmission character for the network streaming messages. self.EOT_CHAR = 0x04.to_bytes(1, 'big') @@ -53,28 +50,37 @@ def __init__(self, main_node, sock, id, host, port): # Datastore to store additional information concerning the node. self.info = {} + # Use socket timeout to determine problems with the connection + self.sock.settimeout(10.0) + self.main_node.debug_print("NodeConnection.send: Started with client (" + self.id + ") '" + self.host + ":" + str(self.port) + "'") def send(self, data, encoding_type='utf-8'): """Send the data to the connected node. The data can be pure text (str), dict object (send as json) and bytes object. When sending bytes object, it will be using standard socket communication. A end of transmission character 0x04 - utf-8/ascii will be used to decode the packets ate the other node.""" + utf-8/ascii will be used to decode the packets ate the other node. When the socket is corrupted the node connection + is closed.""" if isinstance(data, str): - self.sock.sendall( data.encode(encoding_type) + self.EOT_CHAR ) + try: + self.sock.sendall( data.encode(encoding_type) + self.EOT_CHAR ) + + except Exception as e: # Fixed issue #19: When sending is corrupted, close the connection + self.main_node.debug_print("nodeconnection send: Error sending data to node: " + str(e)) + self.stop() # Stopping node due to failure elif isinstance(data, dict): try: json_data = json.dumps(data) json_data = json_data.encode(encoding_type) + self.EOT_CHAR self.sock.sendall(json_data) - + except TypeError as type_error: self.main_node.debug_print('This dict is invalid') self.main_node.debug_print(type_error) - except Exception as e: - print('Unexpected Error in send message') - print(e) + except Exception as e: # Fixed issue #19: When sending is corrupted, close the connection + self.main_node.debug_print("nodeconnection send: Error sending data to node: " + str(e)) + self.stop() # Stopping node due to failure elif isinstance(data, bytes): bin_data = data + self.EOT_CHAR @@ -112,8 +118,7 @@ def parse_packet(self, packet): def run(self): """The main loop of the thread to handle the connection with the node. Within the main loop the thread waits to receive data from the node. If data is received - the method node_message will be invoked of the main node to be processed.""" - self.sock.settimeout(10.0) + the method node_message will be invoked of the main node to be processed.""" buffer = b'' # Hold the stream that comes in! while not self.terminate_flag.is_set(): @@ -126,7 +131,7 @@ def run(self): self.main_node.debug_print("NodeConnection: timeout") except Exception as e: - self.terminate_flag.set() + self.terminate_flag.set() # Exception occurred terminating the connection self.main_node.debug_print('Unexpected error') self.main_node.debug_print(e) @@ -147,9 +152,9 @@ def run(self): time.sleep(0.01) # IDEA: Invoke (event) a method in main_node so the user is able to send a bye message to the node before it is closed? - self.sock.settimeout(None) self.sock.close() + self.main_node.node_disconnected( self ) # Fixed issue #19: Send to main_node when a node is disconnected. We do not know whether it is inbounc or outbound. self.main_node.debug_print("NodeConnection: Stopped") def set_info(self, key, value): diff --git a/p2pnetwork/tests/test_node.py b/p2pnetwork/tests/test_node.py index afd16b1..4e80cce 100644 --- a/p2pnetwork/tests/test_node.py +++ b/p2pnetwork/tests/test_node.py @@ -10,6 +10,8 @@ Testing the node on its basic functionality, like connecting to other nodes and sending data around. Furthermore, the events are tested whether they are handled correctly in the case of the callback and in the case of extending the class. +TODO: Tests to check the correct disconnection of the nodes. +TODO: Tests to check the reconnection functionality of the node. """ class TestNode(unittest.TestCase): @@ -17,8 +19,8 @@ class TestNode(unittest.TestCase): def test_node_connection(self): """Testing whether two Node instances are able to connect with each other.""" - node1 = Node("localhost", 10001) - node2 = Node("localhost", 10002) + node1 = Node(host="localhost", port=10001) + node2 = Node(host="localhost", port=10002) node1.start() node2.start() @@ -76,8 +78,8 @@ def node_callback(event, main_node, connected_node, data): except Exception as e: message = "exception: " + str(e) - node1 = Node("localhost", 10001, node_callback) - node2 = Node("localhost", 10002, node_callback) + node1 = Node(host="localhost", port=10001, callback=node_callback) + node2 = Node(host="localhost", port=10002, callback=node_callback) node1.start() node2.start() @@ -120,9 +122,9 @@ def node_callback(event, main_node, connected_node, data): except Exception as e: message.append("exception: " + str(e)) - node_0 = Node('127.0.0.1', 10000, node_callback) - node_1 = Node('127.0.0.1', 10001, node_callback) - node_2 = Node('127.0.0.1', 10002, node_callback) + node_0 = Node(host='127.0.0.1', port=10000, callback=node_callback) + node_1 = Node(host='127.0.0.1', port=10001, callback=node_callback) + node_2 = Node(host='127.0.0.1', port=10002, callback=node_callback) node_0.start() node_1.start() @@ -206,10 +208,10 @@ def node_callback(event, main_node, connected_node, data): global message message.append(event + ":" + main_node.id) - node_0 = Node('127.0.0.1', 10000, node_callback) - node_1 = Node('127.0.0.1', 10001, node_callback) - node_2 = Node('127.0.0.1', 10002, node_callback) - + node_0 = Node(host='127.0.0.1', port=10000, callback=node_callback) + node_1 = Node(host='127.0.0.1', port=10001, callback=node_callback) + node_2 = Node(host='127.0.0.1', port=10002, callback=node_callback) + node_0.start() node_1.start() node_2.start() @@ -312,9 +314,9 @@ def node_request_to_stop(self): global message message.append("node is requested to stop!") - node1 = MyTestNode("127.0.0.1", 10001) - node2 = MyTestNode("127.0.0.1", 10002) - node3 = MyTestNode("127.0.0.1", 10003) + node1 = MyTestNode(host="127.0.0.1", port=10001) + node2 = MyTestNode(host="127.0.0.1", port=10002) + node3 = MyTestNode(host="127.0.0.1", port=10003) node1.start() node2.start() @@ -373,5 +375,88 @@ def node_request_to_stop(self): self.assertEqual(message[12], "node is requested to stop!", "MyTestNode should have seen this event!") self.assertEqual(message[13], "node is requested to stop!", "MyTestNode should have seen this event!") + def test_node_max_connections(self): + """Testing the maximum connections of the node.""" + + global message + message = [] + + # Using the callback we are able to see the events and messages of the Node + def node_callback(event, main_node, connected_node, data): + global message + message.append(event + ":" + main_node.id) + + node_0 = Node(host='127.0.0.1', port=10000, callback=node_callback, max_connections=1) # max connection of 1 + node_1 = Node(host='127.0.0.1', port=10001, callback=node_callback, max_connections=2) # max connection of 2 + node_2 = Node(host='127.0.0.1', port=10002, callback=node_callback) + + node_0.start() + node_1.start() + node_2.start() + time.sleep(1) + + # Test the connections + node_1.connect_with_node('127.0.0.1', 10000) # This works! + time.sleep(2) + + node_2.connect_with_node('127.0.0.1', 10000) # This should be rejected + time.sleep(2) + + node_0.connect_with_node('127.0.0.1', 10001) # This works! + time.sleep(2) + + node_2.connect_with_node('127.0.0.1', 10001) # This works! + time.sleep(2) + + # Send messages + node_0.send_to_nodes('hello from node 0') + time.sleep(2) + + node_1.send_to_nodes('hello from node 1') + time.sleep(2) + + node_2.send_to_nodes('hello from node 2') + time.sleep(2) + + node_0.stop() + node_1.stop() + node_2.stop() + node_0.join() + node_1.join() + node_2.join() + + # Perform the asserts! + self.assertEqual(len(node_0.nodes_inbound), 1, "More inbound connections have been accepted bij node_0!") + self.assertEqual(len(node_1.nodes_inbound), 2, "Node 1 should have two connections from node_0 and node_2!") + self.assertEqual(len(node_2.nodes_outbound), 1, "Node 2 should have one outbound connection with node_1!") + + def test_node_id(self): + """Testing the ID settings of the node.""" + + global message + message = [] + + # Using the callback we are able to see the events and messages of the Node + def node_callback(event, main_node, connected_node, data): + global message + message.append(event + ":" + main_node.id) + + node_0 = Node(host='127.0.0.1', port=10000, id="thisisanidtest", callback=node_callback) + node_1 = Node(host='127.0.0.1', port=10001, callback=node_callback) + + node_0.start() + node_1.start() + time.sleep(1) + + node_0.stop() + node_1.stop() + node_0.join() + node_1.join() + + # Perform the asserts! + self.assertEqual(node_0.id, "thisisanidtest", "Node 0 shoud have id \"thisisanidtest\"") + self.assertNotEqual(node_1.id, "thisisanidtest", "Node 1 should have a different id than node 0") + self.assertNotEqual(node_1.id, None, "The ID pf node 1 should not be equal to None") + if __name__ == '__main__': unittest.main() diff --git a/setup.py b/setup.py index d3618e0..4a19d34 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="p2pnetwork", - version="1.0", + version="1.1", author="Maurice Snoeren", author_email="macsnoeren@gmail.com", description="Python decentralized peer-to-peer network application framework.",