Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/graphviz experimental support #237

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Neo4j Credentials
NEO4J_URI=neo4j://localhost:7687
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=password

# OpenAI Credentials
OPENAI_API_KEY=your_openai_api_key_here

# Optional: OpenAI Organization ID
# OPENAI_ORG_ID=your_organization_id_here
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@ docs/build/
.DS_Store
.venv
.tox/

# Test and data files
data/
test_*.py
create_pdf.py
*.svg
161 changes: 161 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
version: '3'

dotenv: ['.env']

vars:
  GRAPHVIZ_VERSION: '8.1.0'
  # Install prefix for the from-source Graphviz build.
  # Previously hard-coded to one developer's home directory
  # (/home/gonzo/local), which broke the tasks on any other machine.
  # Derived from $HOME; override on the CLI if needed:
  #   task install-graphviz LOCAL_PREFIX=/opt/graphviz
  LOCAL_PREFIX: '{{env "HOME"}}/local'

tasks:
  install-graphviz:
    desc: Download, build and install Graphviz from source
    cmds:
      - mkdir -p {{.LOCAL_PREFIX}}
      # Skip the download if the tarball is already present.
      - |
        if [ ! -f "graphviz-{{.GRAPHVIZ_VERSION}}.tar.gz" ]; then
          wget https://gitlab.com/api/v4/projects/4207231/packages/generic/graphviz-releases/{{.GRAPHVIZ_VERSION}}/graphviz-{{.GRAPHVIZ_VERSION}}.tar.gz
        fi
      - tar -xzf graphviz-{{.GRAPHVIZ_VERSION}}.tar.gz
      # Each cmd runs in its own shell, so the `cd` must be repeated.
      - cd graphviz-{{.GRAPHVIZ_VERSION}} && ./configure --prefix={{.LOCAL_PREFIX}}
      - cd graphviz-{{.GRAPHVIZ_VERSION}} && make
      - cd graphviz-{{.GRAPHVIZ_VERSION}} && make install

  setup-env:
    desc: Set up environment variables for Graphviz
    cmds:
      # Quoted 'EOF' stops shell expansion; {{.LOCAL_PREFIX}} is expanded
      # by task's templating before the shell sees the heredoc.
      - |
        cat > setup_graphviz_env.sh << 'EOF'
        #!/bin/bash
        # Add Graphviz libraries to LD_LIBRARY_PATH
        export LD_LIBRARY_PATH={{.LOCAL_PREFIX}}/lib:$LD_LIBRARY_PATH
        # Add Graphviz binaries to PATH
        export PATH={{.LOCAL_PREFIX}}/bin:$PATH
        EOF
      - chmod +x setup_graphviz_env.sh

  install-build-deps:
    desc: Install necessary build dependencies
    cmds:
      - sudo apt-get update
      - sudo apt-get install -y build-essential python3-dev

  install:
    desc: Install the package and dependencies with poetry
    cmds:
      - poetry install --with dev --extras "all"

  setup-kg:
    desc: Setup the knowledge graph with medical papers
    deps: [install]
    cmds:
      - poetry run python advanced_patterns/setup_knowledge_graph.py
    env:
      NEO4J_URI: '{{.NEO4J_URI}}'
      NEO4J_USERNAME: '{{.NEO4J_USERNAME}}'
      NEO4J_PASSWORD: '{{.NEO4J_PASSWORD}}'
      OPENAI_API_KEY: '{{.OPENAI_API_KEY}}'

  run-example:
    desc: Run the GraphRAG example
    deps: [install]
    cmds:
      - poetry run python advanced_patterns/example.py
    env:
      NEO4J_URI: '{{.NEO4J_URI}}'
      NEO4J_USERNAME: '{{.NEO4J_USERNAME}}'
      NEO4J_PASSWORD: '{{.NEO4J_PASSWORD}}'
      OPENAI_API_KEY: '{{.OPENAI_API_KEY}}'

  clean:
    desc: Clean up generated files
    cmds:
      - rm -rf build/
      - rm -rf dist/
      - rm -rf *.egg-info/
      - find . -type d -name __pycache__ -exec rm -rf {} +
      - find . -type f -name "*.pyc" -delete
      - find . -type f -name "*.pyo" -delete
      - find . -type f -name "*.pyd" -delete

  test:
    desc: Run tests
    deps: [install]
    cmds:
      - poetry run pytest tests/

  lint:
    desc: Run linters
    deps: [install]
    cmds:
      - poetry run black .
      - poetry run isort .
      - poetry run flake8 .

  build:
    desc: Build the package
    deps: [clean]
    cmds:
      - poetry build

  default:
    desc: Show available tasks
    cmds:
      - task --list

  all:
    desc: Run all setup tasks in order
    cmds:
      - task: install-graphviz
      - task: setup-env
      - task: install-build-deps
      - task: install
      - task: test-graphviz
      - task: visualize-graph

  test-graphviz:
    desc: Test Graphviz installation with a simple Python script
    cmds:
      - |
        cat > test_graphviz.py << 'EOF'
        import pygraphviz as pgv
        # Create a new graph
        G = pgv.AGraph()
        # Add some nodes and edges
        G.add_edge(1, 2)
        G.add_edge(2, 3)
        G.add_edge(3, 1)
        # Set the layout
        G.layout(prog='dot')
        # Save the graph as SVG
        G.draw('test.svg')
        print("Graph has been created and saved as test.svg")
        EOF
      - env LD_LIBRARY_PATH={{.LOCAL_PREFIX}}/lib PATH={{.LOCAL_PREFIX}}/bin:$PATH poetry run python test_graphviz.py

  visualize-graph:
    desc: Run the graph visualization script
    deps: [install]
    env:
      LD_LIBRARY_PATH: '{{.LOCAL_PREFIX}}/lib'
      # {{.PATH}} is not a defined template variable in go-task; read the
      # process environment explicitly so the existing PATH is preserved.
      PATH: '{{.LOCAL_PREFIX}}/bin:/usr/bin:{{env "PATH"}}'
    cmds:
      - poetry run python visualize_graph.py

  clean-visualizations:
    desc: Remove generated visualization files
    cmds:
      - rm -f knowledge_graph*.svg

  run-advanced-examples:
    desc: Run the advanced GraphRAG pattern examples
    deps: [install]
    env:
      LD_LIBRARY_PATH: '{{.LOCAL_PREFIX}}/lib'
      PATH: '{{.LOCAL_PREFIX}}/bin:/usr/bin:{{env "PATH"}}'
    cmds:
      - poetry run python advanced_patterns/example.py

  clean-logs:
    desc: Remove generated log files
    cmds:
      - rm -f graphrag.log
166 changes: 166 additions & 0 deletions advanced_patterns/enhanced_rag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import logging
from datetime import datetime
from typing import Dict, Any, List
from neo4j_graphrag.generation import GraphRAG
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

class EnhancedGraphRAG(GraphRAG):
    """GraphRAG variant that enriches retrieved results before generation.

    Each chunk-bearing result gets citation metadata (title/authors/year of
    the owning document, looked up in Neo4j), and extracted entities are
    bucketed by their type.  The enriched results are then handed to the
    parent ``search`` implementation.
    """

    def __init__(self, llm, retriever, **kwargs):
        """Delegate construction to GraphRAG; no extra state is kept here."""
        super().__init__(llm=llm, retriever=retriever, **kwargs)

    def post_process_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the results enriched with citations and grouped entities.

        Args:
            results: retriever output; items may carry a "chunk" and/or an
                "entities" key (items are mutated in place and re-collected).

        Returns:
            The same result dicts, with "citation" and/or "grouped_entities"
            added where applicable.
        """
        processed: List[Dict[str, Any]] = []

        for item in results:
            # Attach citation info for results that reference a chunk.
            if "chunk" in item:
                # NOTE(review): matches by the legacy id() function; Neo4j 5+
                # deprecates it in favour of elementId() — confirm against the
                # deployed server version.
                with self.retriever.neo4j_driver.session() as session:
                    record = session.run("""
                        MATCH (c:Chunk)-[:PART_OF]->(d:Document)
                        WHERE id(c) = $chunk_id
                        RETURN d.title, d.authors, d.year
                    """, chunk_id=item["chunk"].id).single()

                if record:
                    item["citation"] = {
                        "title": record["d.title"],
                        "authors": record["d.authors"],
                        "year": record["d.year"],
                    }

            # Bucket entities by their declared type.
            if "entities" in item:
                by_type: Dict[Any, List[Any]] = {}
                for entity in item["entities"]:
                    by_type.setdefault(entity.type, []).append(entity)
                item["grouped_entities"] = by_type

            processed.append(item)

        return processed

    def search(self, query: str, **kwargs) -> Dict[str, Any]:
        """Retrieve, enrich, then delegate generation to the parent class."""
        raw = self.retriever.retrieve(query)
        return super().search(query, results=self.post_process_results(raw), **kwargs)


class OptimizedGraphRAG(GraphRAG):
    """GraphRAG implementation with performance optimizations.

    - Vector-search results are memoized per instance with an LRU cache.
    - Retrieved chunks are post-processed concurrently in a thread pool.
    """

    def __init__(self, llm, retriever, max_workers=4, cache_size=1000, **kwargs):
        """
        Args:
            llm: language model, passed through to GraphRAG.
            retriever: retriever, passed through to GraphRAG.
            max_workers: thread-pool size for chunk post-processing.
            cache_size: maximum number of memoized vector-search queries.
        """
        super().__init__(llm=llm, retriever=retriever, **kwargs)
        self.max_workers = max_workers
        self.cache_size = cache_size
        # Build the cache per instance.  A class-level @lru_cache on the
        # method would (a) hard-code the size, silently ignoring the
        # `cache_size` parameter, and (b) include `self` in the cache keys,
        # keeping every instance alive in one process-global cache shared
        # across all instances.
        self._cached_vector_search = lru_cache(maxsize=cache_size)(self._vector_search)

    def _vector_search(self, query_text: str):
        """Uncached vector search; wrapped by the per-instance LRU cache."""
        return self.retriever.retrieve(query_text)

    def _process_chunk_parallel(self, chunk: Dict[str, Any]) -> Dict[str, Any]:
        """Process a single chunk (extension hook; currently the identity)."""
        # Add additional processing here
        return chunk

    def _process_chunks_parallel(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process chunks concurrently, preserving the input order."""
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # executor.map yields results in submission order, matching the
            # original submit-then-result loop.
            return list(executor.map(self._process_chunk_parallel, chunks))

    def search(self, query: str, **kwargs) -> Dict[str, Any]:
        """Search using cached retrieval and parallel chunk processing."""
        results = self._cached_vector_search(query)
        processed = self._process_chunks_parallel(results)
        return super().search(query, results=processed, **kwargs)


class MonitoredGraphRAG(GraphRAG):
    """GraphRAG implementation with monitoring and logging capabilities.

    Logs per-query timing (retrieval, LLM, total) to the shared "graphrag"
    logger and returns the metrics alongside the parent response.
    """

    def __init__(self, llm, retriever, **kwargs):
        super().__init__(llm=llm, retriever=retriever, **kwargs)
        self.logger = logging.getLogger("graphrag")
        self.setup_logging()

    def setup_logging(self):
        """Attach file and stream handlers to the shared "graphrag" logger.

        Guarded so constructing multiple instances does not attach duplicate
        handlers: logging.getLogger("graphrag") returns a process-wide
        singleton, and the original unconditional addHandler calls logged
        every record once per instance ever created.
        """
        if self.logger.handlers:
            return

        # Without an explicit level the logger inherits the root default
        # (WARNING) and the INFO-level metrics below would be dropped.
        self.logger.setLevel(logging.INFO)

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # File handler: full INFO-level metrics stream.
        fh = logging.FileHandler('graphrag.log')
        fh.setLevel(logging.INFO)
        fh.setFormatter(formatter)

        # Stream handler: warnings and errors only.
        sh = logging.StreamHandler()
        sh.setLevel(logging.WARNING)
        sh.setFormatter(formatter)

        self.logger.addHandler(fh)
        self.logger.addHandler(sh)

    def search(self, query: str, **kwargs) -> Dict[str, Any]:
        """Run a monitored search, logging and returning timing metrics.

        Returns:
            The parent response extended with a "metrics" key (retrieval,
            LLM and total wall-clock seconds, plus result count).
            NOTE(review): assumes the parent search returns a mapping so it
            can be splatted with ** — confirm against the GraphRAG API.

        Raises:
            Whatever retrieval/generation raises; the failure is logged with
            its elapsed time before re-raising.
        """
        start_time = datetime.now()

        try:
            # Track retrieval metrics.
            retrieval_start = datetime.now()
            results = self.retriever.retrieve(query)
            retrieval_time = (datetime.now() - retrieval_start).total_seconds()

            # Track LLM metrics.
            llm_start = datetime.now()
            response = super().search(query, results=results, **kwargs)
            llm_time = (datetime.now() - llm_start).total_seconds()

            total_time = (datetime.now() - start_time).total_seconds()

            # Log performance metrics.
            self.logger.info({
                "query": query,
                "retrieval_time": retrieval_time,
                "llm_time": llm_time,
                "total_time": total_time,
                "num_results": len(results),
                "status": "success"
            })

            return {
                **response,
                "metrics": {
                    "retrieval_time": retrieval_time,
                    "llm_time": llm_time,
                    "total_time": total_time,
                    "num_results": len(results)
                }
            }

        except Exception as e:
            self.logger.error({
                "query": query,
                "error": str(e),
                "status": "failed",
                "total_time": (datetime.now() - start_time).total_seconds()
            })
            raise
Loading
Loading