From 394a6b9a40a69b97472d2e8d3c795008eec7a219 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sat, 29 Jun 2024 21:33:57 -0600 Subject: [PATCH 01/15] refactor(repository): use commit_signed method Replace the existing commit method with commit_signed method for signing commits. The previous commit method is commented out for potential future reference. This change ensures that all commits are signed, enhancing security and traceability. --- src/actors/repositories.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/actors/repositories.rs b/src/actors/repositories.rs index 01d310d..8db4364 100644 --- a/src/actors/repositories.rs +++ b/src/actors/repositories.rs @@ -161,16 +161,17 @@ impl GitRepository { let when: TimeStamp = (&sig.when()).into(); let message_string = &commit_message.to_string(); // TODO: optionally sign commits - let hash = repo - .commit( - Some("HEAD"), - &sig, - &sig, - message_string, - &tree, - &[&parent_commit], - ) - .expect("Failed to commit"); + let hash = repo.commit_signed(message_string, &sig.into(), None).expect("Failed to sign commit"); + // let hash = repo + // .commit( + // Some("HEAD"), + // &sig, + // &sig, + // message_string, + // &tree, + // &[&parent_commit], + // ) + // .expect("Failed to commit"); let hash = hash.to_string(); let commit_message = commit_message.clone(); From 0f7f00f2732d61b7afc9bbc8b53006f4f939fd36 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sat, 29 Jun 2024 21:36:17 -0600 Subject: [PATCH 02/15] feat(repo): add commit signing Introduce optional commit signing functionality in GitRepository. The code now attempts to sign commits using the commit_signed method after creating an unsigned commit buffer. NEW FEATURE: You appear to have introduced one or more new features that are backward-compatible. According to Semantic Versioning (SemVer), this requires a minor version update. Please verify and update your version number accordingly. --- src/actors/repositories.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/actors/repositories.rs b/src/actors/repositories.rs index 8db4364..cfd8cea 100644 --- a/src/actors/repositories.rs +++ b/src/actors/repositories.rs @@ -161,6 +161,17 @@ impl GitRepository { let when: TimeStamp = (&sig.when()).into(); let message_string = &commit_message.to_string(); // TODO: optionally sign commits + let unsigned_commit_buffer = repo + .commit( + Some("HEAD"), + &sig, + &sig, + message_string, + &tree, + &[&parent_commit], + ) + .expect("Failed to commit"); + let hash = repo.commit_signed(message_string, &sig.into(), None).expect("Failed to sign commit"); // let hash = repo // .commit( From 247bb505886323b53c499c18bd1d8fc936056542 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sat, 29 Jun 2024 21:36:57 -0600 Subject: [PATCH 03/15] fix(git): correct commit buffer usage This change corrects the usage of the commit buffer in the GitRepository implementation. Previously, the commit_signed function was called with the commit message string directly. The corrected code now uses the unsigned_commit_buffer for signing the commit. BUG FIX: You appear to have made one or more backward-compatible bug fixes. According to Semantic Versioning (SemVer), this requires a patch version update. Please verify and update your version number accordingly. --- src/actors/repositories.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/actors/repositories.rs b/src/actors/repositories.rs index cfd8cea..63d99c4 100644 --- a/src/actors/repositories.rs +++ b/src/actors/repositories.rs @@ -162,8 +162,7 @@ impl GitRepository { let message_string = &commit_message.to_string(); // TODO: optionally sign commits let unsigned_commit_buffer = repo - .commit( - Some("HEAD"), + .commit_create_buffer( &sig, &sig, message_string, @@ -172,7 +171,7 @@ impl GitRepository { ) .expect("Failed to commit"); - let hash = repo.commit_signed(message_string, &sig.into(), None).expect("Failed to sign commit"); + let hash = repo.commit_signed(unsigned_commit_buffer, &sig.into(), None).expect("Failed to sign commit"); // let hash = repo // .commit( // Some("HEAD"), From e677fea688300ed2cbf806c8612fa929dcc0d208 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sat, 29 Jun 2024 21:37:17 -0600 Subject: [PATCH 04/15] fix(repo): fix commit signing issue Corrects the method call for commit_signed by converting the unsigned_commit_buffer to a UTF-8 string before passing it to the function. This change ensures the commit signing process works without errors. BUG FIX: You appear to have made one or more backward-compatible bug fixes. According to Semantic Versioning (SemVer), this requires a patch version update. Please verify and update your version number accordingly. --- src/actors/repositories.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/actors/repositories.rs b/src/actors/repositories.rs index 63d99c4..99b2532 100644 --- a/src/actors/repositories.rs +++ b/src/actors/repositories.rs @@ -171,7 +171,7 @@ impl GitRepository { ) .expect("Failed to commit"); - let hash = repo.commit_signed(unsigned_commit_buffer, &sig.into(), None).expect("Failed to sign commit"); + let hash = repo.commit_signed(str::from_utf8(&unsigned_commit_buffer).unwrap().to_string(), &sig.into(), None).expect("Failed to sign commit"); // let hash = repo // .commit( // Some("HEAD"), From 933f3b5d0486fdc5e31b9b40ae2c393b19868788 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sat, 29 Jun 2024 21:37:37 -0600 Subject: [PATCH 05/15] fix(repository): fix commit signing Corrects the signing of commits by ensuring the signature is an empty string when converting the unsigned commit buffer to a string. This change prevents the use of an invalid signature object. BUG FIX: You appear to have made one or more backward-compatible bug fixes. According to Semantic Versioning (SemVer), this requires a patch version update. Please verify and update your version number accordingly. --- src/actors/repositories.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/actors/repositories.rs b/src/actors/repositories.rs index 99b2532..e9209fc 100644 --- a/src/actors/repositories.rs +++ b/src/actors/repositories.rs @@ -171,7 +171,7 @@ impl GitRepository { ) .expect("Failed to commit"); - let hash = repo.commit_signed(str::from_utf8(&unsigned_commit_buffer).unwrap().to_string(), &sig.into(), None).expect("Failed to sign commit"); + let hash = repo.commit_signed(std::str::from_utf8(&unsigned_commit_buffer).unwrap().to_string(), "", None).expect("Failed to sign commit"); // let hash = repo // .commit( // Some("HEAD"), From 10cfa48c764105c18f24e13d066a7e8f22417476 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sat, 29 Jun 2024 21:38:07 -0600 Subject: [PATCH 06/15] refactor(repo): remove duplicate commit call Remove the duplicate call to repo.commit_signed in the GitRepository implementation. This ensures that the commit operation is only performed once, avoiding unnecessary operations and potential issues. --- src/actors/repositories.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/actors/repositories.rs b/src/actors/repositories.rs index e9209fc..e62379d 100644 --- a/src/actors/repositories.rs +++ b/src/actors/repositories.rs @@ -171,7 +171,7 @@ impl GitRepository { ) .expect("Failed to commit"); - let hash = repo.commit_signed(std::str::from_utf8(&unsigned_commit_buffer).unwrap().to_string(), "", None).expect("Failed to sign commit"); + let hash = repo.commit_signed(std::str::from_utf8(&unsigned_commit_buffer).unwrap(), "", None).expect("Failed to sign commit"); // let hash = repo // .commit( // Some("HEAD"), From a6c9a4295939b5c7e43026d0f1d464d8037f606d Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sun, 30 Jun 2024 01:35:29 -0600 Subject: [PATCH 07/15] chore(build): update ntangler version Update the ntangler package version from 3.5.3 to 3.6.1 in Cargo.lock. This change ensures that the latest features and bug fixes from the ntangler package are included in the build. --- Cargo.lock | 2 +- src/actors/generators.rs | 308 ------------------------------------ src/actors/llmclient.rs | 332 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 333 insertions(+), 309 deletions(-) delete mode 100644 src/actors/generators.rs create mode 100644 src/actors/llmclient.rs diff --git a/Cargo.lock b/Cargo.lock index e51bd9d..11ac921 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1416,7 +1416,7 @@ dependencies = [ [[package]] name = "ntangler" -version = "3.5.3" +version = "3.6.1" dependencies = [ "akton", "anyhow", diff --git a/src/actors/generators.rs b/src/actors/generators.rs deleted file mode 100644 index d7bd70b..0000000 --- a/src/actors/generators.rs +++ /dev/null @@ -1,308 +0,0 @@ -use std::{ - fmt::Debug, - path::PathBuf, - time::Duration -}; - -use akton::prelude::*; -use AssistantsApiResponseFormatOption::Format; -use async_openai::{ - Client, - config::OpenAIConfig, - error::OpenAIError, - types::{AssistantEventStream, AssistantsApiResponseFormat, AssistantsApiResponseFormatOption, AssistantStreamEvent, CreateMessageRequest, CreateMessageRequestContent, CreateRunRequest, CreateThreadRequest, MessageDeltaContent, MessageRole, ThreadObject}, - types::AssistantsApiResponseFormatType::JsonObject -}; -use failsafe::{ - Config, - futures::CircuitBreaker -}; -use futures::StreamExt; -use tokio::{ - sync::mpsc, - sync::mpsc::Sender, - time::timeout -}; -use tracing::{error, info, instrument, trace, warn}; - -use crate::messages::{CommitMessageGenerated, DiffQueued, GenerationStarted}; - -#[derive(Clone, Debug)] -pub(crate) struct OpenAi { - client: Client, -} - -impl Default for OpenAi { - #[instrument] - fn default() -> Self { - info!("Initializing OpenAi actor with default configuration"); - let client = Client::new(); - Self { - client, - } - } -} - -#[instrument] -async fn create_run_stream_with_circuit_breaker( - circuit_breaker: &(impl CircuitBreaker + Debug), - client: &Client, - thread_id: &str, - format: Option, -) -> anyhow::Result { - match circuit_breaker.call(timeout(Duration::from_secs(10), client.threads().runs(thread_id).create_stream(CreateRunRequest { - assistant_id: "asst_xiaBOCpksCenAMJSL2F0qqFL".to_string(), - stream: Some(true), - parallel_tool_calls: Some(true), - response_format: format, - ..Default::default() - }))).await { - Ok(result) => match result { - Ok(stream) => { - info!("Run stream successfully created for thread_id: {}", thread_id); - Ok(stream) - } - Err(e) => { - error!("Timeout while creating run stream for thread_id: {}", thread_id); - Err(anyhow::Error::from(e).into()) - } - }, - Err(_) => { - error!("Timeout while creating run stream"); - Err(anyhow::Error::msg("Timeout while creating run stream")) - } - } -} - -#[instrument] -async fn create_message_with_circuit_breaker( - circuit_breaker: &(impl CircuitBreaker + Debug), - client: &Client, - thread_id: &str, - diff: String, -) -> anyhow::Result<()> { - match circuit_breaker.call(timeout(Duration::from_secs(10), client.threads().messages(thread_id).create(CreateMessageRequest { - role: MessageRole::User, - content: CreateMessageRequestContent::from(diff), - ..Default::default() - }))).await { - Ok(result) => match result { - Ok(_) => { - info!("Message successfully created in thread_id: {}", thread_id); - Ok(()) - } - Err(e) => { - error!("Failed to create message in thread_id {}: {:?}", thread_id, e); - Err(anyhow::Error::from(e).into()) - } - }, - Err(e) => { - error!("Circuit breaker call failed while creating message in thread_id {}: {:?}", thread_id, e); - Err(anyhow::Error::from(e)) - } - } -} - -#[instrument] -async fn create_thread_with_circuit_breaker(circuit_breaker: &(impl CircuitBreaker + Debug), client: &Client) -> anyhow::Result { - match circuit_breaker.call(timeout(Duration::from_secs(10), client.threads().create(CreateThreadRequest::default()))).await { - Ok(result) => match result { - Ok(thread) => { - info!("Thread successfully created with id: {}", thread.id); - Ok(thread) - } - Err(e) => { - error!("Failed to create thread: {:?}", e); - Err(anyhow::Error::from(e).into()) - } - }, - Err(e) => { - error!("Circuit breaker call failed while creating thread: {:?}", e); - Err(anyhow::Error::from(e)) - } - } -} - -impl OpenAi { - pub(crate) async fn initialize( - config: ActorConfig, - system: &mut AktonReady, - ) -> anyhow::Result { - info!("Initializing OpenAi actor with provided configuration"); - let mut actor = system.create_actor_with_config::(config).await; - trace!("Setting up SubmitDiff event handler for OpenAi actor"); - - // Event: Setting up SubmitDiff Handler - // Description: Setting up an actor to handle the `SubmitDiff` event asynchronously. - // Context: None - actor.setup.act_on_async::(|actor, event| { - let reply_address = event.message.reply_address.clone(); - let broker = actor.akton.get_broker().clone(); - let message = event.message.clone(); - let client = actor.state.client.clone(); - info!("Received DiffQueued event: {:?}", event); - - Context::wrap_future(async move { - Self::handle_diff_received(message, broker, reply_address, client).await; - }) - }); - - actor.context.subscribe::().await; - let context = actor.activate(None).await; - - // Event: Activating OpenAi generator - // Description: Activating the OpenAi generator. - // Context: None - info!(id = &context.key, "Activated OpenAi actor with id: {}", context.key); - Ok(context) - } - #[instrument(skip(broker, return_address, client))] - async fn handle_diff_received(message: DiffQueued, broker: Context, return_address: Context, client: Client) { - let (tx, mut rx) = mpsc::channel(32); - let return_address = return_address.clone(); - let diff = message.diff.clone(); - let target_file = message.target_file.clone(); - let repository_nickname = message.repository_nickname.clone(); - let target_file_clone = target_file.clone(); - let target_file_display = &target_file.display().to_string(); - - let client = client.clone(); - info!("Handling DiffQueued event for file: {}", target_file.display().to_string()); - tokio::spawn(Self::call_ai_endpoint(broker, tx, diff, repository_nickname, target_file_clone, client)); - - // Await the result from the thread - if let Some(commit_message) = rx.recv().await { - // Event: Commit Message Received - // Description: A commit message has been received from the event stream. - // Context: Commit message details. - if !commit_message.is_empty() { - match serde_json::from_str(&commit_message) { - Ok(commit) => { - let message = CommitMessageGenerated::new(target_file, commit); - return_address.emit_async(message, None).await; - info!("Commit message generated and emitted for file: {}", target_file_display); - } - Err(e) => { - error!("Failed to deserialize commit message JSON for file {}: {:?}", target_file_display, e); - } - }; - } else { - error!("Commit message was empty for file: {}. Check the logs.", target_file_display); - } - } else { - // Event: No Commit Message Received - // Description: No commit message was received from the event stream. - // Context: None - error!("No commit message received for file: {}", target_file_display); - } - } - - #[instrument(skip(broker, tx, client))] - async fn call_ai_endpoint(broker: Context, tx: Sender, diff: String, repository_nickname: String, target_file_clone: PathBuf, client: Client) { - let target_file_clone = target_file_clone.clone(); - let target_file_display = &target_file_clone.display().to_string(); - let msg = BrokerRequest::new(GenerationStarted::new( - target_file_clone.clone(), - repository_nickname.clone(), - )); - info!("AI endpoint called for repository: {}, file: {}", repository_nickname, target_file_clone.display()); - broker.emit_async(msg, None).await; - - let circuit_breaker = Config::new().build(); - - let client = client.clone(); - let thread = match create_thread_with_circuit_breaker(&circuit_breaker, &client).await { - Ok(thread) => thread, - Err(e) => { - // TODO: impl fallback logic - error!("Error creating thread with circuit breaker for repository: {}, file: {}: {:?}", repository_nickname, target_file_display, e); - return; // Fail gracefully by returning early - } - }; - - let thread_id = thread.id.clone(); - trace!("Got thread id {} for repository: {}, file: {}", thread_id, repository_nickname, target_file_clone.display()); - match create_message_with_circuit_breaker(&circuit_breaker, &client, &thread.id, diff).await { - Ok(message) => { - trace!("Message created successfully in thread id {} for repository: {}, file: {}", thread_id, repository_nickname, target_file_display); - message - } - Err(e) => { - // TODO: impl fallback logic - error!("Error creating message with circuit breaker for thread id {} in repository: {}, file: {}: {:?}", thread_id, repository_nickname, target_file_display, e); - return; // Fail gracefully by returning early - } - }; - - let format = AssistantsApiResponseFormat { r#type: JsonObject }; - - // Step 3: Initiate a run and handle the event stream. - let mut event_stream = match create_run_stream_with_circuit_breaker(&circuit_breaker, &client, &thread.id, Some(Format(format))).await { - Ok(event_stream) => event_stream, - Err(e) => { - // TODO: impl fallback logic - error!("Error creating run stream with circuit breaker for thread id {} in repository: {}, file: {}: {:?}", thread_id, repository_nickname, target_file_display, e); - return; // Fail gracefully by returning early - } - }; - - let mut commit_message = String::new(); - trace!("Processing events from the event stream for thread id {} in repository: {}, file: {}", thread_id, repository_nickname, target_file_display); - - // Processing events from the event stream. - while let Some(event) = event_stream.next().await { - match event { - Ok(event) => match event { - AssistantStreamEvent::ThreadMessageDelta(message) => { - if let Some(content) = message.delta.content { - for item in content { - match item { - MessageDeltaContent::ImageFile(_) - | MessageDeltaContent::ImageUrl(_) => {} - MessageDeltaContent::Text(text) => { - if let Some(text) = text.text { - if let Some(text) = text.value { - commit_message.push_str(&text); - } - } - } - } - } - } - } - AssistantStreamEvent::Done(_) => { - trace!("Event stream completed for thread id {} in repository: {}, file: {}", thread_id, repository_nickname, target_file_display); - } - _ => { - warn!("Unhandled event type in the stream for thread id {} in repository: {}, file: {}", thread_id, repository_nickname, target_file_display); - } - }, - Err(e) => { - // Event: Error in Event Stream - // Description: An error occurred while processing the event stream. - // Context: Error details. - match e { - OpenAIError::Reqwest(s) => { - error!("Reqwest error in event stream for thread id {} in repository: {}, file: {}: {}", thread_id, repository_nickname, target_file_display, s); - } - OpenAIError::ApiError(_) => {} - OpenAIError::JSONDeserialize(_) => {} - OpenAIError::FileSaveError(_) => {} - OpenAIError::FileReadError(_) => {} - OpenAIError::StreamError(s) => { - error!("Stream error in event stream for thread id {} in repository: {}, file: {}: {}", thread_id, repository_nickname, target_file_display, s); - } - OpenAIError::InvalidArgument(_) => {} - } - } - } - } - - trace!("Returning commit message for repository: {}, file: {}", repository_nickname, target_file_display); - if let Err(e) = tx.send(commit_message).await { - error!("Failed to send commit message for repository: {}, file: {}: {}", repository_nickname, target_file_display, e); - } - } -} - - diff --git a/src/actors/llmclient.rs b/src/actors/llmclient.rs new file mode 100644 index 0000000..ce6dbdd --- /dev/null +++ b/src/actors/llmclient.rs @@ -0,0 +1,332 @@ +use std::env; +use std::fmt::Debug; +use std::path::PathBuf; +use std::time::Duration; +use akton::prelude::*; +use async_openai::config::OpenAIConfig; +use async_openai::error::OpenAIError; +use async_openai::types::{AssistantEventStream, AssistantsApiResponseFormat, AssistantsApiResponseFormatOption, AssistantStreamEvent, CreateMessageRequest, CreateMessageRequestContent, CreateRunRequest, CreateThreadRequest, MessageDeltaContent, MessageRole, ThreadObject}; +use async_openai::types::AssistantsApiResponseFormatOption::Format; +use async_openai::types::AssistantsApiResponseFormatType::JsonObject; +use failsafe::Config; +use failsafe::futures::CircuitBreaker; +use futures::StreamExt; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::Sender; +use tokio::time::timeout; +use tracing::{debug, error, info, instrument, trace, warn}; + +use crate::messages::{CommitMessageGenerated, DiffQueued, GenerationStarted}; +use crate::models::CommitMessage; + +#[derive(Clone, Debug)] +pub struct LlmClient { + client: Client, + endpoint: String, + api_key: Option, +} + +impl Default for LlmClient { + fn default() -> Self { + LlmClient { + client: Client::new(), + endpoint: String::default(), + api_key: None, + } + } +} + +impl LlmClient { + #[instrument(skip(system, config))] + pub async fn initialize(config: ActorConfig, system: &mut AktonReady) -> anyhow::Result { + let mut actor = system.create_actor_with_config::(config).await; + + // Initialize with default values, these will be set properly later + actor.state.client = Client::new(); + // Read endpoint from environment variable + actor.state.endpoint = env::var("NTANGLER_ENDPOINT") + .unwrap_or_else(|_| { + warn!("NTANGLER_ENDPOINT not set, using default endpoint"); + "https://api.openai.com/v1".to_string() // Default OpenAI endpoint + }); + + // Read API key from environment variable + actor.state.api_key = env::var("NTANGLER_API_TOKEN").ok(); + + if actor.state.api_key.is_none() { + warn!("NTANGLER_API_TOKEN not set, API calls may fail"); + } + + actor.setup.act_on_async::(|actor, event| { + let return_address = event.message.reply_address.clone(); + let message = event.message.clone(); + let client = actor.state.client.clone(); + let api_key = actor.state.api_key.clone(); + let endpoint = actor.state.endpoint.clone(); + info!("Received DiffQueued event: {:?}", event); + Context::wrap_future(Self::handle_generate_commit_message(message, return_address, client, api_key, endpoint) ) + }); + + actor.context.subscribe::().await; + + Ok(actor.activate(None).await) + } + + #[instrument(skip(message, return_address, client, api_key))] + async fn handle_generate_commit_message(message: DiffQueued, return_address: Context, client: Client, api_key: Option, endpoint: String) { + let return_address = return_address.clone(); + match Self::generate_commit_message(client, endpoint, api_key, message.diff).await { + Ok(commit_message) => { + return_address.emit_async( + CommitMessageGenerated { target_file: message.target_file.clone(), commit_message }, + None, + ).await + } + Err(e) => { + error!("{e}"); + } + } + } + + pub fn configure(&mut self, endpoint: String, api_key: Option) { + self.endpoint = endpoint; + self.api_key = api_key; + } + + #[instrument(skip(client))] + async fn generate_commit_message(client: Client, endpoint: String, api_key: Option, diff: String) -> anyhow::Result { + let request = CommitRequest { diff }; + let mut req_builder = client.post(&endpoint) + .header("Content-Type", "application/json"); + + if let Some(key) = &api_key { + req_builder = req_builder.header("Authorization", format!("Bearer {}", key)); + } + + let response = req_builder + .json(&request) + .send() + .await?; + + // Log response details + debug!("Response status: {}", response.status()); + debug!("Response headers:"); + for (name, value) in response.headers().iter() { + debug!(" {}: {}", name, value.to_str().unwrap_or("")); + } + + // Handle streaming response + let mut full_body = String::new(); + let mut stream = response.bytes_stream(); + + while let Some(item) = stream.next().await { + let chunk = item?; + full_body.push_str(&String::from_utf8_lossy(&chunk)); + debug!("Received chunk: {}", String::from_utf8_lossy(&chunk)); + } + + debug!("Full response body: {}", full_body); + + // Parse the full response body + let commit_message: CommitMessage = serde_json::from_str(&full_body)?; + + Ok(commit_message) + } + #[instrument] + async fn create_run_stream_with_circuit_breaker( + circuit_breaker: &(impl CircuitBreaker + Debug), + client: &async_openai::Client, + conversation_thread_id: &str, + format: Option, + ) -> anyhow::Result { + match circuit_breaker.call(timeout(Duration::from_secs(10), client.threads().runs(conversation_thread_id).create_stream(CreateRunRequest { + assistant_id: "asst_xiaBOCpksCenAMJSL2F0qqFL".to_string(), + stream: Some(true), + parallel_tool_calls: Some(true), + response_format: format, + ..Default::default() + }))).await { + Ok(result) => match result { + Ok(stream) => { + info!("Run stream successfully created for conversation_thread_id: {}", conversation_thread_id); + Ok(stream) + } + Err(e) => { + error!("Timeout while creating run stream for conversation_thread_id: {}", conversation_thread_id); + Err(anyhow::Error::from(e).into()) + } + }, + Err(_) => { + error!("Timeout while creating run stream"); + Err(anyhow::Error::msg("Timeout while creating run stream")) + } + } + } + + #[instrument] + async fn create_message_with_circuit_breaker( + circuit_breaker: &(impl CircuitBreaker + Debug), + client: &async_openai::Client, + conversation_thread_id: &str, + diff: String, + ) -> anyhow::Result<()> { + match circuit_breaker.call(timeout(Duration::from_secs(10), client.threads().messages(conversation_thread_id).create(CreateMessageRequest { + role: MessageRole::User, + content: CreateMessageRequestContent::from(diff), + ..Default::default() + }))).await { + Ok(result) => match result { + Ok(_) => { + info!("Message successfully created in conversation_thread_id: {}", conversation_thread_id); + Ok(()) + } + Err(e) => { + error!("Failed to create message in conversation_thread_id {}: {:?}", conversation_thread_id, e); + Err(anyhow::Error::from(e).into()) + } + }, + Err(e) => { + error!("Circuit breaker call failed while creating message in thread_id {}: {:?}", conversation_thread_id, e); + Err(anyhow::Error::from(e)) + } + } + } + + #[instrument] + async fn create_thread_with_circuit_breaker(circuit_breaker: &(impl CircuitBreaker + Debug), client: &async_openai::Client) -> anyhow::Result { + match circuit_breaker.call(timeout(Duration::from_secs(10), client.threads().create(CreateThreadRequest::default()))).await { + Ok(result) => match result { + Ok(conversation_thread) => { + info!("Thread successfully created with id: {}", conversation_thread.id); + Ok(conversation_thread) + } + Err(e) => { + error!("Failed to create thread: {:?}", e); + Err(anyhow::Error::from(e).into()) + } + }, + Err(e) => { + error!("Circuit breaker call failed while creating thread: {:?}", e); + Err(anyhow::Error::from(e)) + } + } + } + #[instrument(skip(broker, tx, client))] + async fn call_ai_endpoint(broker: Context, tx: Sender, diff: String, repository_nickname: String, target_file_clone: PathBuf, client: async_openai::Client) { + let target_file_clone = target_file_clone.clone(); + let target_file_display = &target_file_clone.display().to_string(); + let msg = BrokerRequest::new(GenerationStarted::new( + target_file_clone.clone(), + repository_nickname.clone(), + )); + info!("AI endpoint called for repository: {}, file: {}", repository_nickname, target_file_clone.display()); + broker.emit_async(msg, None).await; + + let circuit_breaker = Config::new().build(); + + let client = client.clone(); + let thread = match Self::create_thread_with_circuit_breaker(&circuit_breaker, &client).await { + Ok(thread) => thread, + Err(e) => { + // TODO: impl fallback logic + error!("Error creating thread with circuit breaker for repository: {}, file: {}: {:?}", repository_nickname, target_file_display, e); + return; // Fail gracefully by returning early + } + }; + + let thread_id = thread.id.clone(); + trace!("Got thread id {} for repository: {}, file: {}", thread_id, repository_nickname, target_file_clone.display()); + match Self::create_message_with_circuit_breaker(&circuit_breaker, &client, &thread.id, diff).await { + Ok(message) => { + trace!("Message created successfully in thread id {} for repository: {}, file: {}", thread_id, repository_nickname, target_file_display); + message + } + Err(e) => { + // TODO: impl fallback logic + error!("Error creating message with circuit breaker for thread id {} in repository: {}, file: {}: {:?}", thread_id, repository_nickname, target_file_display, e); + return; // Fail gracefully by returning early + } + }; + + let format = AssistantsApiResponseFormat { r#type: JsonObject }; + + // Step 3: Initiate a run and handle the event stream. + let mut event_stream = match Self::create_run_stream_with_circuit_breaker(&circuit_breaker, &client, &thread.id, Some(Format(format))).await { + Ok(event_stream) => event_stream, + Err(e) => { + // TODO: impl fallback logic + error!("Error creating run stream with circuit breaker for thread id {} in repository: {}, file: {}: {:?}", thread_id, repository_nickname, target_file_display, e); + return; // Fail gracefully by returning early + } + }; + + let mut commit_message = String::new(); + trace!("Processing events from the event stream for thread id {} in repository: {}, file: {}", thread_id, repository_nickname, target_file_display); + + // Processing events from the event stream. + while let Some(event) = event_stream.next().await { + match event { + Ok(event) => match event { + AssistantStreamEvent::ThreadMessageDelta(message) => { + if let Some(content) = message.delta.content { + for item in content { + match item { + MessageDeltaContent::ImageFile(_) + | MessageDeltaContent::ImageUrl(_) => {} + MessageDeltaContent::Text(text) => { + if let Some(text) = text.text { + if let Some(text) = text.value { + commit_message.push_str(&text); + } + } + } + } + } + } + } + AssistantStreamEvent::Done(_) => { + trace!("Event stream completed for thread id {} in repository: {}, file: {}", thread_id, repository_nickname, target_file_display); + } + _ => { + warn!("Unhandled event type in the stream for thread id {} in repository: {}, file: {}", thread_id, repository_nickname, target_file_display); + } + }, + Err(e) => { + // Event: Error in Event Stream + // Description: An error occurred while processing the event stream. + // Context: Error details. + match e { + OpenAIError::Reqwest(s) => { + error!("Reqwest error in event stream for thread id {} in repository: {}, file: {}: {}", thread_id, repository_nickname, target_file_display, s); + } + OpenAIError::ApiError(_) => {} + OpenAIError::JSONDeserialize(_) => {} + OpenAIError::FileSaveError(_) => {} + OpenAIError::FileReadError(_) => {} + OpenAIError::StreamError(s) => { + error!("Stream error in event stream for thread id {} in repository: {}, file: {}: {}", thread_id, repository_nickname, target_file_display, s); + } + OpenAIError::InvalidArgument(_) => {} + } + } + } + } + + trace!("Returning commit message for repository: {}, file: {}", repository_nickname, target_file_display); + if let Err(e) = tx.send(commit_message).await { + error!("Failed to send commit message for repository: {}, file: {}: {}", repository_nickname, target_file_display, e); + } + } + +} + +#[derive(Deserialize)] +struct CommitResponse { + commit_message: CommitMessage, +} + +#[derive(Deserialize,Serialize)] +struct CommitRequest { + diff: String, +} \ No newline at end of file From cdfd33bbcaee7624e17e00cd2f27ff7b93159436 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sun, 30 Jun 2024 01:35:32 -0600 Subject: [PATCH 08/15] refactor(actors): replace OpenAi with LlmClient Refactor the Ntangler actor to replace the OpenAi initialization with LlmClient initialization. Updated the configuration to use llm_actor ARN for LlmClient. This change ensures that the Ntangler actor now uses the LlmClient for its generator state. --- src/actors/ntangler.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/actors/ntangler.rs b/src/actors/ntangler.rs index a970a8b..4537f2c 100644 --- a/src/actors/ntangler.rs +++ b/src/actors/ntangler.rs @@ -5,7 +5,7 @@ use akton::prelude::*; use akton::prelude::Subscribable; use tracing::{debug, instrument, trace}; -use crate::actors::OpenAi; +use crate::actors::{LlmClient}; use crate::actors::repositories::GitRepository; use crate::actors::scribe::Scribe; use crate::messages::{RepositoryPollRequested, SystemStarted}; @@ -38,13 +38,13 @@ impl Ntangler { actor.state.scribe = Scribe::initialize("scribe".to_string(), &mut actor.akton).await; - let generator_config = ActorConfig::new( - Arn::with_root("generator").expect("Failed to create generator Aktor-Arn"), + let llm_config = ActorConfig::new( + Arn::with_root("llm_actor").expect("Failed to create generator Aktor-Arn"), None, Some(broker.clone()), ) .expect("Failed to create generator config"); - actor.state.generator = OpenAi::initialize(generator_config, &mut actor.akton) + actor.state.generator = LlmClient::initialize(llm_config, &mut actor.akton) .await .expect("Failed to initialize generator actor"); From c05455470558d47cea83c87553c10403096c2f95 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sun, 30 Jun 2024 01:35:34 -0600 Subject: [PATCH 09/15] refactor(repository): simplify git commit logic This refactor simplifies the git commit logic by removing commented-out code and consolidating commit operations. The commit creation buffer is replaced with a direct commit call, and the status options configuration is streamlined for clarity and efficiency. --- src/actors/repositories.rs | 52 ++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/src/actors/repositories.rs b/src/actors/repositories.rs index e62379d..90364b7 100644 --- a/src/actors/repositories.rs +++ b/src/actors/repositories.rs @@ -3,9 +3,7 @@ use std::future::Future; use std::pin::Pin; use akton::prelude::*; -use git2::{ - DiffOptions, Repository, Status, StatusOptions, -}; +use git2::{Delta, DiffOptions, Repository, Status, StatusOptions}; use tracing::*; use crate::messages::{ @@ -88,7 +86,6 @@ impl GitRepository { ); let reply_to = event.return_address.clone(); actor.state.handle_poll_request(reply_to) - // actor.state.broadcast_futures(futures) }) .act_on_async::(|actor, event| { let repository_path = &actor.state.repo_info.path; @@ -161,8 +158,9 @@ impl GitRepository { let when: TimeStamp = (&sig.when()).into(); let message_string = &commit_message.to_string(); // TODO: optionally sign commits - let unsigned_commit_buffer = repo - .commit_create_buffer( + let hash = repo + .commit( + Some("HEAD"), &sig, &sig, message_string, @@ -170,18 +168,6 @@ impl GitRepository { &[&parent_commit], ) .expect("Failed to commit"); - - let hash = repo.commit_signed(std::str::from_utf8(&unsigned_commit_buffer).unwrap(), "", None).expect("Failed to sign commit"); - // let hash = repo - // .commit( - // Some("HEAD"), - // &sig, - // &sig, - // message_string, - // &tree, - // &[&parent_commit], - // ) - // .expect("Failed to commit"); let hash = hash.to_string(); let commit_message = commit_message.clone(); @@ -218,15 +204,28 @@ impl GitRepository { let repository_path = &self.repo_info.path; let repo = Repository::open(repository_path).expect("Failed to open repository"); + // Log the raw status output + debug!("Raw git status output:"); + for (i, status) in repo.statuses(None).unwrap().iter().enumerate() { + debug!("Status {}: {:?} - {:?}", i, status.path(), status.status()); + } + let mut status_options = StatusOptions::new(); - status_options.include_untracked(true); - status_options.recurse_untracked_dirs(true); - status_options.include_unreadable_as_untracked(true); + status_options.include_untracked(true) + .recurse_untracked_dirs(true) + .include_ignored(false) + .include_unmodified(false) + .exclude_submodules(false) + .update_index(true) + .renames_from_rewrites(true) + .renames_head_to_index(true) + .renames_index_to_workdir(true); let statuses = repo .statuses(Some(&mut status_options)) .expect("Couldn't get repo statuses"); + debug!("Status count: {}", statuses.len()); let modified_files: Vec = statuses .iter() .filter(|f| { @@ -235,17 +234,14 @@ impl GitRepository { && !status.is_wt_deleted() && status != (Status::INDEX_DELETED | Status::WT_NEW) }) - .map(|entry| { - debug!("index_deleted:{}", entry.status().is_index_deleted()); - error!("worktree_deleted:{}", entry.status().is_wt_deleted()); - entry.path().unwrap().to_string() - }) - .collect::>() + .map(|entry| + entry.path().unwrap().to_string() + ) + .collect::>() .into_iter() .collect(); trace!("modified files vec {:?}", &modified_files); - debug!("*"); let id = self.repo_info.nickname.clone(); let outbound_envelope = outbound_envelope.clone(); Box::pin(async move { From 49f7230e3d12d9f83730b494cf81810c8c99d345 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sun, 30 Jun 2024 01:35:36 -0600 Subject: [PATCH 10/15] chore(actors): reorder use statements Reorder the use statements in src/actors.rs for better organization and readability. No functional changes made. --- src/actors.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/actors.rs b/src/actors.rs index 4a8252f..1cfbec3 100644 --- a/src/actors.rs +++ b/src/actors.rs @@ -1,7 +1,8 @@ -pub(crate) use generators::OpenAi; + pub(crate) use ntangler::Ntangler; +pub(crate) use llmclient::LlmClient; -mod generators; mod repositories; mod scribe; mod ntangler; +mod llmclient; From 3d7a2ddceff1e474b42532dc7fbe244732952f31 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sun, 30 Jun 2024 01:35:40 -0600 Subject: [PATCH 11/15] chore(build): update package version Update package version from 3.5.3 to 3.6.1. This change ensures the latest features and fixes are included. Removed unused 'demo' feature from the features section. --- Cargo.toml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fe46c6a..d1f34d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntangler" -version = "3.5.3" +version = "3.6.1" edition = "2021" [[bin]] @@ -11,9 +11,6 @@ path = "src/main.rs" version = "0.9" features = ["vendored"] -[features] -demo = [] - [dependencies] git2 = "0.19.0" reqwest = { version = "0.12.4", features = ["json", "rustls-tls"] } From 443119ce0cb250946838c60b233d10611d7fb1cf Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sun, 30 Jun 2024 01:35:42 -0600 Subject: [PATCH 12/15] refactor(logging): improve logging messages and setup This commit improves the logging messages and setup within the main.rs file. It replaces the direct use of Term::stderr().write_line with tracing's info and error macros for consistent logging. Additionally, it fixes an import duplication issue by removing a redundant use statement for RepositoryConfig. --- src/main.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1afce97..f6b0d41 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#![allow(unused)] //TODO: remove use std::{env, fs}; use std::ffi::OsString; use std::path::{Path, PathBuf}; @@ -8,7 +9,7 @@ use anyhow::Result; use console::Term; use serde::Deserialize; use tokio::signal; -use tracing::Level; +use tracing::{error, info, Level}; use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_subscriber::{EnvFilter, FmtSubscriber}; use tracing_subscriber::fmt::format::FmtSpan; @@ -36,10 +37,10 @@ async fn main() -> Result<(), Box> { setup_tracing("ntangler", "config.toml"); if check_openai_api_key() { - Term::stderr() - .write_line("API Key Detected: The OPENAI_API_KEY environment variable is set.")?; + info!("API Key Detected: The OPENAI_API_KEY environment variable is set."); } else { Term::stderr().write_line("Startup Error: The OPENAI_API_KEY environment variable is not set. Please set it to proceed. Consult the documentation to set the API key.")?; + error!("Startup Error: The OPENAI_API_KEY environment variable is not set."); std::process::exit(1); } @@ -47,10 +48,10 @@ async fn main() -> Result<(), Box> { let config_content = fs::read_to_string(&config_path)?; let ntangler_config: NtanglerConfig = toml::from_str(&config_content)?; - Term::stderr().write_line(&format!( + info!( "Configuration Loaded: Config found at {}. Initializing...", config_path.display() - ))?; + ); let (ntangler, _broker) = Ntangler::initialize(ntangler_config).await?; @@ -165,8 +166,8 @@ mod tests { use akton::prelude::ActorContext; use crate::actors::Ntangler; - use crate::models::config::RepositoryConfig; use crate::models::config::NtanglerConfig; + use crate::models::config::RepositoryConfig; use super::*; From 0dad313dcdb898419a07501f870697defebadc58 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sun, 30 Jun 2024 01:35:45 -0600 Subject: [PATCH 13/15] refactor(logging): enhance logging setup and messages Refactor the main function to enhance logging setup and messages. Add an additional tracing import for Level. Modify logging messages for API key detection and configuration loading to use tracing's info and error macros. This improves consistency and clarity in logging output. From 7a95cc1c2d38b7db0ecdca0a52261715725cd395 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sun, 30 Jun 2024 01:35:47 -0600 Subject: [PATCH 14/15] chore(imports): reorder import statements Reorder import statements in src/actors.rs for better organization and readability. No functional changes introduced. From 472b00f25f8eb0a51293d5711417571392c2bb96 Mon Sep 17 00:00:00 2001 From: Roland Rodriguez Date: Sun, 30 Jun 2024 01:35:50 -0600 Subject: [PATCH 15/15] chore(build): update package version Update package version from 3.5.3 to 3.6.1 in Cargo.toml. Remove unused 'demo' feature from the features section. This change does not impact the functionality of the codebase but ensures the package metadata is up to date.