Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ downloads/
eggs/
.eggs/
lib/
!web-ui/src/lib/
!web-ui/src/lib/**
lib64/
parts/
sdist/
Expand Down Expand Up @@ -69,4 +71,4 @@ config.local.json

# Temporary files
*.tmp
*.temp
*.temp
5 changes: 5 additions & 0 deletions configs/neurorift_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,10 @@
"SimStudioAI (conceptual orchestration)",
"Anti-Gravity AI"
]
},
"self_evolution": {
"enabled": true,
"require_human_approval": true,
"sandbox_branch": "ai-evolution"
}
}
158 changes: 155 additions & 3 deletions core/neurorift-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl NeuroRiftCore {
}

/// Queue a task in the active session
pub fn queue_task(&self, tool_name: String, target: String, args: serde_json::Value) -> Result<()> {
pub fn queue_task(&self, tool_name: String, target: String, args: serde_json::Value) -> Result<Option<String>> {
if let Some(session) = self.get_active_session() {
let mut session = session.write();
let args_map = args.as_object()
Expand All @@ -177,12 +177,134 @@ impl NeuroRiftCore {

// Get the task that was just added
if let Some(task) = session.task_queue.back() {
let task_id = task.id.clone();
self.ws_server.broadcast(WSEvent::TaskQueued {
task: task.clone(),
});
return Ok(Some(task_id));
}
}

Ok(None)
}

/// Cancel a queued/running task.
pub fn cancel_task(&self, task_id: &str) -> Result<()> {
if let Some(session) = self.get_active_session() {
let mut session = session.write();
if let Some(task) = session.task_queue.iter_mut().find(|task| task.id == task_id) {
task.status = crate::state::TaskStatus::Cancelled;
task.completed_at = Some(chrono::Utc::now());
session.touch();
self.ws_server.broadcast(WSEvent::TaskCancelled {
task_id: task_id.to_string(),
timestamp: chrono::Utc::now(),
});
}
}
Ok(())
}

/// Execute a task by ID through the Python bridge.
pub async fn execute_task(&self, task_id: &str) -> Result<()> {
let (session_id, tool_name, target, args, mode) = if let Some(session) = self.get_active_session() {
let mut session = session.write();
let session_id = session.id.clone();
let mode = session.mode;

if let Some(task) = session.task_queue.iter_mut().find(|task| task.id == task_id) {
task.status = crate::state::TaskStatus::Running;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: A task that has already been marked as cancelled can still be executed because execute_task unconditionally transitions it to Running and dispatches it to the Python bridge, so a user-issued cancel may be ignored if it races with execution startup. [logic error]

Severity Level: Critical 🚨
- ❌ Cancelled tools still execute via Python bridge executor.
- ⚠️ WebSocket TaskCancelled and TaskStarted events conflict for task.
- ⚠️ Tools page may show cancelled yet still-running tasks.
- ⚠️ User intent to abort long tools not reliably honored.
Suggested change
task.status = crate::state::TaskStatus::Running;
if task.status == crate::state::TaskStatus::Cancelled {
// Do not execute tasks that were cancelled before starting
return Ok(());
}
Steps of Reproduction ✅
1. Run the core binary (`core/neurorift-core/src/main.rs:7-8`) so that the WebSocket
server and command listener are active.

2. From a WebSocket client (e.g., Web Mode tools UI as described in the PR), send a
`QueueTask` event over WS matching `WSEvent::QueueTask`
(`core/neurorift-core/src/websocket/events.rs:144-148`); the server forwards it via
`WebSocketServer::handle_connection` (`websocket/mod.rs:76-85`) into the broadcast
channel.

3. Observe in `main.rs:70-126` that the command loop matches `QueueTask { tool_name,
target, args }` and calls `core_cmd.queue_task(...)`; on `Ok(Some(task_id))` it spawns
`core_exec.execute_task(&task_id)` in a new Tokio task (`main.rs:110-119`).

4. Immediately after the task is queued but before or while `execute_task` starts, send a
`CancelTask` event for the same `task_id` (matching `WSEvent::CancelTask` at
`events.rs:149-151`), which the command loop handles at `main.rs:127-131` by calling
`core_cmd.cancel_task(&task_id)`. This sets `task.status = TaskStatus::Cancelled` and
broadcasts `TaskCancelled` (`lib.rs:192-205`).

5. When the previously spawned `execute_task` runs, it calls `get_active_session()` and
locks the session (`lib.rs:210-213`), then executes the `if let Some(task) =
session.task_queue.iter_mut().find(...)` block at `lib.rs:215-225`, which unconditionally
sets `task.status = TaskStatus::Running` and broadcasts `TaskStarted` even if the task was
marked `Cancelled` by `cancel_task` in step 4.

6. The command then proceeds to build the Python `tool_execute` payload and call
`self.python_bridge.execute(command).await` (`lib.rs:233-242`), causing the cancelled task
to actually execute via the Python bridge (`python_bridge/mod.rs:40-52`), despite the user
having requested cancellation.
Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** core/neurorift-core/src/lib.rs
**Line:** 216:216
**Comment:**
	*Logic Error: A task that has already been marked as cancelled can still be executed because `execute_task` unconditionally transitions it to `Running` and dispatches it to the Python bridge, so a user-issued cancel may be ignored if it races with execution startup.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
👍 | 👎

task.started_at = Some(chrono::Utc::now());
let args = serde_json::Value::Object(task.args.clone().into_iter().collect());
let tool_name = task.tool_name.clone();
let target = task.target.clone();
self.ws_server.broadcast(WSEvent::TaskStarted {
task_id: task.id.clone(),
started_at: chrono::Utc::now(),
});
(session_id, tool_name, target, args, mode)
} else {
return Ok(());
}
} else {
return Ok(());
};

let command = serde_json::json!({
"type": "tool_execute",
"tool": tool_name,
"target": target,
"args": args,
"mode": format!("{:?}", mode).to_uppercase(),
"session_id": session_id,
});

let result = self.python_bridge.execute(command).await;

if let Some(session) = self.get_active_session() {
let mut session = session.write();
if let Some(task) = session.task_queue.iter_mut().find(|task| task.id == task_id) {
Comment on lines +244 to +246
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Bind task completion updates to the originating session

execute_task captures the task's session_id before awaiting Python, but completion handling re-reads self.get_active_session(); if an operator switches sessions while a long-running tool is in flight, status/result updates are applied to the wrong session (or dropped when the task ID is not found), leaving the original task stuck in running and unsaved. Resolve completion and persistence against the captured session ID instead of the current active session.

Useful? React with 👍 / 👎.

match result {
Ok(data) => {
let raw_output = data
.get("raw_output")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let success = data
.get("status")
.and_then(|v| v.as_str())
.map(|s| s.eq_ignore_ascii_case("completed"))
.unwrap_or(true);

task.status = if success {
crate::state::TaskStatus::Completed
} else {
crate::state::TaskStatus::Failed
};
task.completed_at = Some(chrono::Utc::now());
session.touch();

self.ws_server.broadcast(WSEvent::TaskOutput {
task_id: task_id.to_string(),
chunk: raw_output.clone(),
});

if success {
self.ws_server.broadcast(WSEvent::TaskCompleted {
task_id: task_id.to_string(),
result: crate::websocket::events::TaskResult {
success: true,
output: raw_output,
structured_data: data.get("structured_output").cloned(),
duration_ms: (data.get("duration_seconds").and_then(|v| v.as_f64()).unwrap_or(0.0) * 1000.0) as u64,
},
});
} else {
self.ws_server.broadcast(WSEvent::TaskFailed {
task_id: task_id.to_string(),
error: data.get("error").and_then(|v| v.as_str()).unwrap_or("tool execution failed").to_string(),
});
}
}
Err(error) => {
task.status = crate::state::TaskStatus::Failed;
task.completed_at = Some(chrono::Utc::now());
session.touch();
self.ws_server.broadcast(WSEvent::TaskFailed {
task_id: task_id.to_string(),
error: error.to_string(),
});
}
}
Comment on lines +191 to +299
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Cancellation is state-only; running tasks can still execute and overwrite status.

cancel_task only flips the task status locally. execute_task doesn’t check for Cancelled before running (or before applying results), so a task can still execute and emit TaskCompleted/Failed after cancellation. Consider guarding against cancelled tasks and—if supported—signaling the Python bridge to stop in-flight execution.

🛠️ Suggested guard before execution
-            if let Some(task) = session.task_queue.iter_mut().find(|task| task.id == task_id) {
+            if let Some(task) = session.task_queue.iter_mut().find(|task| task.id == task_id) {
+                if task.status == crate::state::TaskStatus::Cancelled {
+                    return Ok(());
+                }
                 task.status = crate::state::TaskStatus::Running;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/neurorift-core/src/lib.rs` around lines 191 - 299, cancel_task only
flips the local status but execute_task doesn't check for cancelled state so a
cancelled task can still run and overwrite status; update execute_task to (1)
check the task's status after finding it and before sending the execute command
(skip execution and broadcast WSEvent::TaskCancelled if status ==
TaskStatus::Cancelled), (2) also check again before applying the result (if
task.status == Cancelled, ignore the result and do not broadcast
Completed/Failed), and (3) if supported, add a cancellation handshake to the
python bridge (e.g., send a "cancel_task" command with task_id via python_bridge
or call a new python_bridge.cancel(task_id)) to stop in-flight execution;
reference the task.status field, cancel_task and execute_task functions,
python_bridge.execute, and WSEvent::TaskCancelled/TaskCompleted/TaskFailed to
locate code to change.

}
}

if let Some(session) = self.get_active_session() {
let session_id = session.read().id.clone();
let _ = self.save_session(&session_id);
Comment on lines +303 to +305
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: After a task completes, the core saves whichever session is currently active instead of the session that actually owned the task, so if the active session changes during execution the task's updated state may be written to the wrong session or not persisted at all. [logic error]

Severity Level: Major ⚠️
- ⚠️ Long-running tool updates may not persist for original session.
- ⚠️ Active session switching during tools causes mis-saved sessions.
- ⚠️ Session autosave behavior becomes misleading for multi-session users.
- ⚠️ Potentially stale task queues when reloading affected session.
Suggested change
if let Some(session) = self.get_active_session() {
let session_id = session.read().id.clone();
let _ = self.save_session(&session_id);
let _ = self.save_session(&session_id);
Steps of Reproduction ✅
1. Start the core service (`core/neurorift-core/src/main.rs:7-38`) so WebSocket and
command listener are running.

2. From a client, create or load session A using `CreateSession` or `LoadSession`
WebSocket events (`websocket/events.rs:127-140`), which `main.rs:73-83` handle by calling
`core_cmd.create_session` / `core_cmd.load_session`; this sets `active_session` to A
(`lib.rs:83-108` for creation and `lib.rs:118-136` for load in the final file, implied by
create/load logic).

3. Queue a long-running tool in session A via a `QueueTask` event (`events.rs:144-148`),
which `main.rs:110-126` maps to `core_cmd.queue_task(...)` and then spawns
`core_exec.execute_task(&task_id)` for the returned `task_id`.

4. While `execute_task` is awaiting the Python bridge (`lib.rs:233-242`), trigger a
session change to session B (e.g., loading or creating another session) via
`CreateSession` or `LoadSession` WS events; `main.rs:73-83` calls
`create_session`/`load_session`, which update `active_session` to B (see `lib.rs` session
management).

5. After the Python bridge returns, `execute_task` reaches its trailing persistence block
at `lib.rs:303-305`, calls `get_active_session()` again, and then
`save_session(&session_id)` using the *current* active session (B), not the originally
captured `session_id` for A.

6. As a result, only session B is persisted via `SessionManager::save_session` (called
inside `save_session`, see `lib.rs:142-156` in the final file), while the updated task
state for session A may not be flushed to disk immediately, leading to inconsistent or
stale task information for session A after restart.
Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** core/neurorift-core/src/lib.rs
**Line:** 303:305
**Comment:**
	*Logic Error: After a task completes, the core saves whichever session is currently active instead of the session that actually owned the task, so if the active session changes during execution the task's updated state may be written to the wrong session or not persisted at all.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.
👍 | 👎

}
Comment on lines +244 to +306
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use the originating session_id when applying results and saving.

After execution completes, you re-fetch the active session (Line 244 and Line 303). If the user switches sessions while the task runs, results can be written to the wrong session (or dropped). Use the captured session_id to look up and save the correct session.

✅ Suggested fix (use captured session_id)
-        if let Some(session) = self.get_active_session() {
-            let mut session = session.write();
+        if let Some(session_ref) = self.sessions.get(&session_id) {
+            let mut session = session_ref.value().write();
             if let Some(task) = session.task_queue.iter_mut().find(|task| task.id == task_id) {
                 match result {
                     Ok(data) => {
                         let raw_output = data
@@
                 }
             }
         }
 
-        if let Some(session) = self.get_active_session() {
-            let session_id = session.read().id.clone();
-            let _ = self.save_session(&session_id);
-        }
+        let _ = self.save_session(&session_id);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if let Some(session) = self.get_active_session() {
let mut session = session.write();
if let Some(task) = session.task_queue.iter_mut().find(|task| task.id == task_id) {
match result {
Ok(data) => {
let raw_output = data
.get("raw_output")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let success = data
.get("status")
.and_then(|v| v.as_str())
.map(|s| s.eq_ignore_ascii_case("completed"))
.unwrap_or(true);
task.status = if success {
crate::state::TaskStatus::Completed
} else {
crate::state::TaskStatus::Failed
};
task.completed_at = Some(chrono::Utc::now());
session.touch();
self.ws_server.broadcast(WSEvent::TaskOutput {
task_id: task_id.to_string(),
chunk: raw_output.clone(),
});
if success {
self.ws_server.broadcast(WSEvent::TaskCompleted {
task_id: task_id.to_string(),
result: crate::websocket::events::TaskResult {
success: true,
output: raw_output,
structured_data: data.get("structured_output").cloned(),
duration_ms: (data.get("duration_seconds").and_then(|v| v.as_f64()).unwrap_or(0.0) * 1000.0) as u64,
},
});
} else {
self.ws_server.broadcast(WSEvent::TaskFailed {
task_id: task_id.to_string(),
error: data.get("error").and_then(|v| v.as_str()).unwrap_or("tool execution failed").to_string(),
});
}
}
Err(error) => {
task.status = crate::state::TaskStatus::Failed;
task.completed_at = Some(chrono::Utc::now());
session.touch();
self.ws_server.broadcast(WSEvent::TaskFailed {
task_id: task_id.to_string(),
error: error.to_string(),
});
}
}
}
}
if let Some(session) = self.get_active_session() {
let session_id = session.read().id.clone();
let _ = self.save_session(&session_id);
}
if let Some(session_ref) = self.sessions.get(&session_id) {
let mut session = session_ref.value().write();
if let Some(task) = session.task_queue.iter_mut().find(|task| task.id == task_id) {
match result {
Ok(data) => {
let raw_output = data
.get("raw_output")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let success = data
.get("status")
.and_then(|v| v.as_str())
.map(|s| s.eq_ignore_ascii_case("completed"))
.unwrap_or(true);
task.status = if success {
crate::state::TaskStatus::Completed
} else {
crate::state::TaskStatus::Failed
};
task.completed_at = Some(chrono::Utc::now());
session.touch();
self.ws_server.broadcast(WSEvent::TaskOutput {
task_id: task_id.to_string(),
chunk: raw_output.clone(),
});
if success {
self.ws_server.broadcast(WSEvent::TaskCompleted {
task_id: task_id.to_string(),
result: crate::websocket::events::TaskResult {
success: true,
output: raw_output,
structured_data: data.get("structured_output").cloned(),
duration_ms: (data.get("duration_seconds").and_then(|v| v.as_f64()).unwrap_or(0.0) * 1000.0) as u64,
},
});
} else {
self.ws_server.broadcast(WSEvent::TaskFailed {
task_id: task_id.to_string(),
error: data.get("error").and_then(|v| v.as_str()).unwrap_or("tool execution failed").to_string(),
});
}
}
Err(error) => {
task.status = crate::state::TaskStatus::Failed;
task.completed_at = Some(chrono::Utc::now());
session.touch();
self.ws_server.broadcast(WSEvent::TaskFailed {
task_id: task_id.to_string(),
error: error.to_string(),
});
}
}
}
}
let _ = self.save_session(&session_id);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/neurorift-core/src/lib.rs` around lines 244 - 306, The code uses
self.get_active_session() twice which can pick a different session if the user
switches while the task runs; capture the originating session id from the first
session (e.g. let session_id = session.read().id.clone() before taking the write
lock or before any await) and then use that session_id to look up and modify the
same session (via your session lookup/get method) and to call
self.save_session(&session_id) at the end instead of re-calling
get_active_session(); update usages around get_active_session, session.write(),
and the final save to consistently use the captured session_id to ensure results
are applied to the originating session.


Ok(())
}

Expand All @@ -207,14 +329,29 @@ impl NeuroRiftCore {

/// Handle chat message
pub async fn chat(&self, message: String, model: Option<String>) -> Result<()> {
let session_id = self
.get_active_session()
.map(|s| s.read().id.clone())
.unwrap_or_else(|| "default".to_string());

// Forward to Python bridge
let cmd = serde_json::json!({
"type": "ai_generate",
"prompt": message,
"model": model
"model": model,
"session_id": session_id,
});

let data = self.python_bridge.execute(cmd).await?;
let data = match self.python_bridge.execute(cmd).await {
Ok(data) => data,
Err(error) => {
self.ws_server.broadcast(WSEvent::Error {
message: "AI generation failed".to_string(),
details: Some(error.to_string()),
});
return Ok(());
}
};

if let Some(text) = data.get("response").and_then(|v| v.as_str()) {
let model = data.get("model").and_then(|v| v.as_str()).unwrap_or("unknown").to_string();
Expand All @@ -229,6 +366,21 @@ impl NeuroRiftCore {
Ok(())
}

pub async fn get_ollama_status(&self) -> Result<()> {
let data = self.python_bridge.execute(serde_json::json!({
"type": "ai_status"
})).await?;

self.ws_server.broadcast(WSEvent::OllamaStatus {
available: data.get("available").and_then(|v| v.as_bool()).unwrap_or(false),
model: data.get("model").and_then(|v| v.as_str()).map(|s| s.to_string()),
model_count: data.get("model_count").and_then(|v| v.as_u64()).unwrap_or(0) as usize,
needs_pull: data.get("needs_pull").and_then(|v| v.as_bool()).unwrap_or(false),
});

Ok(())
}

/// Get Python bridge
pub fn python_bridge(&self) -> Arc<PythonBridge> {
self.python_bridge.clone()
Expand Down
27 changes: 26 additions & 1 deletion core/neurorift-core/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,24 @@ async fn main() -> Result<()> {
}
QueueTask { tool_name, target, args } => {
tracing::info!("Received QueueTask: {} -> {}", tool_name, target);
if let Err(e) = core_cmd.queue_task(tool_name, target, args) {
match core_cmd.queue_task(tool_name, target, args) {
Ok(Some(task_id)) => {
let core_exec = core_cmd.clone();
tokio::spawn(async move {
if let Err(e) = core_exec.execute_task(&task_id).await {
tracing::error!("Failed to execute task {}: {}", task_id, e);
}
});
Comment on lines +115 to +119
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Implement real task cancellation instead of state-only updates

Task execution is launched with tokio::spawn and the JoinHandle is discarded immediately, so cancel_task has nothing to abort and can only mutate task state. In practice, a canceled task keeps running in the Python bridge and may still emit task_failed/task_completed after task_cancelled, which breaks operator expectations and produces contradictory task lifecycle events.

Useful? React with 👍 / 👎.

}
Ok(None) => {}
Err(e) => {
tracing::error!("Failed to queue task: {}", e);
}
}
}
CancelTask { task_id } => {
tracing::info!("Received CancelTask: {}", task_id);
if let Err(e) = core_cmd.cancel_task(&task_id) {
tracing::error!("Failed to queue task: {}", e);
}
Comment on lines +127 to 131
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix incorrect error message in cancel path.

The cancellation error log still says “Failed to queue task,” which is misleading when debugging.

📝 Suggested fix
-                    if let Err(e) = core_cmd.cancel_task(&task_id) {
-                        tracing::error!("Failed to queue task: {}", e);
+                    if let Err(e) = core_cmd.cancel_task(&task_id) {
+                        tracing::error!("Failed to cancel task: {}", e);
                     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
CancelTask { task_id } => {
tracing::info!("Received CancelTask: {}", task_id);
if let Err(e) = core_cmd.cancel_task(&task_id) {
tracing::error!("Failed to queue task: {}", e);
}
CancelTask { task_id } => {
tracing::info!("Received CancelTask: {}", task_id);
if let Err(e) = core_cmd.cancel_task(&task_id) {
tracing::error!("Failed to cancel task: {}", e);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/neurorift-core/src/main.rs` around lines 127 - 131, The error message in
the CancelTask handling is incorrect: when CancelTask { task_id } calls
core_cmd.cancel_task(&task_id) and it returns Err(e), update the tracing::error
call to log a cancellation-specific message (e.g., "Failed to cancel task") and
include the error details and task_id; locate the CancelTask match arm and the
core_cmd.cancel_task call to change the tracing::error invocation accordingly.

}
Expand All @@ -122,6 +139,14 @@ async fn main() -> Result<()> {
}
});
}
GetOllamaStatus => {
let core_ai = core_cmd.clone();
tokio::spawn(async move {
if let Err(e) = core_ai.get_ollama_status().await {
tracing::error!("Failed to fetch Ollama status: {}", e);
}
});
}
_ => {} // Ignore other events
}
}
Expand Down
18 changes: 16 additions & 2 deletions core/neurorift-core/src/python_bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ pub struct PythonBridge {
}

impl PythonBridge {
fn extract_data(result: Value) -> Result<Value> {
let success = result.get("success").and_then(|v| v.as_bool()).unwrap_or(false);

if !success {
let error = result
.get("error")
.and_then(|v| v.as_str())
.unwrap_or("python bridge execution failed");
anyhow::bail!(error.to_string());
}

Ok(result.get("data").cloned().unwrap_or(Value::Null))
}

/// Create a new Python bridge
pub fn new(base_url: impl Into<String>) -> Self {
let client = Client::builder()
Expand All @@ -34,7 +48,7 @@ impl PythonBridge {
.await?;

let result = response.json::<Value>().await?;
Ok(result)
Self::extract_data(result)
}

/// Execute a tool
Expand All @@ -58,7 +72,7 @@ impl PythonBridge {
});

let result = self.execute(command).await?;

Ok(result["response"]
.as_str()
.unwrap_or("")
Expand Down
18 changes: 18 additions & 0 deletions core/neurorift-core/src/websocket/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ pub enum WSEvent {
task_id: String,
error: String,
},
TaskOutput {
task_id: String,
chunk: String,
},
TaskCancelled {
task_id: String,
timestamp: DateTime<Utc>,
},

// Approval events
ApprovalRequired {
Expand Down Expand Up @@ -102,6 +110,12 @@ pub enum WSEvent {
active: bool,
url: Option<String>,
},
OllamaStatus {
available: bool,
model: Option<String>,
model_count: usize,
needs_pull: bool,
},

// Error events
Error {
Expand Down Expand Up @@ -132,6 +146,9 @@ pub enum WSEvent {
target: String,
args: serde_json::Value,
},
CancelTask {
task_id: String,
},
ApproveAction {
approval_id: String,
},
Expand All @@ -147,6 +164,7 @@ pub enum WSEvent {
message: String,
model: Option<String>,
},
GetOllamaStatus,
ChatResponse {
response: String,
model: String,
Expand Down
Loading