From 7fb073e7a1d9fba748ce867641e20af5f39147f6 Mon Sep 17 00:00:00 2001 From: ljwoods2 Date: Tue, 10 Dec 2024 17:12:33 -0700 Subject: [PATCH 1/4] initial commit --- docs/source/protocol_v3.rst | 10 ++++---- imdclient/IMDClient.py | 42 +++++++++++++++++++++++++++---- imdclient/IMDProtocol.py | 1 + imdclient/tests/base.py | 25 ++++++++++++++++++ imdclient/tests/test_imdclient.py | 15 +++++++++++ 5 files changed, 83 insertions(+), 10 deletions(-) diff --git a/docs/source/protocol_v3.rst b/docs/source/protocol_v3.rst index d2f6358..fe486d3 100644 --- a/docs/source/protocol_v3.rst +++ b/docs/source/protocol_v3.rst @@ -145,7 +145,7 @@ and its associated body packet (if present) is described in detail. * - :ref:`forces` - 15 - ❌ - * - :ref:`wait-flag` + * - :ref:`wait` - 16 - ❌ @@ -489,10 +489,10 @@ forces were previously specified for this session in the :ref:`session info pack .. versionadded:: 3 -.. _wait-flag: +.. _wait: -Wait flag -^^^^^^^^^ +Wait +^^^^ Sent from the receiver to the simulation engine any time after the :ref:`session info packet ` has been sent to request that the simulation engine modify its waiting behavior mid-simulation either @@ -513,7 +513,7 @@ The simulation engine's waiting behavior also applies when a receiver disconnect .. code-block:: none Header: - 16 (int32) Wait flag + 16 (int32) Wait (int32) Nonzero to set the simulation engine's waiting behavior to blocking, 0 to set the simulation engine's waiting behavior to non-blocking diff --git a/imdclient/IMDClient.py b/imdclient/IMDClient.py index f158e2a..95b0b94 100644 --- a/imdclient/IMDClient.py +++ b/imdclient/IMDClient.py @@ -46,6 +46,10 @@ class IMDClient: IMDFramebuffer will be filled with as many :class:`IMDFrame` fit in `buffer_size` bytes [``10MB``] timeout : int, optional Timeout for the socket in seconds [``5``] + continue_after_disconnect : bool, optional [``None``] + If True, the client will attempt to change the simulation engine's waiting behavior to + non-blocking after the client disconnects. If False, the client will attempt to change it + to blocking. If None, the client will not attempt to change the simulation engine's behavior. **kwargs : dict (optional) Additional keyword arguments to pass to the :class:`BaseIMDProducer` and :class:`IMDFrameBuffer` """ @@ -57,6 +61,7 @@ def __init__( n_atoms, socket_bufsize=None, multithreaded=True, + continue_after_disconnect=None, **kwargs, ): @@ -64,6 +69,7 @@ def __init__( self._conn = self._connect_to_server(host, port, socket_bufsize) self._imdsinfo = self._await_IMD_handshake() self._multithreaded = multithreaded + self._continue_after_disconnect = continue_after_disconnect if self._multithreaded: self._buf = IMDFrameBuffer( @@ -201,7 +207,9 @@ def _connect_to_server(self, host, port, socket_bufsize): # /proc/sys/net/core/rmem_default # Max (linux): # /proc/sys/net/core/rmem_max - conn.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, socket_bufsize) + conn.setsockopt( + socket.SOL_SOCKET, socket.SO_RCVBUF, socket_bufsize + ) try: logger.debug(f"IMDClient: Connecting to {host}:{port}") conn.connect((host, port)) @@ -292,6 +300,17 @@ def _go(self): self._conn.sendall(go) logger.debug("IMDClient: Sent go packet to server") + if self._continue_after_disconnect is not None: + wait_behavior = (int)(not self._continue_after_disconnect) + wait_packet = create_header_bytes( + IMDHeaderType.IMD_WAIT, wait_behavior + ) + self._conn.sendall(wait_packet) + logger.debug( + "IMDClient: Attempted to change wait behavior to ", + not self._continue_after_disconnect + ) + def _disconnect(self): # MUST disconnect before stopping execution # if simulation already ended, this method will do nothing @@ -499,7 +518,14 @@ def _read(self, buf): class IMDProducerV2(BaseIMDProducer): def __init__( - self, conn, buffer, sinfo, n_atoms, multithreaded, error_queue, **kwargs + self, + conn, + buffer, + sinfo, + n_atoms, + multithreaded, + error_queue, + **kwargs, ): super(IMDProducerV2, self).__init__( conn, buffer, sinfo, n_atoms, multithreaded, error_queue, **kwargs @@ -762,7 +788,9 @@ def __init__( raise ValueError("pause_empty_proportion must be between 0 and 1") self._pause_empty_proportion = pause_empty_proportion if unpause_empty_proportion < 0 or unpause_empty_proportion > 1: - raise ValueError("unpause_empty_proportion must be between 0 and 1") + raise ValueError( + "unpause_empty_proportion must be between 0 and 1" + ) self._unpause_empty_proportion = unpause_empty_proportion if buffer_size <= 0: @@ -829,7 +857,9 @@ def wait_for_space(self): logger.debug("IMDProducer: Noticing consumer finished") raise EOFError except Exception as e: - logger.debug(f"IMDProducer: Error waiting for space in buffer: {e}") + logger.debug( + f"IMDProducer: Error waiting for space in buffer: {e}" + ) def pop_empty_imdframe(self): logger.debug("IMDProducer: Getting empty frame") @@ -875,7 +905,9 @@ def pop_full_imdframe(self): imdf = self._full_q.get() else: with self._full_imdf_avail: - while self._full_q.qsize() == 0 and not self._producer_finished: + while ( + self._full_q.qsize() == 0 and not self._producer_finished + ): self._full_imdf_avail.wait() if self._producer_finished and self._full_q.qsize() == 0: diff --git a/imdclient/IMDProtocol.py b/imdclient/IMDProtocol.py index 97a9bf1..68caf81 100644 --- a/imdclient/IMDProtocol.py +++ b/imdclient/IMDProtocol.py @@ -34,6 +34,7 @@ class IMDHeaderType(Enum): IMD_BOX = 13 IMD_VELOCITIES = 14 IMD_FORCES = 15 + IMD_WAIT = 16 def parse_energy_bytes(data, endianness): diff --git a/imdclient/tests/base.py b/imdclient/tests/base.py index 08ef9a1..d619e44 100644 --- a/imdclient/tests/base.py +++ b/imdclient/tests/base.py @@ -208,3 +208,28 @@ def test_compare_imd_to_true_traj( imd_u.trajectory[i - first_frame].forces, atol=1e-03, ) + + def test_continue_after_disconnect( + self, docker_client, topol, tmp_path, port + ): + u = mda.Universe( + (tmp_path / topol), + f"imd://localhost:{port}", + continue_after_disconnect=True, + # Make sure LAMMPS topol can be read + # Does nothing if not LAMMPS + atom_style="id type x y z", + ) + # Though we disconnect here, the simulation should continue + u.trajectory.close() + # Wait for the simulation to finish running + time.sleep(45) + + # Now, attempt to reconnect- should fail, + # since the simulation should have continued + with pytest.raises(IOError): + u = mda.Universe( + (tmp_path / topol), + f"imd://localhost:{port}", + atom_style="id type x y z", + ) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index c8fe9b2..b01bc5b 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -149,6 +149,21 @@ def test_pause_resume_no_disconnect(self, server_client_two_frame_buf): # server should receive disconnect from client (though it doesn't have to do anything) server.expect_packet(IMDHeaderType.IMD_DISCONNECT) + @pytest.mark.parametrize("cont", [True, False]) + def test_continue_after_disconnect(self, universe, imdsinfo, port, cont): + server = InThreadIMDServer(universe.trajectory) + server.set_imdsessioninfo(imdsinfo) + server.handshake_sequence("localhost", port, first_frame=False) + client = IMDClient( + f"localhost", + port, + universe.trajectory.n_atoms, + continue_after_disconnect=cont, + ) + server.expect_packet( + IMDHeaderType.IMD_WAIT, expected_length=(int)(not cont) + ) + class TestIMDClientV3ContextManager: @pytest.fixture From e65fa60df86c0a7bffa3d272cb76a18fa3eb73dc Mon Sep 17 00:00:00 2001 From: ljwoods2 Date: Tue, 10 Dec 2024 17:15:47 -0700 Subject: [PATCH 2/4] linting --- imdclient/IMDClient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/imdclient/IMDClient.py b/imdclient/IMDClient.py index 95b0b94..fa121b5 100644 --- a/imdclient/IMDClient.py +++ b/imdclient/IMDClient.py @@ -307,7 +307,7 @@ def _go(self): ) self._conn.sendall(wait_packet) logger.debug( - "IMDClient: Attempted to change wait behavior to ", + "IMDClient: Attempted to change wait behavior to %s", not self._continue_after_disconnect ) From 7a59dc9fc3d3621a5a08bdc8bb7531cacf812a1c Mon Sep 17 00:00:00 2001 From: ljwoods2 Date: Tue, 10 Dec 2024 17:26:41 -0700 Subject: [PATCH 3/4] remove race condition, add simulation engine test --- imdclient/tests/base.py | 21 +++++++++++++++++++++ imdclient/tests/server.py | 13 ++----------- imdclient/tests/test_imdclient.py | 3 +++ imdclient/tests/test_imdreader.py | 4 +++- 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/imdclient/tests/base.py b/imdclient/tests/base.py index d619e44..41ebfd7 100644 --- a/imdclient/tests/base.py +++ b/imdclient/tests/base.py @@ -233,3 +233,24 @@ def test_continue_after_disconnect( f"imd://localhost:{port}", atom_style="id type x y z", ) + + def test_wait_after_disconnect(self, docker_client, topol, tmp_path, port): + u = mda.Universe( + (tmp_path / topol), + f"imd://localhost:{port}", + # Could also use None here- just being explicit + continue_after_disconnect=False, + # Make sure LAMMPS topol can be read + # Does nothing if not LAMMPS + atom_style="id type x y z", + ) + u.trajectory.close() + # Give the simulation engine + # enough time to finish running (though it shouldn't) + time.sleep(45) + + u = mda.Universe( + (tmp_path / topol), + f"imd://localhost:{port}", + atom_style="id type x y z", + ) diff --git a/imdclient/tests/server.py b/imdclient/tests/server.py index e55cf54..9dcbe1a 100644 --- a/imdclient/tests/server.py +++ b/imdclient/tests/server.py @@ -93,7 +93,7 @@ def _send_handshakeV3(self): positions, wrapped_coords, velocities, - forces + forces, ) logger.debug(f"InThreadIMDServer: Sending session info") self.conn.sendall(sinfo) @@ -101,14 +101,6 @@ def _send_handshakeV3(self): def join_accept_thread(self): self.accept_thread.join() - def _expect_go(self): - logger.debug(f"InThreadIMDServer: Waiting for go") - head_buf = bytearray(IMDHEADERSIZE) - read_into_buf(self.conn, head_buf) - header = IMDHeader(head_buf) - if header.type != IMDHeaderType.IMD_GO: - raise ValueError("Expected IMD_GO packet, got something else") - def send_frames(self, start, end): for i in range(start, end): self.send_frame(i) @@ -126,7 +118,7 @@ def send_frame(self, i): ) self.conn.sendall(time_header + time) - + if self.imdsinfo.energies: energy_header = create_header_bytes(IMDHeaderType.IMD_ENERGIES, 1) energies = create_energy_bytes( @@ -183,7 +175,6 @@ def send_frame(self, i): self.conn.sendall(force_header + force) - def expect_packet(self, packet_type, expected_length=None): head_buf = bytearray(IMDHEADERSIZE) read_into_buf(self.conn, head_buf) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index b01bc5b..aa33708 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -69,6 +69,7 @@ def server_client_two_frame_buf(self, universe, imdsinfo, port): buffer_size=imdframe_memsize(universe.trajectory.n_atoms, imdsinfo) * 2, ) + server.join_accept_thread() yield server, client client.stop() server.cleanup() @@ -84,6 +85,7 @@ def server_client(self, universe, imdsinfo, port, request): port, universe.trajectory.n_atoms, ) + server.join_accept_thread() yield server, client client.stop() server.cleanup() @@ -160,6 +162,7 @@ def test_continue_after_disconnect(self, universe, imdsinfo, port, cont): universe.trajectory.n_atoms, continue_after_disconnect=cont, ) + server.join_accept_thread() server.expect_packet( IMDHeaderType.IMD_WAIT, expected_length=(int)(not cont) ) diff --git a/imdclient/tests/test_imdreader.py b/imdclient/tests/test_imdreader.py index 4312e27..046a710 100644 --- a/imdclient/tests/test_imdreader.py +++ b/imdclient/tests/test_imdreader.py @@ -171,7 +171,9 @@ def test_total_time(self, reader, ref): decimal=ref.prec, ) - @pytest.mark.skip(reason="Stream-based reader can only be read iteratively") + @pytest.mark.skip( + reason="Stream-based reader can only be read iteratively" + ) def test_changing_dimensions(self, ref, reader): if ref.changing_dimensions: reader.rewind() From 5a41ddf0a8d1b4f72d56826adaf9e08f949aca5e Mon Sep 17 00:00:00 2001 From: ljwoods2 Date: Tue, 10 Dec 2024 17:40:31 -0700 Subject: [PATCH 4/4] fix del error message --- imdclient/IMD.py | 4 +++- imdclient/IMDClient.py | 3 --- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/imdclient/IMD.py b/imdclient/IMD.py index e9d7062..5940f91 100644 --- a/imdclient/IMD.py +++ b/imdclient/IMD.py @@ -50,6 +50,7 @@ def __init__( ): super(IMDReader, self).__init__(filename, **kwargs) + self._imdclient = None logger.debug("IMDReader initializing") if n_atoms is None: @@ -125,6 +126,7 @@ def _format_hint(thing): def close(self): """Gracefully shut down the reader. Stops the producer thread.""" logger.debug("IMDReader close() called") - self._imdclient.stop() + if self._imdclient is not None: + self._imdclient.stop() # NOTE: removeme after testing logger.debug("IMDReader shut down gracefully.") diff --git a/imdclient/IMDClient.py b/imdclient/IMDClient.py index fa121b5..bb0d461 100644 --- a/imdclient/IMDClient.py +++ b/imdclient/IMDClient.py @@ -739,9 +739,6 @@ def _parse_imdframe(self): ).reshape((self._n_atoms, 3)), ) - def __del__(self): - logger.debug("IMDProducer: I am being deleted") - class IMDFrameBuffer: """