diff --git a/crates/db/.sqlx/query-b927dbda0106198b584520e535d4a28e09e91cd0971b52edcb279e6c736847f4.json b/crates/db/.sqlx/query-b927dbda0106198b584520e535d4a28e09e91cd0971b52edcb279e6c736847f4.json new file mode 100644 index 0000000000..f90f3fe246 --- /dev/null +++ b/crates/db/.sqlx/query-b927dbda0106198b584520e535d4a28e09e91cd0971b52edcb279e6c736847f4.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "SELECT id as \"id!: Uuid\",\n workspace_id as \"workspace_id!: Uuid\",\n repo_id as \"repo_id!: Uuid\",\n target_branch,\n created_at as \"created_at!: DateTime\",\n updated_at as \"updated_at!: DateTime\"\n FROM workspace_repos\n WHERE rowid = $1", + "describe": { + "columns": [ + { + "name": "id!: Uuid", + "ordinal": 0, + "type_info": "Blob" + }, + { + "name": "workspace_id!: Uuid", + "ordinal": 1, + "type_info": "Blob" + }, + { + "name": "repo_id!: Uuid", + "ordinal": 2, + "type_info": "Blob" + }, + { + "name": "target_branch", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "created_at!: DateTime", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "updated_at!: DateTime", + "ordinal": 5, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + true, + false, + false, + false, + false, + false + ] + }, + "hash": "b927dbda0106198b584520e535d4a28e09e91cd0971b52edcb279e6c736847f4" +} diff --git a/crates/db/src/models/workspace_repo.rs b/crates/db/src/models/workspace_repo.rs index 60a408dfde..bfb4b4c97f 100644 --- a/crates/db/src/models/workspace_repo.rs +++ b/crates/db/src/models/workspace_repo.rs @@ -241,6 +241,23 @@ impl WorkspaceRepo { .await } + pub async fn find_by_rowid(pool: &SqlitePool, rowid: i64) -> Result, sqlx::Error> { + sqlx::query_as!( + WorkspaceRepo, + r#"SELECT id as "id!: Uuid", + workspace_id as "workspace_id!: Uuid", + repo_id as "repo_id!: Uuid", + target_branch, + created_at as "created_at!: DateTime", + updated_at as "updated_at!: DateTime" + FROM workspace_repos + WHERE rowid = $1"#, + rowid + ) + .fetch_optional(pool) + .await + } + /// Find repos for a workspace with their copy_files configuration. /// Uses LEFT JOIN so repos without project_repo entries still appear (with NULL copy_files). pub async fn find_repos_with_copy_files( diff --git a/crates/local-deployment/src/container.rs b/crates/local-deployment/src/container.rs index b19d4a3a20..ea1dab4d62 100644 --- a/crates/local-deployment/src/container.rs +++ b/crates/local-deployment/src/container.rs @@ -78,6 +78,8 @@ pub struct LocalContainerService { queued_message_service: QueuedMessageService, publisher: Result, notification_service: NotificationService, + workspace_repo_changes: + Arc>, } impl LocalContainerService { @@ -92,6 +94,9 @@ impl LocalContainerService { approvals: Approvals, queued_message_service: QueuedMessageService, publisher: Result, + workspace_repo_changes: Arc< + tokio::sync::broadcast::Sender, + >, ) -> Self { let child_store = Arc::new(RwLock::new(HashMap::new())); let interrupt_senders = Arc::new(RwLock::new(HashMap::new())); @@ -110,6 +115,7 @@ impl LocalContainerService { queued_message_service, publisher, notification_service, + workspace_repo_changes, }; container.spawn_workspace_cleanup(); @@ -1279,6 +1285,7 @@ impl ContainerService for LocalContainerService { base_commit: base_commit.clone(), stats_only, path_prefix: Some(repo.name.clone()), + workspace_repo_changes: self.workspace_repo_changes.clone(), }) .await?; diff --git a/crates/local-deployment/src/lib.rs b/crates/local-deployment/src/lib.rs index c48dc8615c..d17f3d649b 100644 --- a/crates/local-deployment/src/lib.rs +++ b/crates/local-deployment/src/lib.rs @@ -103,6 +103,8 @@ impl Deployment for LocalDeployment { // Create shared components for EventService let events_msg_store = Arc::new(MsgStore::new()); let events_entry_count = Arc::new(RwLock::new(0)); + let (workspace_repo_changes_tx, _) = tokio::sync::broadcast::channel(100); + let workspace_repo_changes = Arc::new(workspace_repo_changes_tx); // Create DB with event hooks let db = { @@ -110,6 +112,7 @@ impl Deployment for LocalDeployment { events_msg_store.clone(), events_entry_count.clone(), DBService::new().await?, // Temporary DB service for the hook + workspace_repo_changes.clone(), ); DBService::new_with_after_connect(hook).await? }; @@ -182,10 +185,16 @@ impl Deployment for LocalDeployment { approvals.clone(), queued_message_service.clone(), share_publisher.clone(), + workspace_repo_changes.clone(), ) .await; - let events = EventService::new(db.clone(), events_msg_store, events_entry_count); + let events = EventService::new( + db.clone(), + events_msg_store, + events_entry_count, + workspace_repo_changes, + ); let file_search_cache = Arc::new(FileSearchCache::new()); diff --git a/crates/services/src/services/diff_stream.rs b/crates/services/src/services/diff_stream.rs index 632369b2dd..4009a50df1 100644 --- a/crates/services/src/services/diff_stream.rs +++ b/crates/services/src/services/diff_stream.rs @@ -18,7 +18,7 @@ use notify_debouncer_full::{ }; use thiserror::Error; use tokio::{sync::mpsc, task::JoinHandle}; -use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; +use tokio_stream::wrappers::ReceiverStream; use utils::{ diff::{self, Diff}, log_msg::LogMsg, @@ -26,6 +26,7 @@ use utils::{ use uuid::Uuid; use crate::services::{ + events::WorkspaceRepoChange, filesystem_watcher::{self, FilesystemWatcherError}, git::{Commit, DiffTarget, GitService, GitServiceError}, }; @@ -103,6 +104,7 @@ pub struct DiffStreamArgs { pub base_commit: Commit, pub stats_only: bool, pub path_prefix: Option, + pub workspace_repo_changes: Arc>, } struct DiffStreamManager { @@ -123,10 +125,9 @@ enum DiffEvent { pub async fn create(args: DiffStreamArgs) -> Result { let (tx, rx) = mpsc::channel::>(DIFF_STREAM_CHANNEL_CAPACITY); - let manager_args = args.clone(); let watcher_task = tokio::spawn(async move { - let mut manager = DiffStreamManager::new(manager_args, tx); + let mut manager = DiffStreamManager::new(args, tx); if let Err(e) = manager.run().await { tracing::error!("Diff stream manager failed: {e}"); let _ = manager.tx.send(Err(io::Error::other(e.to_string()))).await; @@ -167,8 +168,9 @@ impl DiffStreamManager { }; let _git_guard = git_debouncer; - let mut target_interval = - IntervalStream::new(tokio::time::interval(Duration::from_secs(1))); + let mut workspace_repo_rx = self.args.workspace_repo_changes.subscribe(); + let workspace_id = self.args.workspace_id; + let repo_id = self.args.repo_id; loop { let event = tokio::select! { @@ -179,7 +181,18 @@ impl DiffStreamManager { None => std::future::pending().await, } } => DiffEvent::GitStateChange, - _ = target_interval.next() => DiffEvent::CheckTarget, + result = workspace_repo_rx.recv() => { + match result { + Ok(change) if change.workspace_id == workspace_id && change.repo_id == repo_id => { + DiffEvent::CheckTarget + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { // Missed messages, check anyway to be safe + DiffEvent::CheckTarget + }, + Ok(_) => continue, // Change for a different workspace/repo + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, // Channel closed + } + }, else => break, }; diff --git a/crates/services/src/services/events.rs b/crates/services/src/services/events.rs index 748371cd8d..d3eb876594 100644 --- a/crates/services/src/services/events.rs +++ b/crates/services/src/services/events.rs @@ -4,15 +4,22 @@ use db::{ DBService, models::{ execution_process::ExecutionProcess, project::Project, scratch::Scratch, task::Task, - workspace::Workspace, + workspace::Workspace, workspace_repo::WorkspaceRepo, }, }; use serde_json::json; use sqlx::{Error as SqlxError, Sqlite, SqlitePool, decode::Decode, sqlite::SqliteOperation}; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, broadcast}; use utils::msg_store::MsgStore; use uuid::Uuid; +/// Notification sent when a workspace_repo's target_branch changes +#[derive(Debug, Clone)] +pub struct WorkspaceRepoChange { + pub workspace_id: Uuid, + pub repo_id: Uuid, +} + #[path = "events/patches.rs"] pub mod patches; #[path = "events/streams.rs"] @@ -31,15 +38,22 @@ pub struct EventService { db: DBService, #[allow(dead_code)] entry_count: Arc>, + workspace_repo_changes: Arc>, } impl EventService { /// Creates a new EventService that will work with a DBService configured with hooks - pub fn new(db: DBService, msg_store: Arc, entry_count: Arc>) -> Self { + pub fn new( + db: DBService, + msg_store: Arc, + entry_count: Arc>, + workspace_repo_changes: Arc>, + ) -> Self { Self { msg_store, db, entry_count, + workspace_repo_changes, } } @@ -82,6 +96,7 @@ impl EventService { msg_store: Arc, entry_count: Arc>, db_service: DBService, + workspace_repo_changes: Arc>, ) -> impl for<'a> Fn( &'a mut sqlx::sqlite::SqliteConnection, ) -> std::pin::Pin< @@ -93,6 +108,7 @@ impl EventService { let msg_store_for_hook = msg_store.clone(); let entry_count_for_hook = entry_count.clone(); let db_for_hook = db_service.clone(); + let workspace_repo_changes_for_hook = workspace_repo_changes.clone(); Box::pin(async move { let mut handle = conn.lock_handle().await?; let runtime_handle = tokio::runtime::Handle::current(); @@ -159,6 +175,7 @@ impl EventService { let entry_count_for_hook = entry_count_for_hook.clone(); let msg_store_for_hook = msg_store_for_hook.clone(); let db = db_for_hook.clone(); + let workspace_repo_changes = workspace_repo_changes_for_hook.clone(); if let Ok(table) = HookTables::from_str(hook.table) { let rowid = hook.rowid; @@ -168,10 +185,22 @@ impl EventService { | (HookTables::Projects, SqliteOperation::Delete) | (HookTables::Workspaces, SqliteOperation::Delete) | (HookTables::ExecutionProcesses, SqliteOperation::Delete) - | (HookTables::Scratch, SqliteOperation::Delete) => { + | (HookTables::Scratch, SqliteOperation::Delete) + | (HookTables::WorkspaceRepos, SqliteOperation::Delete) => { // Deletions handled in preupdate hook for reliable data capture return; } + (HookTables::WorkspaceRepos, _) => { + if let Ok(Some(workspace_repo)) = + WorkspaceRepo::find_by_rowid(&db.pool, rowid).await + { + let _ = workspace_repo_changes.send(WorkspaceRepoChange { + workspace_id: workspace_repo.workspace_id, + repo_id: workspace_repo.repo_id, + }); + } + return; + } (HookTables::Tasks, _) => { match Task::find_by_rowid(&db.pool, rowid).await { Ok(Some(task)) => RecordTypes::Task(task), diff --git a/crates/services/src/services/events/types.rs b/crates/services/src/services/events/types.rs index 6e53827b57..7855ecc781 100644 --- a/crates/services/src/services/events/types.rs +++ b/crates/services/src/services/events/types.rs @@ -32,6 +32,8 @@ pub enum HookTables { Scratch, #[strum(to_string = "projects")] Projects, + #[strum(to_string = "workspace_repos")] + WorkspaceRepos, } #[derive(Serialize, Deserialize, TS)]