5,186 changes: 3,938 additions & 1,248 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -11,7 +11,7 @@ documentation = "https://github.com/timothepearce/synda"
packages = [{include = "synda"}]

[tool.poetry.dependencies]
python = "^3.10"
python = ">=3.10, <3.13"
pyyaml = "^6.0.2"
pydantic = "^2.10.6"
pandas = "^2.2.3"
@@ -22,6 +22,7 @@ sqlmodel = "^0.0.22"
typer = {extras = ["all"], version = "^0.15.1"}
scikit-learn = "^1.6.1"
openpyxl = "^3.1.5"
vllm = {version = "0.8.4", extras = ["all"]}
pypdf2 = "^3.0.1"

[tool.poetry.group.dev.dependencies]
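The tighter Python range goes hand in hand with the new `vllm` pin, presumably because that vllm release does not yet support Python 3.13. A quick sanity check of an installed environment might look like this (illustrative only, not part of the PR):

```python
import sys

import vllm  # resolved through the newly pinned dependency

# Python must fall inside the new >=3.10,<3.13 window for the pin to resolve.
assert (3, 10) <= sys.version_info[:2] < (3, 13), "Python version outside the supported range"
print(vllm.__version__)  # expected to match the 0.8.4 pin from pyproject.toml
```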
6 changes: 3 additions & 3 deletions synda/model/step.py
@@ -110,10 +110,10 @@ def save_at_execution_end(
return self

def save_during_execution(
self, session: Session, input_node: Node, output_node: Node
self, session: Session, input_nodes: list[Node], output_nodes: list[Node]
) -> "Step":
self._create_nodes_with_ancestors(session, [input_node], [output_node])
self._map_output_nodes_to_step(session, [output_node])
self._create_nodes_with_ancestors(session, input_nodes, output_nodes)
self._map_output_nodes_to_step(session, output_nodes)

return self

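With `save_during_execution` now accepting lists, batch executors can persist several parent/child node pairs in one call, while single-node call sites simply wrap their arguments. A minimal sketch under those assumptions (the helper below is hypothetical, not part of the PR):

```python
from sqlmodel import Session

from synda.model.node import Node
from synda.model.step import Step


def persist_batch(step: Step, session: Session, parents: list[Node], children: list[Node]) -> None:
    """Hypothetical helper: persist a whole batch of nodes in a single call."""
    # The updated signature takes lists, so one call covers the entire batch.
    step.save_during_execution(session, parents, children)

# Former single-node callers stay valid by wrapping their nodes in lists,
# as the updated ablation and generation executors do:
# step.save_during_execution(session, [node], [result_node])
```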
10 changes: 5 additions & 5 deletions synda/pipeline/ablation/llm_judge_binary.py
@@ -1,6 +1,6 @@
import json
import asyncio
from typing import Literal, Optional
from typing import Literal

from pydantic import BaseModel
from sqlmodel import Session
@@ -63,11 +63,11 @@ def _execute_sequential(self, pending_nodes: list[Node], processed_nodes: list[N

def _execute_batch(self, pending_nodes: list[Node], processed_nodes: list[Node], criteria: list[str]) -> list[Node]:
result = processed_nodes or []
num_batches = (len(pending_nodes) + self.batch_size - 1) // self.batch_size
with self.progress.task(
" Ablating...",
num_batches,
completed=0,
len(pending_nodes) + len(processed_nodes),
completed=len(processed_nodes),
batch_size=self.batch_size
) as advance_node:
loop = asyncio.get_event_loop()
for i in range(0, len(pending_nodes), self.batch_size):
@@ -136,7 +136,7 @@ def _create_result_node(self, node: Node, judge_answers: list[LLMJudgeCriterionB
result_node = Node(
parent_node_id=node.id, value=node.value, ablated=ablated
)
self.step_model.save_during_execution(self.session, node, result_node)
self.step_model.save_during_execution(self.session, [node], [result_node])
return result_node

def _parse_judge_answer(self, judge_answer_str: str) -> LLMJudgeCriterionBinaryAnswer:
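The progress accounting here switches from counting batches to counting nodes, and pre-seeding `completed` means a resumed run starts the bar in the right place. A small illustration of the new arithmetic, with made-up numbers:

```python
# Hypothetical numbers: 10 nodes total, 4 already processed in an earlier run.
pending_nodes = list(range(6))    # stand-ins for the remaining Node objects
processed_nodes = list(range(4))  # stand-ins for nodes restored from the database
batch_size = 4

total = len(pending_nodes) + len(processed_nodes)  # 10 -> the bar spans every node
completed = len(processed_nodes)                   # 4  -> resumed work already shows as done

# Previously the total was the number of batches instead:
old_total = (len(pending_nodes) + batch_size - 1) // batch_size  # 2
```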
79 changes: 57 additions & 22 deletions synda/pipeline/generation/llm.py
@@ -7,6 +7,7 @@
from synda.pipeline.executor import Executor
from synda.model.node import Node
from synda.utils.llm_provider import LLMProvider
from synda.utils.vllm_provider import VLLMProvider
from synda.utils.prompt_builder import PromptBuilder
from synda.progress_manager import ProgressManager

@@ -27,15 +28,30 @@ def execute(self, pending_nodes: list[Node], processed_nodes: list[Node]):
instruction_mode = self.config.parameters.instruction_mode
pending_nodes = self._build_node_occurrences(pending_nodes, occurrences)

result = processed_nodes or []

if not self.batch or self.batch_size is None:
result = self._execute_sequential(pending_nodes, processed_nodes, template, instruction_sets, instruction_mode)
result = self._execute_sequential(
pending_nodes, processed_nodes, template, instruction_sets, instruction_mode
)
else:
result = self._execute_batch(pending_nodes, processed_nodes, template, instruction_sets, instruction_mode)

result = self._execute_batch(
pending_nodes=pending_nodes,
processed_nodes=processed_nodes,
template=template,
instruction_sets=instruction_sets,
instruction_mode=instruction_mode,
use_vllm="vllm" in self.provider.name
)

return result
def _execute_sequential(self, pending_nodes: list[Node], processed_nodes: list[Node], template: str, instruction_sets: list, instruction_mode: str) -> list[Node]:

def _execute_sequential(
self,
pending_nodes: list[Node],
processed_nodes: list[Node],
template: str,
instruction_sets: list,
instruction_mode: str
) -> list[Node]:
result = processed_nodes or []

with self.progress.task(
@@ -53,7 +69,15 @@ def _execute_sequential(self, pending_nodes: list[Node], processed_nodes: list[N

return result

def _execute_batch(self, pending_nodes: list[Node], processed_nodes: list[Node], template: str, instruction_sets: list, instruction_mode: str) -> list[Node]:
def _execute_batch(
self,
pending_nodes: list[Node],
processed_nodes: list[Node],
template: str,
instruction_sets: list,
instruction_mode: str,
use_vllm: bool
) -> list[Node]:
result = processed_nodes or []

all_prompts = []
@@ -63,32 +87,43 @@ def _execute_batch(self, pending_nodes: list[Node], processed_nodes: list[Node],
for prompt in prompts:
all_prompts.append(prompt)
node_map[prompt] = node

num_batches = (len(all_prompts) + self.batch_size - 1) // self.batch_size
with self.progress.task(
"Generating...",
num_batches,
completed=0,
len(pending_nodes) + len(processed_nodes),
completed=len(processed_nodes),
batch_size=self.batch_size
) as advance:
for i in range(0, len(all_prompts), self.batch_size):
batch_prompts = all_prompts[i:i + self.batch_size]
loop = asyncio.get_event_loop()
tasks = [
loop.run_in_executor(
None,
self._call_llm,
prompt
)
for prompt in batch_prompts
]
responses = loop.run_until_complete(asyncio.gather(*tasks))
if use_vllm:
responses = self._call_vllm(prompts=batch_prompts)
else:
loop = asyncio.get_event_loop()
tasks = [
loop.run_in_executor(
None,
self._call_llm,
prompt
)
for prompt in batch_prompts
]
responses = loop.run_until_complete(asyncio.gather(*tasks))
for prompt, response in zip(batch_prompts, responses):
node = node_map[prompt]
result_node = self._create_result_node(node, response, prompt)
result.append(result_node)
advance()

return result

def _call_vllm(self, prompts: list[str]):
return VLLMProvider.call(
provider=self.provider.name,
model=self.model,
api_url=self.provider.api_url,
prompts=prompts,
temperature=self.config.parameters.temperature,
)

def _build_prompts(self, node: Node, template: str, instruction_sets: list, instruction_mode: str) -> list[str]:
return PromptBuilder.build(
@@ -116,7 +151,7 @@ def _create_result_node(self, node: Node, value: str, metadata: str) -> Node:
node_metadata=metadata,
ancestors={'source': node.id}
)
self.step_model.save_during_execution(self.session, node, result_node)
self.step_model.save_during_execution(self.session, [node], [result_node])

return result_node

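The behavioural pivot in the batch path is the `use_vllm` flag, derived purely from the provider name: batches are handed to `VLLMProvider.call` in one shot instead of fanning out per-prompt `_call_llm` calls through executor threads. A tiny illustration of the routing (provider names other than `vllm`/`hosted_vllm` are just examples):

```python
# How the new batch path routes different provider names.
for name in ("openai", "ollama", "vllm", "hosted_vllm"):
    use_vllm = "vllm" in name
    print(
        name,
        "->",
        "VLLMProvider.call (whole batch at once)" if use_vllm
        else "per-prompt _call_llm via asyncio executor threads",
    )
```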
9 changes: 7 additions & 2 deletions synda/progress_manager.py
@@ -35,10 +35,15 @@ def __init__(self, executor_type: str):

@contextmanager
def task(
self, description: str, total: int, completed: int = 0, transient: bool = False
self, description: str, total: int, completed: int = 0, batch_size: int = 1, transient: bool = False
):
with self.progress as progress:
task_id = progress.add_task(
description, total=total, transient=transient, completed=completed
)
yield lambda: progress.advance(task_id)
def safe_advance():
remaining = total - progress.tasks[task_id].completed
progress.advance(task_id, min(batch_size, int(remaining)))

yield safe_advance
# yield lambda: progress.advance(task_id, batch_size)
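The `safe_advance` closure exists so that a final, smaller batch cannot push the bar past its total. The same clamping arithmetic in isolation, with illustrative numbers:

```python
# Illustrative clamping: a total of 10 nodes advanced in batches of 4.
total, batch_size = 10, 4
completed = 0
while completed < total:
    remaining = total - completed
    step = min(batch_size, remaining)  # 4, 4, then 2 -- never past the total
    completed += step
    print(f"advanced by {step}, now {completed}/{total}")
```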
60 changes: 60 additions & 0 deletions synda/utils/vllm_provider.py
@@ -0,0 +1,60 @@
import logging
import requests

import vllm


logging.getLogger("vllm").setLevel(logging.CRITICAL)

class VLLMProvider:
@classmethod
def call(
cls,
provider: str,
model: str,
api_url: str,
prompts: list[str],
temperature: float = 1.0,
max_tokens: int | None = None
) -> list[str]:
if "hosted_vllm" in provider:
if "v1/chat/completions" in api_url:
payload = {
"model": model,
"messages": [
{"role": "user", "content": prompt} for prompt in prompts
],
"temperature": temperature,
**({"max_tokens": max_tokens} if max_tokens is not None else {})
}
elif "v1/completions" in api_url:
payload = {
"model": model,
"prompt": prompts,
"temperature": temperature,
**({"max_tokens": max_tokens} if max_tokens is not None else {})
}
else:
raise ValueError("vLLM API endpoint not recognized in synda")
return cls._invoke_vllm_rest_api(api_url=api_url, payload=payload)
elif "vllm" in provider:
return cls._invoke_vllm_programmatic_api(prompts=prompts, model=model, temperature=temperature)
raise ValueError(
"To use vLLM for inference, the provider name should contains 'vllm' in its name to use vLLM"
"programmatic API or 'hosted_vllm' to request a vLLM server"
)


@staticmethod
def _invoke_vllm_rest_api(api_url: str, payload: dict[str, str | list | dict]) -> list[str]:
llm_answers = requests.post(url=api_url, json=payload).json()
return [response["text"] for response in llm_answers["choices"]]

@staticmethod
def _invoke_vllm_programmatic_api(
prompts: list[str], model: str, temperature: float, max_tokens: int | None = None
) -> list[str]:
llm = vllm.LLM(model=model)
sampling_params = vllm.SamplingParams(temperature=temperature, max_tokens=max_tokens)
llm_answers = llm.generate(prompts=prompts, sampling_params=sampling_params)
return [response.outputs[0].text for response in llm_answers]
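A hedged usage sketch of the new provider class, covering both routing modes; the endpoint URL is a placeholder and the model name is taken from the test config below:

```python
from synda.utils.vllm_provider import VLLMProvider

# Hosted mode: the provider name contains "hosted_vllm", so prompts are POSTed
# to a running vLLM server (URL below is a placeholder for a local deployment).
hosted_answers = VLLMProvider.call(
    provider="hosted_vllm",
    model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    api_url="http://localhost:8000/v1/completions",
    prompts=["Write one question about nebulae."],
    temperature=0.7,
)

# Programmatic mode: any other provider name containing "vllm" loads the model
# in-process through vllm.LLM and generates the whole batch locally.
local_answers = VLLMProvider.call(
    provider="vllm",
    model="TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    api_url="",  # unused on this path
    prompts=["Write one question about nebulae."],
)
```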
64 changes: 64 additions & 0 deletions tests/stubs/vllm_rag/config.yaml
@@ -0,0 +1,64 @@
input:
type: csv
properties:
path: tests/stubs/vllm_rag/source.csv
target_column: content
separator: "\t"

pipeline:
- type: split
method: chunk
name: chunk_faq
parameters:
size: 1500

- type: split
method: separator
name: sentence_chunk_faq
parameters:
separator: .
keep_separator: true

- type: generation
method: llm
parameters:
provider: hosted_vllm
model: TinyLlama/TinyLlama-1.1B-Chat-v1.0
occurrences: 1
batch_size: 2
batch: true
template: |
Ask a question regarding the sentence about the content.
content: {chunk_faq}
sentence: {sentence_chunk_faq}

Instructions:
1. Use english only
2. Keep it short

question:

- type: clean
method: deduplicate-tf-idf
parameters:
strategy: fuzzy
similarity_threshold: 0.6
keep: first

- type: ablation
method: llm-judge-binary
parameters:
provider: ollama
model: llama3.2
consensus: all
batch_size: 4
batch: true
criteria:
- Is the question written in english?
- Is the question consistent?

output:
type: csv
properties:
path: tests/stubs/vllm_rag/output.csv
separator: "\t"
4 changes: 4 additions & 0 deletions tests/stubs/vllm_rag/source.csv
@@ -0,0 +1,4 @@
content
A nebula (Latin for 'cloud, fog';[1] pl.: nebulae, nebulæ, or nebulas[2][3][4][5]) is a distinct luminescent part of interstellar medium, which can consist of ionized, neutral, or molecular hydrogen and also cosmic dust. Nebulae are often star-forming regions, such as in the Pillars of Creation in the Eagle Nebula. In these regions, the formations of gas, dust, and other materials "clump" together to form denser regions, which attract further matter and eventually become dense enough to form stars. The remaining material is then thought to form planets and other planetary system objects.
Most nebulae are of vast size; some are hundreds of light-years in diameter. A nebula that is visible to the human eye from Earth would appear larger, but no brighter, from close by.[6] The Orion Nebula, the brightest nebula in the sky and occupying an area twice the angular diameter of the full Moon, can be viewed with the naked eye but was missed by early astronomers.[7] Although denser than the space surrounding them, most nebulae are far less dense than any vacuum created on Earth (105 to 107 molecules per cubic centimeter) – a nebular cloud the size of the Earth would have a total mass of only a few kilograms. Earth's air has a density of approximately 1019 molecules per cubic centimeter; by contrast, the densest nebulae can have densities of 104 molecules per cubic centimeter. Many nebulae are visible due to fluorescence caused by embedded hot stars, while others are so diffused that they can be detected only with long exposures and special filters. Some nebulae are variably illuminated by T Tauri variable stars.
Originally, the term "nebula" was used to describe any diffused astronomical object, including galaxies beyond the Milky Way. The Andromeda Galaxy, for instance, was once referred to as the Andromeda Nebula (and spiral galaxies in general as "spiral nebulae") before the true nature of galaxies was confirmed in the early 20th century by Vesto Slipher, Edwin Hubble, and others. Edwin Hubble discovered that most nebulae are associated with stars and illuminated by starlight. He also helped categorize nebulae based on the type of light spectra they produced.[8]