This repository contains TogetherCrew's Temporal Python workflows for data processing and analysis. It leverages the Temporal workflow engine to orchestrate ETL processes and data summarization tasks.
- Website Ingestion: Extracts, transforms, and loads data from websites defined in the platform configuration.
- MediaWiki Ingestion: Processes content from MediaWiki instances, including extraction of pages, revisions, and content.
- Platform Summaries: Retrieves and processes summaries from platform data stored in Qdrant, with options to fetch by a single date or a date range.
- Real-Time Summaries: Generates new summaries for recent data across platforms or specific communities.
The project uses Temporal for workflow orchestration with the following components:
- Temporal Server: Manages workflow execution and task queues
- MongoDB: Stores platform and community configuration
- Qdrant: Vector database for storing and retrieving summary content
- Redis: Caching and state management
- PostgreSQL: Used by Temporal for workflow history and state
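The repository's `docker-compose.dev.yml` wires these services together. Purely for orientation, a trimmed sketch of that layout might look like the following; the images and ports here are illustrative, not the repo's actual values:

```yaml
# Illustrative sketch only -- the authoritative file is docker-compose.dev.yml in this repo
services:
  temporal:
    image: temporalio/auto-setup   # Temporal Server: workflow execution and task queues
    ports:
      - "7233:7233"                # gRPC endpoint that workers and clients connect to
    depends_on:
      - postgresql
  temporal-ui:
    image: temporalio/ui           # dashboard served on localhost:8080
    ports:
      - "8080:8080"
  mongodb:
    image: mongo                   # platform and community configuration
  qdrant:
    image: qdrant/qdrant           # vector store for summary content
  redis:
    image: redis                   # caching and state management
  postgresql:
    image: postgres                # Temporal's workflow history and state
```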
- Configure Environment Variables
  - Copy the example environment file:

    ```bash
    cp .env.example .env
    ```

  - Update the `.env` file with your own values, referencing the services defined in `docker-compose.dev.yml`.
  - Required variables:
    - `TEMPORAL_TASK_QUEUE`: queue name for the worker
    - Database connection parameters for MongoDB, Qdrant, etc.
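  As an example, a minimal `.env` might look like the sketch below. `TEMPORAL_TASK_QUEUE` is the only variable named in this README; the connection variables are illustrative placeholders, so check `.env.example` for the authoritative names.

  ```
  # Only TEMPORAL_TASK_QUEUE is documented here; the remaining names are illustrative
  TEMPORAL_TASK_QUEUE=hivemind-queue
  TEMPORAL_HOST=localhost:7233
  MONGODB_URI=mongodb://localhost:27017
  QDRANT_HOST=localhost
  REDIS_HOST=localhost
  ```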
- Start Services
  - Use the following command to set up and run the required services:

    ```bash
    docker compose -f docker-compose.dev.yml up -d
    ```

- Open localhost:8080 to access the Temporal dashboard.
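If the dashboard does not come up, you can check that all services started with the standard Compose status command:

```bash
docker compose -f docker-compose.dev.yml ps
```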
To fetch existing summaries for a specific community and date range from Qdrant:
```python
from temporalio.client import Client

from hivemind_summarizer.workflows import PlatformSummariesWorkflow
from hivemind_summarizer.schema import PlatformFetchSummariesWorkflowInput


async def run_platform_summaries_workflow():
    client = await Client.connect("localhost:7233")

    # Create workflow input
    input_data = PlatformFetchSummariesWorkflowInput(
        platform_id="your_platform_id",  # Required: the platform to fetch summaries from
        community_id="your_community_id",  # Required: the community to fetch summaries from
        start_date="2023-05-01",  # Optional: fetch summaries from this date
        end_date="2023-05-07",  # Optional: fetch summaries until this date
        extract_text_only=True,  # Optional: whether to extract only text content
    )

    # Execute workflow
    result = await client.execute_workflow(
        PlatformSummariesWorkflow.run,
        input_data,
        id="platform-summaries-workflow",
        task_queue="your_task_queue",
    )

    # Returns a list of existing summaries from Qdrant
    return result
```
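Since the entry point is async, a caller would typically drive it with asyncio; assuming the list result described above, for example:

```python
import asyncio

# Run the async helper defined above and inspect the returned summaries
summaries = asyncio.run(run_platform_summaries_workflow())
print(f"Fetched {len(summaries)} summaries")
```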
Note: This workflow only retrieves existing summaries that have already been generated and stored in Qdrant. It does not generate new summaries. Use this when you want to access previously generated summaries for a specific platform and community.
To generate new summaries for recent data:
```python
from temporalio.client import Client

from hivemind_summarizer.workflows import RealTimeSummaryWorkflow
from hivemind_summarizer.schema import RealTimeSummaryWorkflowInput


async def run_realtime_summary_workflow():
    client = await Client.connect("localhost:7233")

    # Create workflow input
    input_data = RealTimeSummaryWorkflowInput(
        period="4h",  # Optional: time period (e.g., "1h", "4h") or a date in %Y-%m-%d format
        platform_id="your_platform_id",  # Optional: filter by platform
        community_id="your_community_id",  # Optional: filter by community
        collection_name="your_collection",  # Optional: filter by collection
    )

    # Execute workflow
    result = await client.execute_workflow(
        RealTimeSummaryWorkflow.run,
        input_data,
        id="realtime-summary-workflow",
        task_queue="your_task_queue",
    )

    # Returns newly generated summary text
    return result
```
Note: This workflow actively generates new summaries for recent data. Use it when you want fresh summaries for the specified time period and filters. Note 2: Provide either the collection filter, or both the platform and community filters, so the workflow can identify the collection that holds the raw data.
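Concretely, the two valid input shapes look like this (all values are placeholders):

```python
from hivemind_summarizer.schema import RealTimeSummaryWorkflowInput

# Option 1: name the Qdrant collection directly
input_by_collection = RealTimeSummaryWorkflowInput(
    period="4h",
    collection_name="your_collection",
)

# Option 2: let the workflow resolve the collection from platform + community
input_by_platform = RealTimeSummaryWorkflowInput(
    period="4h",
    platform_id="your_platform_id",
    community_id="your_community_id",
)
```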
To process MediaWiki content for all platforms or a specific one:
```python
from temporalio.client import Client

from hivemind_etl.mediawiki.workflows import MediaWikiETLWorkflow


async def run_mediawiki_workflow(platform_id=None):
    client = await Client.connect("localhost:7233")

    # Execute workflow for all platforms or a specific one
    await client.execute_workflow(
        MediaWikiETLWorkflow.run,
        platform_id,  # Pass None to process all platforms
        id="mediawiki-etl-workflow",
        task_queue="your_task_queue",
    )
```
To ingest content from websites:
```python
from temporalio.client import Client

from hivemind_etl.website.workflows import WebsiteIngestionSchedulerWorkflow


async def run_website_workflow(platform_id=None):
    client = await Client.connect("localhost:7233")

    # Execute workflow for all platforms or a specific one
    await client.execute_workflow(
        WebsiteIngestionSchedulerWorkflow.run,
        platform_id,  # Pass None to process all platforms
        id="website-ingestion-workflow",
        task_queue="your_task_queue",
    )
```
To run the worker locally:
```bash
python worker.py
```
This will start a worker that connects to Temporal and listens for tasks on the configured task queue.
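For orientation, a minimal sketch of a worker along those lines is shown below. The actual registrations live in the repository's `worker.py`; the workflow list here is just the two summarizer workflows named in this README, and the real worker also registers the ETL workflows and their activities.

```python
import asyncio
import os

from temporalio.client import Client
from temporalio.worker import Worker

from hivemind_summarizer.workflows import (
    PlatformSummariesWorkflow,
    RealTimeSummaryWorkflow,
)


async def main():
    # Connect to the Temporal server started by docker-compose.dev.yml
    client = await Client.connect("localhost:7233")

    # Listen on the task queue configured in .env
    worker = Worker(
        client,
        task_queue=os.environ["TEMPORAL_TASK_QUEUE"],
        workflows=[PlatformSummariesWorkflow, RealTimeSummaryWorkflow],
        # The repo's worker.py also registers ETL workflows and activities here
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```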