Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 52 additions & 200 deletions crates/debugger/src/debugger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use retry::{delay::Exponential, retry};
use server::Implementation;
use transport::{
DEFAULT_DAP_PORT, Message, Reader, TransportConnection,
DEFAULT_DAP_PORT, Reader, TransportConnection,
requests::{self, Disconnect},
responses,
types::{BreakpointLocation, StackFrameId, Variable},
Expand Down Expand Up @@ -133,7 +133,7 @@

let args: InitialiseArguments = initialise_arguments.into();
let internals_rx = rx.clone();
let (mut internals, events) = match &args {
let (mut internals, reader, message_tx) = match &args {
InitialiseArguments::Launch(state::LaunchArguments {
program, language, ..
}) => {
Expand All @@ -143,7 +143,6 @@
program.display()
);

// let implementation = language.into();
let implementation: Implementation = match language {
crate::Language::DebugPy => Implementation::Debugpy,
crate::Language::Delve => Implementation::Delve,
Expand All @@ -156,255 +155,108 @@
.context("connecting to server")?;

// Split the connection into reader and writer to avoid mutex contention
let (mut reader, writer, sequence_number) = connection.split_connection();
let (reader, writer, sequence_number) = connection.split_connection();

let (ttx, trx) = crossbeam_channel::unbounded();
// Create message channel for backward compatibility with send()
let (message_tx, message_rx) = crossbeam_channel::unbounded();

// Wrap writer in Arc<Mutex<>> for shared access
let writer_arc = Arc::new(Mutex::new(writer));

// Spawn polling thread with direct ownership of the reader (no mutex needed)
thread::spawn(move || {
loop {
match reader.poll_message() {
Ok(Some(message)) => {
tracing::debug!(?message, "received message in polling thread");
// Forward events to event channel
if let Message::Event(ref event) = message {
if ttx.send(event.clone()).is_err() {
tracing::debug!(
"event channel closed, terminating polling thread"
);
break;
}
}
// Forward ALL messages to the message channel
if message_tx.send(message).is_err() {
tracing::debug!(
"message channel closed, terminating polling thread"
);
break;
}
}
Ok(None) => {
tracing::debug!("connection closed, terminating polling thread");
break;
}
Err(e) => {
tracing::error!(error = %e, "error receiving message in polling thread, terminating");
break;
}
}
}
tracing::debug!("polling thread terminated");
});
// Background thread will own the reader and send messages to message_rx

let internals = DebuggerInternals::from_split_connection(
writer_arc,
sequence_number,
tx,
message_rx,
message_rx.clone(),
Some(s),
);
(internals, trx)
(internals, reader, message_tx)
}
InitialiseArguments::Attach(_) => {
let connection = TransportConnection::connect(format!("127.0.0.1:{port}"))
.context("connecting to server")?;

// Split the connection into reader and writer to avoid mutex contention
let (mut reader, writer, sequence_number) = connection.split_connection();
let (reader, writer, sequence_number) = connection.split_connection();

let (ttx, trx) = crossbeam_channel::unbounded();
// Create message channel for backward compatibility with send()
let (message_tx, message_rx) = crossbeam_channel::unbounded();

// Wrap writer in Arc<Mutex<>> for shared access
let writer_arc = Arc::new(Mutex::new(writer));

// Spawn polling thread with direct ownership of the reader (no mutex needed)
thread::spawn(move || {
loop {
match reader.poll_message() {
Ok(Some(message)) => {
tracing::debug!(?message, "received message in polling thread");
// Forward events to event channel
if let Message::Event(ref event) = message {
if ttx.send(event.clone()).is_err() {
tracing::debug!(
"event channel closed, terminating polling thread"
);
break;
}
}
// Forward ALL messages to the message channel
if message_tx.send(message).is_err() {
tracing::debug!(
"message channel closed, terminating polling thread"
);
break;
}
}
Ok(None) => {
tracing::debug!("connection closed, terminating polling thread");
break;
}
Err(e) => {
tracing::error!(error = %e, "error receiving message in polling thread, terminating");
break;
}
}
}
tracing::debug!("polling thread terminated");
});
// Background thread will own the reader and send messages to message_rx

let internals = DebuggerInternals::from_split_connection(
writer_arc,
sequence_number,
tx,
message_rx,
message_rx.clone(),
None,
);
(internals, trx)
(internals, reader, message_tx)
}
};

internals.initialise(args).context("initialising")?;

let internals = Arc::new(Mutex::new(internals));

// Create command channel for main thread -> background thread communication
let (command_tx, command_rx) = crossbeam_channel::unbounded();

Check warning on line 204 in crates/debugger/src/debugger.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `command_rx`

warning: unused variable: `command_rx` --> crates/debugger/src/debugger.rs:204:26 | 204 | let (command_tx, command_rx) = crossbeam_channel::unbounded(); | ^^^^^^^^^^ help: if this is intentional, prefix it with an underscore: `_command_rx` | = note: `#[warn(unused_variables)]` (part of `#[warn(unused)]`) on by default

// background thread reading transport events and commands
let background_internals = Arc::clone(&internals);
let background_events = events.clone();
// Start background thread FIRST (it only needs reader and message_tx, not internals)
// This ensures messages are being polled when initialise() is called
thread::spawn(move || {
use crate::internals::FollowUpRequest;

let mut follow_up_queue: Vec<FollowUpRequest> = Vec::new();
let mut reader = reader;

loop {
crossbeam_channel::select! {
recv(background_events) -> msg => {
let event = match msg {
Ok(event) => event,
Err(_) => {
tracing::debug!("event channel closed, terminating background thread");
break;
}
};

let lock_id = Uuid::new_v4().to_string();
let span = tracing::trace_span!("event", %lock_id);
let _guard = span.enter();

tracing::trace!(is_poisoned = %background_internals.is_poisoned(), "trying to unlock background internals");

match background_internals.lock() {
Ok(mut internals) => {
tracing::trace!(?event, "handling event");

// Use non-blocking event processing
let follow_ups = internals.on_event_nonblocking(event);
follow_up_queue.extend(follow_ups);

drop(internals);
tracing::trace!("unlocked background internals");
}
Err(e) => {
tracing::error!(error = %e, "mutex poisoned, terminating background thread");
break;
}
// Poll transport for messages (blocking with short timeout)
match reader.poll_message() {
Ok(Some(message)) => {
tracing::debug!(?message, "received message from transport");

// Forward ALL messages to message channel (for internals.send())
// TODO: Event processing needs to be added back
// Events need to be processed by internals.on_event_nonblocking()
// which converts transport::Event to state::Event
if message_tx.send(message).is_err() {
tracing::debug!(
"message channel closed, terminating background thread"
);
break;
}
}
recv(command_rx) -> msg => {
let command = match msg {
Ok(command) => command,
Err(_) => {
tracing::debug!("command channel closed, terminating background thread");
break;
}
};

let lock_id = Uuid::new_v4().to_string();
let span = tracing::trace_span!("command", %lock_id);
let _guard = span.enter();

match command {
Command::SendRequest { body, response_tx } => {
tracing::trace!(?body, "handling send request command");
match background_internals.lock() {
Ok(mut internals) => {
match internals.send(body) {
Ok(response) => {
let _ = response_tx.send(Ok(response));
}
Err(e) => {
let _ = response_tx.send(Err(e));
}
}
drop(internals);
}
Err(e) => {
let _ = response_tx.send(Err(eyre::eyre!("mutex poisoned: {}", e)));
}
}
}
Command::SendExecute { body, response_tx } => {
tracing::trace!(?body, "handling send execute command");
match background_internals.lock() {
Ok(mut internals) => {
match internals.execute(body) {
Ok(()) => {
let _ = response_tx.send(Ok(()));
}
Err(e) => {
let _ = response_tx.send(Err(e));
}
}
drop(internals);
}
Err(e) => {
let _ = response_tx.send(Err(eyre::eyre!("mutex poisoned: {}", e)));
}
}
}
Command::Shutdown => {
tracing::debug!("received shutdown command");
break;
}
}
Ok(None) => {
tracing::debug!("connection closed, terminating background thread");
break;
}
}

// Process follow-up requests
while let Some(follow_up) = follow_up_queue.pop() {
match background_internals.lock() {
Ok(mut internals) => {
let body = follow_up.to_request_body();
match internals.send(body) {
Ok(response) => {
let more_follow_ups =
internals.on_follow_up_response(follow_up, response);
follow_up_queue.extend(more_follow_ups);
}
Err(e) => {
tracing::error!(error = %e, "failed to send follow-up request");
}
Err(e) => {
// Check if it's a timeout/would-block error
if let Some(io_error) = e.downcast_ref::<std::io::Error>() {
if io_error.kind() == std::io::ErrorKind::WouldBlock
|| io_error.kind() == std::io::ErrorKind::TimedOut
{
// Expected timeout, continue
} else {
tracing::error!(error = %e, "error receiving message, terminating");
break;
}
drop(internals);
}
Err(e) => {
tracing::error!(error = %e, "mutex poisoned while processing follow-up");
} else {
tracing::error!(error = %e, "error receiving message, terminating");
break;
}
}
}
}
tracing::debug!("background thread terminated");
tracing::debug!("message forwarding thread terminated");
});

// Initialize AFTER starting message forwarding thread
// This ensures messages are being polled when send() waits for responses
internals.initialise(args).context("initialising")?;

// Now wrap in Arc<Mutex<>> for thread-safe access from other parts
let internals = Arc::new(Mutex::new(internals));

Ok(Self {
internals,
rx: internals_rx,
Expand Down
4 changes: 2 additions & 2 deletions crates/debugger/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@
Self::with_breakpoints(connection, publisher, message_rx, HashMap::new(), server)
}

/// Legacy constructor - use from_split_connection_no_channel instead
#[allow(dead_code)]
pub(crate) fn from_split_connection(
writer: Arc<Mutex<Box<dyn Write + Send>>>,
sequence_number: Arc<AtomicI64>,
Expand All @@ -148,8 +150,7 @@
///
/// This is used when the background thread owns the reader directly
/// and doesn't need the message_rx channel.
#[allow(dead_code)] // Used in PR 2b
pub(crate) fn from_split_connection_no_channel(

Check warning on line 153 in crates/debugger/src/internals.rs

View workflow job for this annotation

GitHub Actions / clippy

associated items `from_split_connection_no_channel`, `on_event_nonblocking`, `on_follow_up_response`, `handle_stack_trace_response`, `handle_scopes_response`, and `handle_variables_response` are never used

warning: associated items `from_split_connection_no_channel`, `on_event_nonblocking`, `on_follow_up_response`, `handle_stack_trace_response`, `handle_scopes_response`, and `handle_variables_response` are never used --> crates/debugger/src/internals.rs:153:19 | 103 | impl DebuggerInternals { | ---------------------- associated items in this implementation ... 153 | pub(crate) fn from_split_connection_no_channel( | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ... 692 | pub(crate) fn on_event_nonblocking( | ^^^^^^^^^^^^^^^^^^^^ ... 737 | pub(crate) fn on_follow_up_response( | ^^^^^^^^^^^^^^^^^^^^^ ... 755 | fn handle_stack_trace_response( | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ ... 837 | fn handle_scopes_response( | ^^^^^^^^^^^^^^^^^^^^^^ ... 863 | fn handle_variables_response( | ^^^^^^^^^^^^^^^^^^^^^^^^^
writer: Arc<Mutex<Box<dyn Write + Send>>>,
sequence_number: Arc<AtomicI64>,
publisher: crossbeam_channel::Sender<Event>,
Expand All @@ -171,7 +172,6 @@
///
/// This sends the request but doesn't wait for a response. The caller
/// is responsible for tracking the sequence number and matching responses.
#[allow(dead_code)] // Used in PR 2b
pub(crate) fn send_request(&mut self, body: requests::RequestBody) -> eyre::Result<i64> {
tracing::debug!(?body, "internals.send_request called");

Expand Down
4 changes: 0 additions & 4 deletions crates/debugger/src/pending_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@
///
/// This structure maintains a map of sequence numbers to response channels.
/// When a response arrives, it can be matched to the waiting request.
#[allow(dead_code)] // Used in PR 2b
pub(crate) struct PendingRequests {
pending: HashMap<Seq, oneshot::Sender<responses::Response>>,
}

impl PendingRequests {
/// Create a new pending requests tracker
#[allow(dead_code)] // Used in PR 2b
pub(crate) fn new() -> Self {

Check warning on line 21 in crates/debugger/src/pending_requests.rs

View workflow job for this annotation

GitHub Actions / clippy

associated items `new`, `add`, and `handle_response` are never used

warning: associated items `new`, `add`, and `handle_response` are never used --> crates/debugger/src/pending_requests.rs:21:19 | 19 | impl PendingRequests { | -------------------- associated items in this implementation 20 | /// Create a new pending requests tracker 21 | pub(crate) fn new() -> Self { | ^^^ ... 30 | pub(crate) fn add(&mut self, seq: Seq) -> oneshot::Receiver<responses::Response> { | ^^^ ... 40 | pub(crate) fn handle_response(&mut self, response: responses::Response) -> bool { | ^^^^^^^^^^^^^^^
Self {
pending: HashMap::new(),
}
Expand All @@ -29,7 +27,6 @@
/// Add a pending request
///
/// Returns the response receiver that will receive the response when it arrives
#[allow(dead_code)] // Used in PR 2b
pub(crate) fn add(&mut self, seq: Seq) -> oneshot::Receiver<responses::Response> {
let (tx, rx) = oneshot::channel();
self.pending.insert(seq, tx);
Expand All @@ -40,7 +37,6 @@
///
/// If this response matches a pending request, sends it to the waiter and returns true.
/// Otherwise returns false.
#[allow(dead_code)] // Used in PR 2b
pub(crate) fn handle_response(&mut self, response: responses::Response) -> bool {
if let Some(tx) = self.pending.remove(&response.request_seq) {
let _ = tx.send(response);
Expand Down
Loading