Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions crates/db/src/models/workspace_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,23 @@ impl WorkspaceRepo {
.await
}

pub async fn find_by_rowid(pool: &SqlitePool, rowid: i64) -> Result<Option<Self>, sqlx::Error> {
sqlx::query_as!(
WorkspaceRepo,
r#"SELECT id as "id!: Uuid",
workspace_id as "workspace_id!: Uuid",
repo_id as "repo_id!: Uuid",
target_branch,
created_at as "created_at!: DateTime<Utc>",
updated_at as "updated_at!: DateTime<Utc>"
FROM workspace_repos
WHERE rowid = $1"#,
rowid
)
.fetch_optional(pool)
.await
}

/// Find repos for a workspace with their copy_files configuration.
/// Uses LEFT JOIN so repos without project_repo entries still appear (with NULL copy_files).
pub async fn find_repos_with_copy_files(
Expand Down
7 changes: 7 additions & 0 deletions crates/local-deployment/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ pub struct LocalContainerService {
queued_message_service: QueuedMessageService,
publisher: Result<SharePublisher, RemoteClientNotConfigured>,
notification_service: NotificationService,
workspace_repo_changes:
Arc<tokio::sync::broadcast::Sender<services::services::events::WorkspaceRepoChange>>,
}

impl LocalContainerService {
Expand All @@ -92,6 +94,9 @@ impl LocalContainerService {
approvals: Approvals,
queued_message_service: QueuedMessageService,
publisher: Result<SharePublisher, RemoteClientNotConfigured>,
workspace_repo_changes: Arc<
tokio::sync::broadcast::Sender<services::services::events::WorkspaceRepoChange>,
>,
) -> Self {
let child_store = Arc::new(RwLock::new(HashMap::new()));
let interrupt_senders = Arc::new(RwLock::new(HashMap::new()));
Expand All @@ -110,6 +115,7 @@ impl LocalContainerService {
queued_message_service,
publisher,
notification_service,
workspace_repo_changes,
};

container.spawn_workspace_cleanup();
Expand Down Expand Up @@ -1279,6 +1285,7 @@ impl ContainerService for LocalContainerService {
base_commit: base_commit.clone(),
stats_only,
path_prefix: Some(repo.name.clone()),
workspace_repo_changes: self.workspace_repo_changes.clone(),
})
.await?;

Expand Down
11 changes: 10 additions & 1 deletion crates/local-deployment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,16 @@ impl Deployment for LocalDeployment {
// Create shared components for EventService
let events_msg_store = Arc::new(MsgStore::new());
let events_entry_count = Arc::new(RwLock::new(0));
let (workspace_repo_changes_tx, _) = tokio::sync::broadcast::channel(100);
let workspace_repo_changes = Arc::new(workspace_repo_changes_tx);

// Create DB with event hooks
let db = {
let hook = EventService::create_hook(
events_msg_store.clone(),
events_entry_count.clone(),
DBService::new().await?, // Temporary DB service for the hook
workspace_repo_changes.clone(),
);
DBService::new_with_after_connect(hook).await?
};
Expand Down Expand Up @@ -182,10 +185,16 @@ impl Deployment for LocalDeployment {
approvals.clone(),
queued_message_service.clone(),
share_publisher.clone(),
workspace_repo_changes.clone(),
)
.await;

let events = EventService::new(db.clone(), events_msg_store, events_entry_count);
let events = EventService::new(
db.clone(),
events_msg_store,
events_entry_count,
workspace_repo_changes,
);

let file_search_cache = Arc::new(FileSearchCache::new());

Expand Down
25 changes: 19 additions & 6 deletions crates/services/src/services/diff_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ use notify_debouncer_full::{
};
use thiserror::Error;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
use tokio_stream::wrappers::ReceiverStream;
use utils::{
diff::{self, Diff},
log_msg::LogMsg,
};
use uuid::Uuid;

use crate::services::{
events::WorkspaceRepoChange,
filesystem_watcher::{self, FilesystemWatcherError},
git::{Commit, DiffTarget, GitService, GitServiceError},
};
Expand Down Expand Up @@ -103,6 +104,7 @@ pub struct DiffStreamArgs {
pub base_commit: Commit,
pub stats_only: bool,
pub path_prefix: Option<String>,
pub workspace_repo_changes: Arc<tokio::sync::broadcast::Sender<WorkspaceRepoChange>>,
}

struct DiffStreamManager {
Expand All @@ -123,10 +125,9 @@ enum DiffEvent {

pub async fn create(args: DiffStreamArgs) -> Result<DiffStreamHandle, DiffStreamError> {
let (tx, rx) = mpsc::channel::<Result<LogMsg, io::Error>>(DIFF_STREAM_CHANNEL_CAPACITY);
let manager_args = args.clone();

let watcher_task = tokio::spawn(async move {
let mut manager = DiffStreamManager::new(manager_args, tx);
let mut manager = DiffStreamManager::new(args, tx);
if let Err(e) = manager.run().await {
tracing::error!("Diff stream manager failed: {e}");
let _ = manager.tx.send(Err(io::Error::other(e.to_string()))).await;
Expand Down Expand Up @@ -167,8 +168,9 @@ impl DiffStreamManager {
};
let _git_guard = git_debouncer;

let mut target_interval =
IntervalStream::new(tokio::time::interval(Duration::from_secs(1)));
let mut workspace_repo_rx = self.args.workspace_repo_changes.subscribe();
let workspace_id = self.args.workspace_id;
let repo_id = self.args.repo_id;

loop {
let event = tokio::select! {
Expand All @@ -179,7 +181,18 @@ impl DiffStreamManager {
None => std::future::pending().await,
}
} => DiffEvent::GitStateChange,
_ = target_interval.next() => DiffEvent::CheckTarget,
result = workspace_repo_rx.recv() => {
match result {
Ok(change) if change.workspace_id == workspace_id && change.repo_id == repo_id => {
DiffEvent::CheckTarget
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { // Missed messages, check anyway to be safe
DiffEvent::CheckTarget
},
Ok(_) => continue, // Change for a different workspace/repo
Err(tokio::sync::broadcast::error::RecvError::Closed) => break, // Channel closed
}
},
else => break,
};

Expand Down
37 changes: 33 additions & 4 deletions crates/services/src/services/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@ use db::{
DBService,
models::{
execution_process::ExecutionProcess, project::Project, scratch::Scratch, task::Task,
workspace::Workspace,
workspace::Workspace, workspace_repo::WorkspaceRepo,
},
};
use serde_json::json;
use sqlx::{Error as SqlxError, Sqlite, SqlitePool, decode::Decode, sqlite::SqliteOperation};
use tokio::sync::RwLock;
use tokio::sync::{RwLock, broadcast};
use utils::msg_store::MsgStore;
use uuid::Uuid;

/// Notification sent when a workspace_repo's target_branch changes
#[derive(Debug, Clone)]
pub struct WorkspaceRepoChange {
pub workspace_id: Uuid,
pub repo_id: Uuid,
}

#[path = "events/patches.rs"]
pub mod patches;
#[path = "events/streams.rs"]
Expand All @@ -31,15 +38,22 @@ pub struct EventService {
db: DBService,
#[allow(dead_code)]
entry_count: Arc<RwLock<usize>>,
workspace_repo_changes: Arc<broadcast::Sender<WorkspaceRepoChange>>,
}

impl EventService {
/// Creates a new EventService that will work with a DBService configured with hooks
pub fn new(db: DBService, msg_store: Arc<MsgStore>, entry_count: Arc<RwLock<usize>>) -> Self {
pub fn new(
db: DBService,
msg_store: Arc<MsgStore>,
entry_count: Arc<RwLock<usize>>,
workspace_repo_changes: Arc<broadcast::Sender<WorkspaceRepoChange>>,
) -> Self {
Self {
msg_store,
db,
entry_count,
workspace_repo_changes,
}
}

Expand Down Expand Up @@ -82,6 +96,7 @@ impl EventService {
msg_store: Arc<MsgStore>,
entry_count: Arc<RwLock<usize>>,
db_service: DBService,
workspace_repo_changes: Arc<broadcast::Sender<WorkspaceRepoChange>>,
) -> impl for<'a> Fn(
&'a mut sqlx::sqlite::SqliteConnection,
) -> std::pin::Pin<
Expand All @@ -93,6 +108,7 @@ impl EventService {
let msg_store_for_hook = msg_store.clone();
let entry_count_for_hook = entry_count.clone();
let db_for_hook = db_service.clone();
let workspace_repo_changes_for_hook = workspace_repo_changes.clone();
Box::pin(async move {
let mut handle = conn.lock_handle().await?;
let runtime_handle = tokio::runtime::Handle::current();
Expand Down Expand Up @@ -159,6 +175,7 @@ impl EventService {
let entry_count_for_hook = entry_count_for_hook.clone();
let msg_store_for_hook = msg_store_for_hook.clone();
let db = db_for_hook.clone();
let workspace_repo_changes = workspace_repo_changes_for_hook.clone();

if let Ok(table) = HookTables::from_str(hook.table) {
let rowid = hook.rowid;
Expand All @@ -168,10 +185,22 @@ impl EventService {
| (HookTables::Projects, SqliteOperation::Delete)
| (HookTables::Workspaces, SqliteOperation::Delete)
| (HookTables::ExecutionProcesses, SqliteOperation::Delete)
| (HookTables::Scratch, SqliteOperation::Delete) => {
| (HookTables::Scratch, SqliteOperation::Delete)
| (HookTables::WorkspaceRepos, SqliteOperation::Delete) => {
// Deletions handled in preupdate hook for reliable data capture
return;
}
(HookTables::WorkspaceRepos, _) => {
if let Ok(Some(workspace_repo)) =
WorkspaceRepo::find_by_rowid(&db.pool, rowid).await
{
let _ = workspace_repo_changes.send(WorkspaceRepoChange {
workspace_id: workspace_repo.workspace_id,
repo_id: workspace_repo.repo_id,
});
}
return;
}
(HookTables::Tasks, _) => {
match Task::find_by_rowid(&db.pool, rowid).await {
Ok(Some(task)) => RecordTypes::Task(task),
Expand Down
2 changes: 2 additions & 0 deletions crates/services/src/services/events/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub enum HookTables {
Scratch,
#[strum(to_string = "projects")]
Projects,
#[strum(to_string = "workspace_repos")]
WorkspaceRepos,
}

#[derive(Serialize, Deserialize, TS)]
Expand Down
Loading