17 changes: 17 additions & 0 deletions app/src/main/config.ts
@@ -79,6 +79,9 @@ export const getUserConfig = (path?: string) => {
storedConfig.settings = {
search_engine: 'google',
embedding_model: 'multilingual_small',
embedding_batch_size: 64,
embedding_max_threads: 4,
embedding_max_connections: 8,
tabs_orientation: 'vertical',
app_style: 'light',
use_semantic_search: false,
@@ -340,6 +343,20 @@ export const getUserConfig = (path?: string) => {
changedConfig = true
}

// Embedding performance settings
if (storedConfig.settings.embedding_batch_size === undefined) {
storedConfig.settings.embedding_batch_size = 64
changedConfig = true
}
if (storedConfig.settings.embedding_max_threads === undefined) {
storedConfig.settings.embedding_max_threads = 4
changedConfig = true
}
if (storedConfig.settings.embedding_max_connections === undefined) {
storedConfig.settings.embedding_max_connections = 8
changedConfig = true
}

if (changedConfig) {
setUserConfig(storedConfig as UserConfig)
}
5 changes: 4 additions & 1 deletion app/src/main/index.ts
@@ -146,7 +146,10 @@ const setupBackendServer = async (appPath: string, backendRootPath: string, user
surfBackendManager = new SurfBackendServerManager(backendServerPath, [
backendRootPath,
'false',
isDev ? CONFIG.embeddingModelMode : userConfig.settings?.embedding_model
isDev ? CONFIG.embeddingModelMode : userConfig.settings?.embedding_model,
String(userConfig.settings?.embedding_batch_size || 64),
String(userConfig.settings?.embedding_max_threads || 4),
String(userConfig.settings?.embedding_max_connections || 8)
])
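Note the argument order here is positional and must match the parsing in packages/backend-server/src/main.rs below: args[4] is the batch size, args[5] the thread cap, and args[6] the connection cap.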

surfBackendManager
1 change: 1 addition & 0 deletions packages/backend-server/Cargo.toml
@@ -19,6 +19,7 @@ uds_windows = "1.1.0"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
fastembed = { git = "https://github.com/deta/fastembed-rs", tag = "v3.14.1-patch.1", features = ["ort-download-binaries", "online"] }
rayon = "1.10.0"

[dev-dependencies]
serial_test = "3.2.0"
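rayon is added so the server can bound its worker threads (see the ThreadPoolBuilder call in server/mod.rs below). A minimal, self-contained sketch of the pattern — the global pool is built once, and every rayon parallel iterator afterwards is capped by it; the num_threads value here just mirrors the embedding_max_threads default:

```rust
use rayon::prelude::*;

fn main() {
    // Must run before any rayon work: build_global() fails if the
    // global pool was already initialized elsewhere in the process.
    rayon::ThreadPoolBuilder::new()
        .num_threads(4) // mirrors the embedding_max_threads default
        .build_global()
        .expect("global rayon pool already initialized");

    // All subsequent par_iter work shares those 4 worker threads.
    let sum: u64 = (0u64..1_000_000).into_par_iter().map(|n| n * n).sum();
    println!("{sum}");
}
```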
2 changes: 1 addition & 1 deletion packages/backend-server/src/embeddings/chunking.rs
@@ -72,7 +72,7 @@ mod tests {

#[test]
fn test_sanity_chunker() {
let chunker = ContentChunker::new(100, 1);
let chunker = ContentChunker::new(2500, 1);
let content = "Within endurance running comes two different types of respiration. The more prominent side that runners experience more frequently is aerobic respiration. This occurs when oxygen is present, and the body can utilize oxygen to help generate energy and muscle activity. On the other side, anaerobic respiration occurs when the body is deprived of oxygen, and this is common towards the final stretch of races when there is a drive to speed up to a greater intensity. Overall, both types of respiration are used by endurance runners quite often but are very different from each other. \n

Among mammals, humans are well adapted for running significant distances, particularly so among primates. The capacity for endurance running is also found in migratory ungulates and a limited number of terrestrial carnivores, such as bears, dogs, wolves, and hyenas.
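The test's chunk size now matches the production chunker in model.rs, which moves from 2000 to 2500 in the same PR.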
10 changes: 6 additions & 4 deletions packages/backend-server/src/embeddings/model.rs
@@ -32,6 +32,7 @@ pub struct EmbeddingModel {
model_name: fastembed::EmbeddingModel,
model: TextEmbedding,
chunker: ContentChunker,
batch_size: usize,
}

fn new_fastembed_model(
@@ -50,25 +51,26 @@
}

impl EmbeddingModel {
pub fn new_remote(cache_dir: &Path, mode: EmbeddingModelMode) -> BackendResult<Self> {
pub fn new_remote(cache_dir: &Path, mode: EmbeddingModelMode, batch_size: usize) -> BackendResult<Self> {
let model_name: fastembed::EmbeddingModel = mode.into();
let model = new_fastembed_model(cache_dir, model_name.clone(), false)?;
let chunker = ContentChunker::new(2000, 1);
let chunker = ContentChunker::new(2500, 1);

Ok(Self {
model_name,
model,
chunker,
batch_size,
})
}

pub fn get_embedding_dim(&self) -> usize {
TextEmbedding::get_model_info(&self.model_name).dim
}

#[instrument(level = "debug", skip(self, sentences), fields(count = sentences.len()))]
#[instrument(level = "debug", skip(self, sentences), fields(count = sentences.len(), batch_size = self.batch_size))]
pub fn encode(&self, sentences: &[String]) -> BackendResult<Vec<Vec<f32>>> {
self.model.embed(sentences.to_vec(), Some(1)).map_err(|e| {
self.model.embed(sentences.to_vec(), Some(self.batch_size)).map_err(|e| {
error!("Failed to encode {} sentences: {}", sentences.len(), e);
BackendError::GenericError(format!("Error encoding sentences: {}", e))
})
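The second argument to fastembed's embed is its internal batch size; the PR threads the configured value through where Some(1) was hardcoded before. A standalone sketch of the same call against the upstream crate (this repo pins a fork), assuming the crate's default model and network access for the first-run download:

```rust
use fastembed::TextEmbedding;

fn main() {
    // Downloads the default model on first run (the crate's "online" feature).
    let model = TextEmbedding::try_new(Default::default()).expect("model init failed");

    let docs: Vec<String> = (0..256).map(|i| format!("sentence number {i}")).collect();

    // Some(64) mirrors the new embedding_batch_size default: fastembed splits
    // the input into batches of 64 per inference pass, trading peak memory
    // for far fewer session calls than the old hardcoded Some(1).
    let embeddings = model.embed(docs, Some(64)).expect("embed failed");
    println!("{} vectors of dim {}", embeddings.len(), embeddings[0].len());
}
```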
32 changes: 28 additions & 4 deletions packages/backend-server/src/main.rs
@@ -44,9 +44,9 @@ fn main() {
.ok();

let args: Vec<String> = std::env::args().collect();
if args.len() != 4 {
if args.len() != 7 {
eprintln!(
"Usage: {} <root_path> <local_llm_mode> <embedding_model_mode>",
"Usage: {} <root_path> <local_llm_mode> <embedding_model_mode> <batch_size> <max_threads> <max_connections>",
args[0]
);
std::process::exit(1);
@@ -74,17 +74,41 @@
std::process::exit(1);
}
};
let batch_size: usize = match args[4].parse() {
Ok(size) => size,
Err(e) => {
eprintln!("Bad batch_size: {:#?}, error: {:#?}", args[4], e);
std::process::exit(1);
}
};
let max_threads: usize = match args[5].parse() {
Ok(threads) => threads,
Err(e) => {
eprintln!("Bad max_threads: {:#?}, error: {:#?}", args[5], e);
std::process::exit(1);
}
};
let max_connections: usize = match args[6].parse() {
Ok(connections) => connections,
Err(e) => {
eprintln!("Bad max_connections: {:#?}, error: {:#?}", args[6], e);
std::process::exit(1);
}
};

info!(
"started with socket_path: {:#?}, local_llm_mode: {:#?}",
socket_path, local_llm_mode
"started with socket_path: {:#?}, local_llm_mode: {:#?}, batch_size: {}, max_threads: {}, max_connections: {}",
socket_path, local_llm_mode, batch_size, max_threads, max_connections
);
let server = LocalAIServer::new(
&socket_path,
&index_path,
&model_cache_dir,
local_llm_mode,
embedding_model_mode,
batch_size,
max_threads,
max_connections,
)
.expect("failed to create new server");

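The three parse blocks above are identical in shape; a possible tightening (a sketch, not code from this PR) is a single generic helper:

```rust
use std::fmt::Debug;
use std::str::FromStr;

// Hypothetical helper: parse a positional arg or exit with the same
// message shape the PR uses.
fn parse_arg<T: FromStr>(args: &[String], idx: usize, name: &str) -> T
where
    T::Err: Debug,
{
    match args[idx].parse() {
        Ok(v) => v,
        Err(e) => {
            eprintln!("Bad {}: {:#?}, error: {:#?}", name, args[idx], e);
            std::process::exit(1);
        }
    }
}

// let batch_size: usize = parse_arg(&args, 4, "batch_size");
// let max_threads: usize = parse_arg(&args, 5, "max_threads");
// let max_connections: usize = parse_arg(&args, 6, "max_connections");
```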
29 changes: 28 additions & 1 deletion packages/backend-server/src/server/mod.rs
@@ -22,6 +22,7 @@ pub struct LocalAIServer {
index_path: String,
embedding_model: Arc<EmbeddingModel>,
listener: UnixListener,
max_connections: usize,
}

impl LocalAIServer {
@@ -32,6 +33,9 @@ impl LocalAIServer {
model_cache_dir: &Path,
local_llm: bool,
embedding_model_mode: EmbeddingModelMode,
batch_size: usize,
max_threads: usize,
max_connections: usize,
) -> BackendResult<Self> {
if socket_path.exists() {
fs::remove_file(socket_path)?;
@@ -45,16 +49,24 @@
));
}

// Configure rayon thread pool for parallel processing
rayon::ThreadPoolBuilder::new()
.num_threads(max_threads)
.build_global()
.map_err(|e| BackendError::GenericError(format!("Failed to configure thread pool: {}", e)))?;

let embedding_model = Arc::new(EmbeddingModel::new_remote(
model_cache_dir,
embedding_model_mode,
batch_size,
)?);

Ok(Self {
socket_path: socket_path.to_string_lossy().to_string(),
index_path: index_path.to_string_lossy().to_string(),
embedding_model,
listener,
max_connections,
})
}

@@ -117,7 +129,7 @@ impl LocalAIServer {
}

pub fn listen(&self) {
info!(socket_path = ?self.socket_path, "server starting");
info!(socket_path = ?self.socket_path, max_connections = self.max_connections, "server starting");
let (tx, rx) = mpsc::channel();

let index_path = self.index_path.clone();
@@ -127,17 +139,32 @@
Self::handle_main_thread_messages(rx, &index_path, &embedding_dim)
});

// Create a channel-based semaphore to limit concurrent connections
let (permit_tx, permit_rx) = mpsc::sync_channel(self.max_connections);
// Fill the channel with permits
for _ in 0..self.max_connections {
let _ = permit_tx.send(());
}

info!("listening for incoming connections");
for stream in self.listener.incoming() {
match stream {
Ok(stream) => {
let embedding_model = Arc::clone(&self.embedding_model);
let tx = tx.clone();
let permit_tx_clone = permit_tx.clone();
let permit_rx_clone = permit_rx.clone();

std::thread::spawn(move || {
// Acquire permit before processing (blocks if none available)
let _permit = permit_rx_clone.recv();

if let Err(e) = handle_client(tx, &embedding_model, stream) {
error!(?e, "client handler error");
}

// Return permit when done
let _ = permit_tx_clone.send(());
});
}
Err(e) => {
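The connection cap is a channel used as a counting semaphore: pre-fill a bounded channel with N permits, recv() to acquire one, send() to give it back. One caveat: std's mpsc::Receiver does not implement Clone, so the permit_rx.clone() above implies a clonable channel (e.g. crossbeam); with std alone the receiver has to be shared, as in this self-contained sketch of the pattern:

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let max_connections = 2;

    // A bounded channel can hold at most `max_connections` permits.
    let (permit_tx, permit_rx) = mpsc::sync_channel::<()>(max_connections);
    for _ in 0..max_connections {
        permit_tx.send(()).unwrap();
    }
    // std's Receiver is not Clone, so share it behind a Mutex.
    let permit_rx = Arc::new(Mutex::new(permit_rx));

    let mut handles = Vec::new();
    for id in 0..5 {
        let permit_tx = permit_tx.clone();
        let permit_rx = Arc::clone(&permit_rx);
        handles.push(thread::spawn(move || {
            // The guard is held only for the recv() itself; this blocks
            // until one of the pre-filled permits is available.
            let _permit = permit_rx.lock().unwrap().recv().unwrap();
            println!("client {id} holds a permit");
            thread::sleep(Duration::from_millis(50)); // simulated work
            permit_tx.send(()).unwrap(); // hand the permit back
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
}
```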
7 changes: 6 additions & 1 deletion packages/backend/src/worker/handlers/resource.rs
@@ -556,7 +556,12 @@ impl Worker {
// NOTE: for Note content type for performance reasons we do not generate the embeddings
// right away as updates are too frequent but instead do it lazily only when we need it
// we therefore add a tag to the resource indicating that the resource needs post processing
if content_type == ResourceTextContentType::Note {
// Use lazy embeddings for Notes, PDFs, Documents, and Articles to reduce CPU load
if content_type == ResourceTextContentType::Note
|| content_type == ResourceTextContentType::PDF
|| content_type == ResourceTextContentType::Document
|| content_type == ResourceTextContentType::Article
{
let generate_embeddings_tag = ResourceTag::new_generate_lazy_embeddings(&resource_id);
Database::create_resource_tag_tx(&mut tx, &generate_embeddings_tag)?;
tx.commit()?;
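A possible simplification of the growing == chain (a suggestion, not code from this PR), assuming the variants are plain enum cases:

```rust
if matches!(
    content_type,
    ResourceTextContentType::Note
        | ResourceTextContentType::PDF
        | ResourceTextContentType::Document
        | ResourceTextContentType::Article
) {
    // same lazy-embedding tagging as above
}
```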
3 changes: 3 additions & 0 deletions packages/types/src/config.types.ts
@@ -9,6 +9,9 @@ export type UserConfig = {

export type UserSettings = {
embedding_model: 'english_small' | 'english_large' | 'multilingual_small' | 'multilingual_large'
embedding_batch_size: number
embedding_max_threads: number
embedding_max_connections: number
tabs_orientation: 'vertical' | 'horizontal'
app_style: 'light' | 'dark' // Note intentionally used app_style as "app_theme" would be themes in the future?
use_semantic_search: boolean