From 1e3eb7d5e160a9b5b01cc466ec48232888cdc90f Mon Sep 17 00:00:00 2001 From: Vasu Bansal Date: Sat, 28 Mar 2026 17:21:01 +0530 Subject: [PATCH 1/4] feat(rss-twitter): migrate Playwright agent to v0.6 template layout --- .../templates/rss_twitter_agent/README.md | 52 ++++ .../templates/rss_twitter_agent/__init__.py | 5 + .../templates/rss_twitter_agent/__main__.py | 74 +++++ examples/templates/rss_twitter_agent/agent.py | 179 ++++++++++++ .../templates/rss_twitter_agent/config.py | 59 ++++ .../rss_twitter_agent/credentials.py | 59 ++++ examples/templates/rss_twitter_agent/fetch.py | 257 ++++++++++++++++++ .../rss_twitter_agent/nodes/__init__.py | 61 +++++ examples/templates/rss_twitter_agent/run.py | 159 +++++++++++ .../templates/rss_twitter_agent/test_flow.py | 120 ++++++++ .../templates/rss_twitter_agent/twitter.py | 253 +++++++++++++++++ 11 files changed, 1278 insertions(+) create mode 100644 examples/templates/rss_twitter_agent/README.md create mode 100644 examples/templates/rss_twitter_agent/__init__.py create mode 100644 examples/templates/rss_twitter_agent/__main__.py create mode 100644 examples/templates/rss_twitter_agent/agent.py create mode 100644 examples/templates/rss_twitter_agent/config.py create mode 100644 examples/templates/rss_twitter_agent/credentials.py create mode 100644 examples/templates/rss_twitter_agent/fetch.py create mode 100644 examples/templates/rss_twitter_agent/nodes/__init__.py create mode 100644 examples/templates/rss_twitter_agent/run.py create mode 100644 examples/templates/rss_twitter_agent/test_flow.py create mode 100644 examples/templates/rss_twitter_agent/twitter.py diff --git a/examples/templates/rss_twitter_agent/README.md b/examples/templates/rss_twitter_agent/README.md new file mode 100644 index 0000000000..5b4ef3a0c0 --- /dev/null +++ b/examples/templates/rss_twitter_agent/README.md @@ -0,0 +1,52 @@ +# RSS-to-Twitter Agent (Playwright) + +This template keeps the original behavior: + +1. Fetch RSS news +2. 
Summarize with Ollama +3. Ask you `y/n/q` per thread +4. If `y`, auto-open Twitter/X and post via Playwright + +Updated for Hive v0.6+ project layout and credential namespace support. + +## Run + +From repo root: + +```bash +cd /path/to/hive +uv run python -m examples.templates.rss_twitter_agent run \ + --feed-url "https://news.ycombinator.com/rss" \ + --max-articles 3 +``` + +Optional credential ref (v0.6 format): + +```bash +uv run python -m examples.templates.rss_twitter_agent run \ + --feed-url "https://news.ycombinator.com/rss" \ + --max-articles 3 \ + --twitter-credential-ref twitter/default +``` + +## Validate / Info + +```bash +uv run python -m examples.templates.rss_twitter_agent validate +uv run python -m examples.templates.rss_twitter_agent info +``` + +## Ollama prerequisites + +```bash +ollama serve +ollama pull llama3.2 +``` + +## Behavior notes + +- First posting run opens browser login and stores session. +- Later runs reuse session automatically. +- You can override session path with `HIVE_TWITTER_SESSION_DIR`. +- Credential reference uses `{name}/{alias}` (example: `twitter/default`). +- Interactive review still uses per-thread `y/n/q` approval before posting. 
diff --git a/examples/templates/rss_twitter_agent/__init__.py b/examples/templates/rss_twitter_agent/__init__.py new file mode 100644 index 0000000000..f147aaa51f --- /dev/null +++ b/examples/templates/rss_twitter_agent/__init__.py @@ -0,0 +1,5 @@ +"""RSS-to-Twitter Playwright agent (Hive v0.6-compatible package).""" + +from .agent import RSSTwitterAgent, default_agent + +__all__ = ["RSSTwitterAgent", "default_agent"] diff --git a/examples/templates/rss_twitter_agent/__main__.py b/examples/templates/rss_twitter_agent/__main__.py new file mode 100644 index 0000000000..77282b02d0 --- /dev/null +++ b/examples/templates/rss_twitter_agent/__main__.py @@ -0,0 +1,74 @@ +"""CLI for RSS-to-Twitter Playwright agent.""" + +from __future__ import annotations + +import asyncio +import json +import sys + +import click + +from .agent import default_agent +from .run import run_interactive + + +@click.group() +@click.version_option(version="1.1.0") +def cli() -> None: + """RSS-to-Twitter Playwright agent.""" + + +@cli.command() +@click.option( + "--feed-url", default="https://news.ycombinator.com/rss", show_default=True +) +@click.option("--max-articles", default=3, show_default=True, type=int) +@click.option( + "--twitter-credential-ref", + default=None, + help="Hive credential reference in {name}/{alias} format (example: twitter/default).", +) +def run(feed_url: str, max_articles: int, twitter_credential_ref: str | None) -> None: + """Run the interactive RSS -> summarize -> approve -> post flow.""" + summary = asyncio.run( + run_interactive( + feed_url=feed_url, + max_articles=max_articles, + twitter_credential_ref=twitter_credential_ref, + ) + ) + click.echo(json.dumps(summary, indent=2, default=str)) + sys.exit(0) + + +@cli.command() +def validate() -> None: + """Validate basic graph structure metadata.""" + result = default_agent.validate() + if result["valid"]: + click.echo("Agent is valid") + return + click.echo("Agent has errors:") + for err in result["errors"]: + 
click.echo(f" ERROR: {err}") + sys.exit(1) + + +@cli.command() +@click.option("--json", "output_json", is_flag=True) +def info(output_json: bool) -> None: + """Show agent metadata.""" + data = default_agent.info() + if output_json: + click.echo(json.dumps(data, indent=2)) + return + click.echo(f"Agent: {data['name']}") + click.echo(f"Version: {data['version']}") + click.echo(f"Description: {data['description']}") + click.echo(f"Nodes: {', '.join(data['nodes'])}") + click.echo(f"Entry: {data['entry_node']}") + click.echo(f"Terminal: {', '.join(data['terminal_nodes'])}") + + +if __name__ == "__main__": + cli() diff --git a/examples/templates/rss_twitter_agent/agent.py b/examples/templates/rss_twitter_agent/agent.py new file mode 100644 index 0000000000..d9ec1794b8 --- /dev/null +++ b/examples/templates/rss_twitter_agent/agent.py @@ -0,0 +1,179 @@ +"""Agent metadata + simple execution wrapper for RSS-to-Twitter Playwright flow.""" + +from __future__ import annotations + +from framework.graph import Constraint, EdgeCondition, EdgeSpec, Goal, SuccessCriterion +from framework.graph.executor import ExecutionResult + +from .config import metadata, validate_ollama +from .nodes import approve_node, fetch_node, generate_node, post_node, process_node +from .run import run_workflow + + +goal = Goal( + id="rss-to-twitter", + name="RSS-to-Twitter Content Repurposing", + description=( + "Fetch articles from RSS feeds, summarize them, generate engaging Twitter threads, " + "ask for explicit user approval, and post approved threads via Playwright." 
+ ), + success_criteria=[ + SuccessCriterion( + id="feed-parsing", + description="Agent fetches and parses at least one feed item", + metric="article_count", + target=">=1", + weight=0.3, + ), + SuccessCriterion( + id="thread-quality", + description="Generated threads contain structured tweets with CTA and link", + metric="thread_count", + target=">=1", + weight=0.35, + ), + SuccessCriterion( + id="approval-gate", + description="User explicitly approves/rejects each thread", + metric="approval_present", + target="true", + weight=0.2, + ), + SuccessCriterion( + id="posting", + description="Approved threads are posted through Playwright", + metric="post_success", + target="true when approved", + weight=0.15, + ), + ], + constraints=[ + Constraint( + id="human-approval-required", + description="Posting requires explicit human y/n decision", + constraint_type="safety", + category="approval", + ), + Constraint( + id="source-attribution", + description="Threads should include source links", + constraint_type="quality", + category="content", + ), + ], +) + +nodes = [fetch_node, process_node, generate_node, approve_node, post_node] +edges = [ + EdgeSpec( + id="fetch-to-process", + source="fetch", + target="process", + condition=EdgeCondition.ON_SUCCESS, + priority=1, + ), + EdgeSpec( + id="process-to-generate", + source="process", + target="generate", + condition=EdgeCondition.ON_SUCCESS, + priority=1, + ), + EdgeSpec( + id="generate-to-approve", + source="generate", + target="approve", + condition=EdgeCondition.ON_SUCCESS, + priority=1, + ), + EdgeSpec( + id="approve-to-post", + source="approve", + target="post", + condition=EdgeCondition.ON_SUCCESS, + priority=1, + ), +] + +entry_node = "fetch" +entry_points = {"start": "fetch"} +terminal_nodes = ["post"] + + +class RSSTwitterAgent: + """Lightweight wrapper preserving the original interactive Playwright workflow.""" + + def __init__(self): + self.goal = goal + self.nodes = nodes + self.edges = edges + self.entry_node = 
entry_node + self.entry_points = entry_points + self.terminal_nodes = terminal_nodes + + async def start(self) -> None: + ok, msg = validate_ollama() + if not ok: + raise RuntimeError(msg) + + async def stop(self) -> None: + return None + + async def trigger_and_wait( + self, entry_point: str, input_data: dict, timeout: float | None = None + ) -> ExecutionResult: + feed_url = str(input_data.get("feed_url") or "https://news.ycombinator.com/rss") + max_articles = int(input_data.get("max_articles") or 3) + twitter_credential_ref = input_data.get("twitter_credential_ref") + workflow = await run_workflow( + feed_url=feed_url, + max_articles=max_articles, + twitter_credential_ref=( + str(twitter_credential_ref) if twitter_credential_ref else None + ), + ) + + return ExecutionResult( + success=True, + output={ + "articles_json": workflow["articles_json"], + "processed_json": workflow["processed_json"], + "threads_json": workflow["threads_json"], + "approved_json": workflow["approved_json"], + "results_json": workflow["results_json"], + }, + steps_executed=5, + ) + + async def run(self, context: dict) -> ExecutionResult: + await self.start() + try: + return await self.trigger_and_wait("start", context) + finally: + await self.stop() + + def info(self) -> dict: + return { + "name": metadata.name, + "version": metadata.version, + "description": metadata.description, + "goal": {"name": self.goal.name, "description": self.goal.description}, + "nodes": [n.id for n in self.nodes], + "entry_node": self.entry_node, + "terminal_nodes": self.terminal_nodes, + } + + def validate(self) -> dict: + errors: list[str] = [] + node_ids = {n.id for n in self.nodes} + if self.entry_node not in node_ids: + errors.append(f"Entry node '{self.entry_node}' not found") + for edge in self.edges: + if edge.source not in node_ids: + errors.append(f"Edge {edge.id}: source '{edge.source}' not found") + if edge.target not in node_ids: + errors.append(f"Edge {edge.id}: target '{edge.target}' not found") + 
return {"valid": not errors, "errors": errors, "warnings": []} + + +default_agent = RSSTwitterAgent() diff --git a/examples/templates/rss_twitter_agent/config.py b/examples/templates/rss_twitter_agent/config.py new file mode 100644 index 0000000000..61d604bb66 --- /dev/null +++ b/examples/templates/rss_twitter_agent/config.py @@ -0,0 +1,59 @@ +"""Runtime configuration for RSS-to-Twitter Agent with Ollama.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field + +import httpx + +OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434") +DEFAULT_MODEL = os.environ.get("LLM_MODEL", "ollama/llama3.2") + + +def _check_ollama_running() -> bool: + """Check if Ollama is running locally.""" + try: + with httpx.Client() as client: + resp = client.get(f"{OLLAMA_URL}/api/tags", timeout=2.0) + return resp.status_code == 200 + except Exception: + return False + + +def _get_model() -> str: + return DEFAULT_MODEL + + +@dataclass +class RuntimeConfig: + model: str = field(default_factory=_get_model) + temperature: float = 0.7 + max_tokens: int = 8000 + api_key: str | None = os.environ.get("LLM_API_KEY") + api_base: str | None = os.environ.get("LLM_API_BASE") + + +default_config = RuntimeConfig() + + +@dataclass +class AgentMetadata: + name: str = "RSS-to-Twitter Agent" + version: str = "1.1.0" + description: str = ( + "Automated content repurposing from RSS feeds to Twitter threads. " + "Uses Ollama for local LLM inference and Playwright for automated posting." + ) + + +metadata = AgentMetadata() + + +def validate_ollama() -> tuple[bool, str]: + if not _check_ollama_running(): + return ( + False, + "Ollama is not running. 
Start it with `ollama serve` and ensure your model is pulled.", + ) + return True, "" diff --git a/examples/templates/rss_twitter_agent/credentials.py b/examples/templates/rss_twitter_agent/credentials.py new file mode 100644 index 0000000000..862bd11c31 --- /dev/null +++ b/examples/templates/rss_twitter_agent/credentials.py @@ -0,0 +1,59 @@ +"""Credential helpers for RSS Twitter Playwright agent (Hive v0.6+).""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any + + +def _extract_session_dir(payload: Any) -> str | None: + if isinstance(payload, dict): + for key in ( + "session_dir", + "user_data_dir", + "twitter_session_dir", + "playwright_user_data_dir", + "path", + "value", + ): + value = payload.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + elif isinstance(payload, str): + raw = payload.strip() + if not raw: + return None + if raw.startswith("{"): + try: + obj = json.loads(raw) + return _extract_session_dir(obj) + except json.JSONDecodeError: + return None + return raw + return None + + +def resolve_twitter_session_dir(credential_ref: str | None = None) -> str: + """Resolve session dir from env first, then Hive credential store.""" + env_dir = os.environ.get("HIVE_TWITTER_SESSION_DIR") + if env_dir: + return str(Path(env_dir).expanduser()) + + ref = credential_ref or os.environ.get("TWITTER_CREDENTIAL_REF") + if ref and "/" in ref: + try: + from framework.credentials.store import CredentialStore + + store = CredentialStore.with_encrypted_storage( + Path.home() / ".hive" / "credentials" + ) + value = store.get(ref) + resolved = _extract_session_dir(value) + if resolved: + return str(Path(resolved).expanduser()) + except Exception: + pass + + return str(Path.home() / ".hive" / "twitter_session") diff --git a/examples/templates/rss_twitter_agent/fetch.py b/examples/templates/rss_twitter_agent/fetch.py new file mode 100644 index 0000000000..1f57d503b8 --- /dev/null +++ 
b/examples/templates/rss_twitter_agent/fetch.py @@ -0,0 +1,257 @@ +"""RSS fetch + LLM summarization + approval + post helpers.""" + +from __future__ import annotations + +import json +import os +import xml.etree.ElementTree as ET + +import httpx + + +def fetch_rss( + feed_url: str = "https://news.ycombinator.com/rss", max_articles: int = 3 +) -> str: + """Fetch RSS feed and return parsed articles as JSON string.""" + try: + with httpx.Client() as client: + resp = client.get(feed_url, timeout=10.0, follow_redirects=True) + resp.raise_for_status() + xml_content = resp.text + except Exception: + return json.dumps([]) + + try: + root = ET.fromstring(xml_content) + except ET.ParseError: + return json.dumps([]) + + articles = [] + for item in root.findall(".//item")[:max_articles]: + title_elem = item.find("title") + link_elem = item.find("link") + desc_elem = item.find("description") + + article = { + "title": title_elem.text if title_elem is not None else "", + "link": link_elem.text if link_elem is not None else "", + "summary": ( + desc_elem.text[:150] if desc_elem is not None and desc_elem.text else "" + ), + "source": "Hacker News", + } + articles.append(article) + + return json.dumps(articles) + + +def _call_ollama(prompt: str, max_tokens: int = 800) -> str: + """Call Ollama API directly via HTTP.""" + model = os.environ.get("OLLAMA_MODEL", "llama3.2") + try: + with httpx.Client() as client: + resp = client.post( + "http://localhost:11434/api/generate", + json={ + "model": model, + "prompt": prompt, + "stream": False, + "options": {"num_predict": max_tokens, "temperature": 0.75}, + }, + timeout=90.0, + ) + resp.raise_for_status() + data = resp.json() + return data.get("response", "[]") + except Exception as e: + print(f"Ollama error: {e}") + return "[]" + + +def summarize_articles(articles_json: str) -> str: + """Summarize articles into rich tweet-ready format using Ollama.""" + articles = json.loads(articles_json) if articles_json else [] + if not articles: + 
return json.dumps([]) + + prompt = f"""You are a tech journalist. For each article below, extract rich context for Twitter threads. +Return ONLY a JSON array โ€” one object per article โ€” with this exact format: +[ + {{ + "title": "article title", + "url": "article url", + "hook": "one punchy sentence that grabs attention โ€” a surprising fact, bold claim, or question", + "points": ["key insight 1", "key insight 2", "key insight 3"], + "why_it_matters": "one sentence on why this is important or interesting", + "hashtags": ["#Tag1", "#Tag2", "#Tag3"] + }} +] + +Articles: +{json.dumps(articles, indent=2)} + +Return ONLY the JSON array, no other text:""" + + text = _call_ollama(prompt, 900) + + start = text.find("[") + end = text.rfind("]") + 1 + if start >= 0 and end > start: + try: + parsed = json.loads(text[start:end]) + if isinstance(parsed, list) and parsed: + return json.dumps(parsed) + except json.JSONDecodeError: + pass + + return json.dumps( + [ + { + "title": a["title"], + "url": a["link"], + "hook": a["title"], + "points": [a.get("summary", "")[:150]], + "why_it_matters": "", + "hashtags": ["#Tech"], + } + for a in articles + ] + ) + + +def _generate_thread_for_article(summary: dict) -> dict | None: + """Generate one high-quality Twitter thread for a single article.""" + title = summary.get("title", "News") + url = summary.get("url", "") + hook = summary.get("hook", title) + points = summary.get("points", []) + why = summary.get("why_it_matters", "") + hashtags = " ".join(summary.get("hashtags", ["#Tech"])[:3]) + + prompt = f"""You are a viral tech Twitter personality. Write an engaging 4-tweet thread about this article. + +Article: {title} +URL: {url} +Hook: {hook} +Key points: {json.dumps(points)} +Why it matters: {why} +Hashtags to use: {hashtags} + +Rules: +- Tweet 1: Start with ๐Ÿงต and a PUNCHY hook โ€” a bold claim, surprising fact, or provocative question. Max 240 chars. +- Tweet 2: Start with \"1/\" โ€” explain the core idea in plain English. 
Conversational, not corporate. Max 260 chars. +- Tweet 3: Start with \"2/\" โ€” give the most interesting insight or implication. Use an emoji. Max 260 chars. +- Tweet 4: Start with \"3/\" โ€” why this matters + call to action + the URL + hashtags. Max 280 chars. +- Sound like a real person, not a press release. + +Return ONLY a JSON object with this exact structure (no other text): +{{"title": "{title[:60]}", "tweets": ["tweet1", "tweet2", "tweet3", "tweet4"]}}""" + + text = _call_ollama(prompt, 700) + + start = text.find("{") + end = text.rfind("}") + 1 + if start >= 0 and end > start: + try: + obj = json.loads(text[start:end]) + if isinstance(obj, dict) and "tweets" in obj and len(obj["tweets"]) >= 3: + return obj + except json.JSONDecodeError: + pass + + tweets = [ + f"๐Ÿงต {hook[:220]}", + f"1/ {points[0][:250]}" if points else f"1/ {title[:250]}", + f"2/ {points[1][:240]} ๐Ÿ’ก" if len(points) > 1 else f"2/ {why[:240]} ๐Ÿ’ก", + f"3/ Why it matters: {why[:150]}\n\n{url}\n\n{hashtags}", + ] + return {"title": title[:60], "tweets": tweets} + + +def generate_tweets(processed_json: str) -> str: + """Generate one engaging Twitter thread per article.""" + summaries = json.loads(processed_json) if processed_json else [] + if not summaries: + return json.dumps([]) + + threads = [] + for i, summary in enumerate(summaries): + print( + f" Generating thread {i + 1}/{len(summaries)}: {summary.get('title', '')[:50]}..." + ) + thread = _generate_thread_for_article(summary) + if thread: + threads.append(thread) + + return json.dumps(threads) + + +def approve_threads(threads_json: str) -> str: + """Display threads and ask user which ones to post. + + Preserves the legacy interactive prompt shape, including `q` to stop + reviewing additional threads while keeping earlier approvals. 
+ """ + threads = json.loads(threads_json) if threads_json else [] + if not threads: + print("No threads to review.") + return json.dumps([]) + + print("\n" + "=" * 60) + print("GENERATED TWEET THREADS") + print("=" * 60 + "\n") + + approved = [] + for i, thread in enumerate(threads, 1): + title = thread.get("title", "Untitled") + tweets = thread.get("tweets", []) + + print(f"\n--- Thread {i}: {title[:50]}{'...' if len(title) > 50 else ''} ---\n") + + for j, tweet in enumerate(tweets, 1): + prefix = "๐Ÿงต" if j == 1 else f"{j}/" + print(f" {prefix} {tweet}") + if len(tweet) > 280: + print(f" WARNING: {len(tweet)} chars (over 280)") + + print() + try: + response = input("Post this thread? (y/n/q): ").strip().lower() + except EOFError: + response = "n" + + if response == "y": + approved.append(thread) + print(" Approved") + elif response == "q": + print(" Quitting...") + break + else: + print(" Skipped") + + print("\n" + "=" * 60) + print(f"APPROVED: {len(approved)}/{len(threads)} threads") + print("=" * 60 + "\n") + + return json.dumps(approved) + + +async def post_to_twitter(approved_json: str) -> str: + """Post approved threads via Playwright automation.""" + threads = json.loads(approved_json) if approved_json else [] + if not threads: + print("No threads to post.") + return json.dumps({"success": False, "error": "No threads"}) + + print(f"\nPosting {len(threads)} thread(s) via Playwright automation...") + print("Browser will open. 
First run requires manual login.\n") + + from .twitter import post_threads_impl + + credential_ref = os.environ.get("TWITTER_CREDENTIAL_REF") + result = await post_threads_impl(approved_json, None, credential_ref=credential_ref) + return ( + json.dumps(result) + if isinstance(result, dict) + else json.dumps({"success": False, "error": str(result)}) + ) diff --git a/examples/templates/rss_twitter_agent/nodes/__init__.py b/examples/templates/rss_twitter_agent/nodes/__init__.py new file mode 100644 index 0000000000..8c935b0c23 --- /dev/null +++ b/examples/templates/rss_twitter_agent/nodes/__init__.py @@ -0,0 +1,61 @@ +"""Node definitions for RSS-to-Twitter Agent - simple function nodes.""" + +from framework.graph import NodeSpec + +fetch_node = NodeSpec( + id="fetch", + name="Fetch RSS", + description="Fetch and parse RSS feeds", + node_type="function", + client_facing=False, + input_keys=[], + output_keys=["articles_json"], +) + +process_node = NodeSpec( + id="process", + name="Summarize", + description="Summarize articles into key points", + node_type="function", + client_facing=False, + input_keys=["articles_json"], + output_keys=["processed_json"], +) + +generate_node = NodeSpec( + id="generate", + name="Generate Tweets", + description="Generate tweet drafts", + node_type="function", + client_facing=False, + input_keys=["processed_json"], + output_keys=["threads_json"], +) + +approve_node = NodeSpec( + id="approve", + name="Approve Threads", + description="Show threads and get user approval", + node_type="function", + client_facing=False, + input_keys=["threads_json"], + output_keys=["approved_json"], +) + +post_node = NodeSpec( + id="post", + name="Post to Twitter", + description="Post approved threads via Playwright", + node_type="function", + client_facing=False, + input_keys=["approved_json"], + output_keys=["results_json"], +) + +__all__ = [ + "fetch_node", + "process_node", + "generate_node", + "approve_node", + "post_node", +] diff --git 
a/examples/templates/rss_twitter_agent/run.py b/examples/templates/rss_twitter_agent/run.py new file mode 100644 index 0000000000..8a3445b0dc --- /dev/null +++ b/examples/templates/rss_twitter_agent/run.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python +"""RSS-to-Twitter interactive workflow runner (legacy behavior preserved).""" + +from __future__ import annotations + +import asyncio +import json +import os +from typing import Any + +from .fetch import ( + _generate_thread_for_article, + fetch_rss, + post_to_twitter, + summarize_articles, +) + + +def _render_thread_preview(thread: dict[str, Any], index: int, total: int) -> None: + """Show the generated thread exactly as the legacy interactive flow did.""" + article_title = thread.get("title", "Untitled") + tweets = thread.get("tweets", []) + + print(f"\n{'=' * 60}") + print( + f"Article {index}/{total}: " + f"{article_title[:50]}{'...' if len(article_title) > 50 else ''}" + ) + print("=" * 60) + + print() + for tweet_index, tweet in enumerate(tweets, 1): + tweet_text = tweet.get("text", tweet) if isinstance(tweet, dict) else tweet + prefix = "๐Ÿงต" if tweet_index == 1 else f"{tweet_index}/" + print(f" {prefix} {tweet_text}") + if len(tweet_text) > 280: + print(f" WARNING: {len(tweet_text)} chars (over 280)") + + +async def run_workflow( + feed_url: str = "https://news.ycombinator.com/rss", + max_articles: int = 3, + twitter_credential_ref: str | None = None, +) -> dict[str, Any]: + """Run the original sequential RSS -> summarize -> approve -> post flow.""" + if twitter_credential_ref: + os.environ["TWITTER_CREDENTIAL_REF"] = twitter_credential_ref + + print("=" * 60) + print("RSS-to-Twitter Agent") + print("=" * 60 + "\n") + + print("Generates tweets from RSS feeds and posts automatically.\n") + print("Uses Playwright for browser automation.\n") + + print("1. 
Fetching RSS...") + articles_json = fetch_rss(feed_url=feed_url, max_articles=max_articles) + articles = json.loads(articles_json) + print(f" Fetched {len(articles)} articles\n") + + print("2. Summarizing articles...") + summaries_json = summarize_articles(articles_json) + summaries = json.loads(summaries_json) + print(f" Summarized {len(summaries)} articles\n") + + total_posted = 0 + reviewed = 0 + results = [] + generated_threads = [] + approved_threads = [] + + for i, summary in enumerate(summaries): + print(" Generating thread...") + thread = _generate_thread_for_article(summary) + if not thread: + print(" Could not generate thread, skipping.") + continue + generated_threads.append(thread) + + _render_thread_preview(thread, i + 1, len(summaries)) + print() + try: + response = input("Post this thread? (y/n/q): ").strip().lower() + except EOFError: + response = "n" + + reviewed += 1 + if response == "q": + print(" Quitting...") + break + if response != "y": + print(" Skipped") + continue + + approved_threads.append(thread) + print(" Posting...\n") + result_json = await post_to_twitter(json.dumps([thread])) + result = json.loads(result_json) + results.append(result) + if isinstance(result, dict) and result.get("success"): + total_posted += 1 + print(" Posted") + else: + err = ( + result.get("error", "Unknown error") + if isinstance(result, dict) + else result + ) + print(f" Error: {err}") + + if i < len(summaries) - 1: + try: + input("\nPress Enter for next thread...") + except EOFError: + pass + + print(f"\n{'=' * 60}") + print(f"Done! 
Posted {total_posted}/{reviewed} reviewed threads.") + print("=" * 60) + + return { + "success": True, + "feed_url": feed_url, + "articles_json": articles_json, + "processed_json": summaries_json, + "threads_json": json.dumps(generated_threads), + "approved_json": json.dumps(approved_threads), + "results_json": json.dumps(results), + "articles_fetched": len(articles), + "threads_reviewed": reviewed, + "threads_posted": total_posted, + "post_results": results, + } + + +async def run_interactive( + feed_url: str = "https://news.ycombinator.com/rss", + max_articles: int = 3, + twitter_credential_ref: str | None = None, +) -> dict[str, Any]: + """Run fetch -> summarize -> per-thread y/n/q -> post (Playwright).""" + workflow = await run_workflow( + feed_url=feed_url, + max_articles=max_articles, + twitter_credential_ref=twitter_credential_ref, + ) + return { + "success": workflow["success"], + "feed_url": workflow["feed_url"], + "articles_fetched": workflow["articles_fetched"], + "threads_reviewed": workflow["threads_reviewed"], + "threads_posted": workflow["threads_posted"], + "post_results": workflow["post_results"], + } + + +if __name__ == "__main__": + output = asyncio.run(run_interactive()) + print(json.dumps(output, indent=2, default=str)) diff --git a/examples/templates/rss_twitter_agent/test_flow.py b/examples/templates/rss_twitter_agent/test_flow.py new file mode 100644 index 0000000000..d5a02d0d10 --- /dev/null +++ b/examples/templates/rss_twitter_agent/test_flow.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python +"""Focused tests for the RSS-to-Twitter interactive runner.""" + +from __future__ import annotations + +import asyncio +import json + +from .fetch import approve_threads +from . 
import run as run_module + + +def test_run_interactive_posts_approved_thread(monkeypatch) -> None: + articles = [{"title": "A", "link": "https://example.com/a", "summary": "Sum"}] + summaries = [ + { + "title": "A", + "url": "https://example.com/a", + "hook": "Hook", + "points": ["Point 1", "Point 2"], + "why_it_matters": "Why", + "hashtags": ["#Tech"], + } + ] + post_calls: list[list[dict]] = [] + + monkeypatch.setattr(run_module, "fetch_rss", lambda **_: json.dumps(articles)) + monkeypatch.setattr( + run_module, "summarize_articles", lambda _: json.dumps(summaries) + ) + monkeypatch.setattr( + run_module, + "_generate_thread_for_article", + lambda summary: { + "title": summary["title"], + "tweets": ["tweet 1", "tweet 2", "tweet 3", "tweet 4"], + }, + ) + + async def fake_post(approved_json: str) -> str: + payload = json.loads(approved_json) + post_calls.append(payload) + return json.dumps({"success": True, "posted": len(payload[0]["tweets"])}) + + monkeypatch.setattr(run_module, "post_to_twitter", fake_post) + responses = iter(["y"]) + monkeypatch.setattr("builtins.input", lambda _prompt="": next(responses)) + + result = asyncio.run(run_module.run_interactive(max_articles=1)) + + assert result["success"] is True + assert result["articles_fetched"] == 1 + assert result["threads_reviewed"] == 1 + assert result["threads_posted"] == 1 + assert len(post_calls) == 1 + + +def test_run_interactive_quit_stops_remaining_threads(monkeypatch) -> None: + articles = [ + {"title": "A", "link": "https://example.com/a", "summary": "Sum"}, + {"title": "B", "link": "https://example.com/b", "summary": "Sum"}, + ] + summaries = [ + { + "title": article["title"], + "url": article["link"], + "hook": article["title"], + "points": ["Point 1"], + "why_it_matters": "Why", + "hashtags": ["#Tech"], + } + for article in articles + ] + post_calls: list[list[dict]] = [] + + monkeypatch.setattr(run_module, "fetch_rss", lambda **_: json.dumps(articles)) + monkeypatch.setattr( + run_module, 
"summarize_articles", lambda _: json.dumps(summaries) + ) + monkeypatch.setattr( + run_module, + "_generate_thread_for_article", + lambda summary: { + "title": summary["title"], + "tweets": ["tweet 1", "tweet 2", "tweet 3", "tweet 4"], + }, + ) + + async def fake_post(approved_json: str) -> str: + payload = json.loads(approved_json) + post_calls.append(payload) + return json.dumps({"success": True, "posted": len(payload[0]["tweets"])}) + + monkeypatch.setattr(run_module, "post_to_twitter", fake_post) + responses = iter(["q"]) + monkeypatch.setattr("builtins.input", lambda _prompt="": next(responses)) + + result = asyncio.run(run_module.run_interactive(max_articles=2)) + + assert result["threads_reviewed"] == 1 + assert result["threads_posted"] == 0 + assert post_calls == [] + + +def test_approve_threads_keeps_prior_approvals_before_quit(monkeypatch) -> None: + threads = [ + {"title": "A", "tweets": ["tweet 1"]}, + {"title": "B", "tweets": ["tweet 2"]}, + ] + responses = iter(["y", "q"]) + monkeypatch.setattr("builtins.input", lambda _prompt="": next(responses)) + + approved = json.loads(approve_threads(json.dumps(threads))) + + assert approved == [threads[0]] + + +if __name__ == "__main__": + result = asyncio.run(run_module.run_interactive(max_articles=1)) + print(json.dumps(result, indent=2, default=str)) diff --git a/examples/templates/rss_twitter_agent/twitter.py b/examples/templates/rss_twitter_agent/twitter.py new file mode 100644 index 0000000000..3aea3cf647 --- /dev/null +++ b/examples/templates/rss_twitter_agent/twitter.py @@ -0,0 +1,253 @@ +"""Twitter posting via Playwright with persistent session support.""" + +from __future__ import annotations + +import asyncio +import json +import os +from pathlib import Path +from typing import Any + +from .credentials import resolve_twitter_session_dir + +TwitterConfig = Any + + +def _session_dir() -> Path: + configured = resolve_twitter_session_dir(os.environ.get("TWITTER_CREDENTIAL_REF")) + return 
Path(configured).expanduser() + + +def _session_marker() -> Path: + return _session_dir() / ".logged_in" + + +def _is_logged_in() -> bool: + return _session_marker().exists() + + +async def _post_thread_with_playwright(thread: dict) -> dict: + from playwright.async_api import TimeoutError as PlaywrightTimeout + from playwright.async_api import async_playwright + + tweets = thread.get("thread") or thread.get("tweets") or [] + title = thread.get("article_title") or thread.get("title", "Untitled") + + if not tweets: + return {"title": title, "posted": 0, "error": "No tweets in thread"} + + session_dir = _session_dir() + session_marker = _session_marker() + session_dir.mkdir(parents=True, exist_ok=True) + first_run = not _is_logged_in() + + print(f"\n{'=' * 60}") + print(f"Thread: {title[:55]}{'...' if len(title) > 55 else ''}") + print(f"Tweets: {len(tweets)}") + print("=" * 60) + + if first_run: + print("\nFIRST RUN โ€” Browser will open for manual login.") + print("Log in to X/Twitter, then press Enter here to continue.\n") + + async with async_playwright() as p: + context = await p.chromium.launch_persistent_context( + user_data_dir=str(session_dir), + headless=False, + slow_mo=80, + args=[ + "--disable-blink-features=AutomationControlled", + "--no-sandbox", + "--disable-dev-shm-usage", + ], + viewport={"width": 1280, "height": 800}, + ) + + page = await context.new_page() + + if first_run: + await page.goto("https://x.com/login", wait_until="domcontentloaded") + loop = asyncio.get_event_loop() + await loop.run_in_executor( + None, + lambda: input( + " -> Log in to X in the browser, then press Enter here: " + ), + ) + await page.goto("https://x.com/home", wait_until="domcontentloaded") + await asyncio.sleep(2) + if "login" in page.url or "i/flow" in page.url: + await context.close() + return { + "title": title, + "posted": 0, + "error": "Login not detected. 
Please try again.", + } + session_marker.touch() + print("Session saved โ€” future runs will be fully automatic.\n") + + posted = 0 + first_tweet_url = None + + for i, tweet_text in enumerate(tweets): + if isinstance(tweet_text, dict): + tweet_text = tweet_text.get("text", "") + tweet_text = tweet_text.strip() + if not tweet_text: + continue + + print(f"\n Posting tweet {i + 1}/{len(tweets)}...") + + try: + if i == 0: + await page.goto( + "https://x.com/compose/tweet", wait_until="domcontentloaded" + ) + await asyncio.sleep(1.5) + + if "login" in page.url or "i/flow" in page.url: + session_marker.unlink(missing_ok=True) + await context.close() + return { + "title": title, + "posted": posted, + "error": "Session expired. Re-login and run again.", + } + + textarea = page.locator( + "[data-testid='tweetTextarea_0'], " + "div[role='textbox'][data-testid='tweetTextarea_0'], " + "div.public-DraftEditor-content" + ).first + await textarea.wait_for(timeout=10000) + await textarea.click() + await asyncio.sleep(0.5) + await page.keyboard.type(tweet_text, delay=30) + await asyncio.sleep(0.8) + + post_btn = page.locator( + "[data-testid='tweetButtonInline'], [data-testid='tweetButton']" + ).first + await post_btn.wait_for(timeout=8000) + await post_btn.click() + await asyncio.sleep(2.5) + + first_tweet_url = page.url + posted += 1 + print(" Posted tweet 1") + + else: + if first_tweet_url and "status" in first_tweet_url: + await page.goto(first_tweet_url, wait_until="domcontentloaded") + await asyncio.sleep(1.5) + + reply_btn = page.locator("[data-testid='reply']").first + await reply_btn.wait_for(timeout=8000) + await reply_btn.click() + await asyncio.sleep(1.0) + else: + await page.goto( + "https://x.com/compose/tweet", wait_until="domcontentloaded" + ) + await asyncio.sleep(1.5) + + textarea = page.locator( + "div[data-testid='tweetTextarea_0'], div[role='textbox']" + ).last + await textarea.wait_for(timeout=10000) + await textarea.click() + await asyncio.sleep(0.5) + await 
page.keyboard.type(tweet_text, delay=30) + await asyncio.sleep(0.8) + + post_btn = page.locator( + "[data-testid='tweetButtonInline'], [data-testid='tweetButton']" + ).first + await post_btn.wait_for(timeout=8000) + await post_btn.click() + await asyncio.sleep(2.5) + + posted += 1 + print(f" Posted tweet {i + 1}") + + except PlaywrightTimeout as e: + print(f" Timeout on tweet {i + 1}: {e}") + except Exception as e: + print(f" Error on tweet {i + 1}: {e}") + + await context.close() + + return { + "title": title, + "posted": posted, + "total": len(tweets), + "url": first_tweet_url, + } + + +async def post_threads_impl( + threads_json: str, + config: TwitterConfig, + credential_ref: str | None = None, +) -> str | dict[str, Any]: + if credential_ref: + os.environ["TWITTER_CREDENTIAL_REF"] = credential_ref + + try: + threads = json.loads(threads_json) + except (json.JSONDecodeError, TypeError) as e: + return f"Invalid threads_json: {e!s}" + + if not isinstance(threads, list): + return "threads_json must be a JSON array of thread objects." + + if not threads: + return "No threads to post." 
+ + results = [] + total_posted = 0 + + for thread in threads: + if not isinstance(thread, dict): + continue + result = await _post_thread_with_playwright(thread) + results.append(result) + total_posted += result.get("posted", 0) + + return { + "success": True, + "threads": results, + "message": f"Posted {total_posted} tweets across {len(results)} threads", + } + + +def register_twitter_tool(registry: Any, config: TwitterConfig) -> None: + from framework.llm.provider import Tool + + tool = Tool( + name="post_to_twitter", + description="Post threads to Twitter/X using Playwright automation.", + parameters={ + "type": "object", + "properties": { + "threads_json": { + "type": "string", + "description": "JSON string of the threads array.", + }, + "twitter_credential_ref": { + "type": "string", + "description": "Optional credential ref in {name}/{alias} format.", + }, + }, + "required": ["threads_json"], + }, + ) + registry.register( + "post_to_twitter", + tool, + lambda inputs: post_threads_impl( + inputs["threads_json"], + config, + inputs.get("twitter_credential_ref"), + ), + ) From 15680c35bff65f381946473124e1f94818764171 Mon Sep 17 00:00:00 2001 From: Vasu Bansal Date: Sat, 28 Mar 2026 17:55:31 +0530 Subject: [PATCH 2/4] fix: address rss twitter template review findings --- .../templates/rss_twitter_agent/README.md | 1 - examples/templates/rss_twitter_agent/agent.py | 16 +-- .../templates/rss_twitter_agent/config.py | 49 ++++++-- .../rss_twitter_agent/credentials.py | 30 ++++- examples/templates/rss_twitter_agent/fetch.py | 15 ++- examples/templates/rss_twitter_agent/run.py | 25 +++- .../templates/rss_twitter_agent/test_flow.py | 108 ++++++++++++++++- .../templates/rss_twitter_agent/twitter.py | 110 +++++++++++++----- 8 files changed, 292 insertions(+), 62 deletions(-) diff --git a/examples/templates/rss_twitter_agent/README.md b/examples/templates/rss_twitter_agent/README.md index 5b4ef3a0c0..03e4c9fded 100644 --- a/examples/templates/rss_twitter_agent/README.md 
+++ b/examples/templates/rss_twitter_agent/README.md @@ -14,7 +14,6 @@ Updated for Hive v0.6+ project layout and credential namespace support. From repo root: ```bash -cd /Users/vasu/Desktop/hive uv run python -m examples.templates.rss_twitter_agent run \ --feed-url "https://news.ycombinator.com/rss" \ --max-articles 3 diff --git a/examples/templates/rss_twitter_agent/agent.py b/examples/templates/rss_twitter_agent/agent.py index d9ec1794b8..3232c41650 100644 --- a/examples/templates/rss_twitter_agent/agent.py +++ b/examples/templates/rss_twitter_agent/agent.py @@ -123,7 +123,8 @@ async def trigger_and_wait( self, entry_point: str, input_data: dict, timeout: float | None = None ) -> ExecutionResult: feed_url = str(input_data.get("feed_url") or "https://news.ycombinator.com/rss") - max_articles = int(input_data.get("max_articles") or 3) + raw_max_articles = input_data.get("max_articles") + max_articles = 3 if raw_max_articles in (None, "") else int(raw_max_articles) twitter_credential_ref = input_data.get("twitter_credential_ref") workflow = await run_workflow( feed_url=feed_url, @@ -134,14 +135,15 @@ async def trigger_and_wait( ) return ExecutionResult( - success=True, + success=bool(workflow.get("success", True)), output={ - "articles_json": workflow["articles_json"], - "processed_json": workflow["processed_json"], - "threads_json": workflow["threads_json"], - "approved_json": workflow["approved_json"], - "results_json": workflow["results_json"], + "articles_json": workflow.get("articles_json", "[]"), + "processed_json": workflow.get("processed_json", "[]"), + "threads_json": workflow.get("threads_json", "[]"), + "approved_json": workflow.get("approved_json", "[]"), + "results_json": workflow.get("results_json", "[]"), }, + error=workflow.get("error"), steps_executed=5, ) diff --git a/examples/templates/rss_twitter_agent/config.py b/examples/templates/rss_twitter_agent/config.py index 61d604bb66..2c016eefe4 100644 --- 
a/examples/templates/rss_twitter_agent/config.py +++ b/examples/templates/rss_twitter_agent/config.py @@ -4,6 +4,7 @@ import os from dataclasses import dataclass, field +from typing import Any import httpx @@ -11,18 +12,44 @@ DEFAULT_MODEL = os.environ.get("LLM_MODEL", "ollama/llama3.2") -def _check_ollama_running() -> bool: - """Check if Ollama is running locally.""" +def get_ollama_url() -> str: + """Return the configured Ollama base URL without a trailing slash.""" + return OLLAMA_URL.rstrip("/") + + +def get_ollama_model() -> str: + """Return the configured Ollama model name without the provider prefix.""" + model = os.environ.get("OLLAMA_MODEL") or DEFAULT_MODEL + return model.removeprefix("ollama/") + + +def _fetch_ollama_tags() -> list[dict[str, Any]] | None: + """Fetch Ollama model metadata, returning None when the service is unavailable.""" try: with httpx.Client() as client: - resp = client.get(f"{OLLAMA_URL}/api/tags", timeout=2.0) - return resp.status_code == 200 + resp = client.get(f"{get_ollama_url()}/api/tags", timeout=2.0) + resp.raise_for_status() + data = resp.json() except Exception: - return False + return None + models = data.get("models", []) + return models if isinstance(models, list) else [] def _get_model() -> str: - return DEFAULT_MODEL + return get_ollama_model() + + +def _model_available(models: list[dict[str, Any]], configured_model: str) -> bool: + """Match configured model names regardless of optional Ollama tag/provider prefixes.""" + configured_short = configured_model.split(":", 1)[0] + for model in models: + name = model.get("name") + if not isinstance(name, str) or not name: + continue + if name == configured_model or name.split(":", 1)[0] == configured_short: + return True + return False @dataclass @@ -51,9 +78,17 @@ class AgentMetadata: def validate_ollama() -> tuple[bool, str]: - if not _check_ollama_running(): + models = _fetch_ollama_tags() + configured_model = get_ollama_model() + if models is None: return ( False, "Ollama 
is not running. Start it with `ollama serve` and ensure your model is pulled.", ) + if not _model_available(models, configured_model): + return ( + False, + f"Ollama model '{configured_model}' is not available. " + f"Pull it with `ollama pull {configured_model}` or update LLM_MODEL/OLLAMA_MODEL.", + ) return True, "" diff --git a/examples/templates/rss_twitter_agent/credentials.py b/examples/templates/rss_twitter_agent/credentials.py index 862bd11c31..759d9ae910 100644 --- a/examples/templates/rss_twitter_agent/credentials.py +++ b/examples/templates/rss_twitter_agent/credentials.py @@ -32,6 +32,24 @@ def _extract_session_dir(payload: Any) -> str | None: except json.JSONDecodeError: return None return raw + keys = getattr(payload, "keys", None) + if isinstance(keys, dict): + for key in ( + "session_dir", + "user_data_dir", + "twitter_session_dir", + "playwright_user_data_dir", + "path", + "value", + ): + getter = getattr(payload, "get_key", None) + if callable(getter): + resolved = getter(key) + if isinstance(resolved, str) and resolved.strip(): + return resolved.strip() + default_getter = getattr(payload, "get_default_key", None) + if callable(default_getter): + return _extract_session_dir(default_getter()) return None @@ -42,14 +60,16 @@ def resolve_twitter_session_dir(credential_ref: str | None = None) -> str: return str(Path(env_dir).expanduser()) ref = credential_ref or os.environ.get("TWITTER_CREDENTIAL_REF") - if ref and "/" in ref: + if ref: try: from framework.credentials.store import CredentialStore - store = CredentialStore.with_encrypted_storage( - Path.home() / ".hive" / "credentials" - ) - value = store.get(ref) + store = CredentialStore.with_encrypted_storage() + if "/" in ref: + provider, alias = ref.split("/", 1) + value = store.get_credential_by_alias(provider, alias) + else: + value = store.get(ref) resolved = _extract_session_dir(value) if resolved: return str(Path(resolved).expanduser()) diff --git 
a/examples/templates/rss_twitter_agent/fetch.py b/examples/templates/rss_twitter_agent/fetch.py index 1f57d503b8..8c544c2ee8 100644 --- a/examples/templates/rss_twitter_agent/fetch.py +++ b/examples/templates/rss_twitter_agent/fetch.py @@ -8,6 +8,8 @@ import httpx +from .config import get_ollama_model, get_ollama_url + def fetch_rss( feed_url: str = "https://news.ycombinator.com/rss", max_articles: int = 3 @@ -47,11 +49,11 @@ def fetch_rss( def _call_ollama(prompt: str, max_tokens: int = 800) -> str: """Call Ollama API directly via HTTP.""" - model = os.environ.get("OLLAMA_MODEL", "llama3.2") + model = get_ollama_model() try: with httpx.Client() as client: resp = client.post( - "http://localhost:11434/api/generate", + f"{get_ollama_url()}/api/generate", json={ "model": model, "prompt": prompt, @@ -127,6 +129,7 @@ def _generate_thread_for_article(summary: dict) -> dict | None: points = summary.get("points", []) why = summary.get("why_it_matters", "") hashtags = " ".join(summary.get("hashtags", ["#Tech"])[:3]) + title_literal = json.dumps(title[:60]) prompt = f"""You are a viral tech Twitter personality. Write an engaging 4-tweet thread about this article. @@ -145,7 +148,7 @@ def _generate_thread_for_article(summary: dict) -> dict | None: - Sound like a real person, not a press release. 
Return ONLY a JSON object with this exact structure (no other text): -{{"title": "{title[:60]}", "tweets": ["tweet1", "tweet2", "tweet3", "tweet4"]}}""" +{{"title": {title_literal}, "tweets": ["tweet1", "tweet2", "tweet3", "tweet4"]}}""" text = _call_ollama(prompt, 700) @@ -236,7 +239,9 @@ def approve_threads(threads_json: str) -> str: return json.dumps(approved) -async def post_to_twitter(approved_json: str) -> str: +async def post_to_twitter( + approved_json: str, twitter_credential_ref: str | None = None +) -> str: """Post approved threads via Playwright automation.""" threads = json.loads(approved_json) if approved_json else [] if not threads: @@ -248,7 +253,7 @@ async def post_to_twitter(approved_json: str) -> str: from .twitter import post_threads_impl - credential_ref = os.environ.get("TWITTER_CREDENTIAL_REF") + credential_ref = twitter_credential_ref or os.environ.get("TWITTER_CREDENTIAL_REF") result = await post_threads_impl(approved_json, None, credential_ref=credential_ref) return ( json.dumps(result) diff --git a/examples/templates/rss_twitter_agent/run.py b/examples/templates/rss_twitter_agent/run.py index 8a3445b0dc..2963148bce 100644 --- a/examples/templates/rss_twitter_agent/run.py +++ b/examples/templates/rss_twitter_agent/run.py @@ -5,7 +5,6 @@ import asyncio import json -import os from typing import Any from .fetch import ( @@ -43,9 +42,6 @@ async def run_workflow( twitter_credential_ref: str | None = None, ) -> dict[str, Any]: """Run the original sequential RSS -> summarize -> approve -> post flow.""" - if twitter_credential_ref: - os.environ["TWITTER_CREDENTIAL_REF"] = twitter_credential_ref - print("=" * 60) print("RSS-to-Twitter Agent") print("=" * 60 + "\n") @@ -94,7 +90,9 @@ async def run_workflow( approved_threads.append(thread) print(" Posting...\n") - result_json = await post_to_twitter(json.dumps([thread])) + result_json = await post_to_twitter( + json.dumps([thread]), twitter_credential_ref=twitter_credential_ref + ) result = 
json.loads(result_json) results.append(result) if isinstance(result, dict) and result.get("success"): @@ -118,8 +116,23 @@ async def run_workflow( print(f"Done! Posted {total_posted}/{reviewed} reviewed threads.") print("=" * 60) + workflow_success = all( + isinstance(result, dict) and result.get("success") for result in results + ) + workflow_error = None + if not workflow_success and results: + errors = [ + result.get("error") or result.get("message", "Unknown posting error") + for result in results + if isinstance(result, dict) and not result.get("success") + ] + workflow_error = ( + "; ".join(error for error in errors if error) or "Posting failed" + ) + return { - "success": True, + "success": workflow_success if results else True, + "error": workflow_error, "feed_url": feed_url, "articles_json": articles_json, "processed_json": summaries_json, diff --git a/examples/templates/rss_twitter_agent/test_flow.py b/examples/templates/rss_twitter_agent/test_flow.py index d5a02d0d10..71fee6b924 100644 --- a/examples/templates/rss_twitter_agent/test_flow.py +++ b/examples/templates/rss_twitter_agent/test_flow.py @@ -7,7 +7,9 @@ import json from .fetch import approve_threads +from . import agent as agent_module from . import run as run_module +from . 
import twitter as twitter_module def test_run_interactive_posts_approved_thread(monkeypatch) -> None: @@ -37,7 +39,9 @@ def test_run_interactive_posts_approved_thread(monkeypatch) -> None: }, ) - async def fake_post(approved_json: str) -> str: + async def fake_post( + approved_json: str, twitter_credential_ref: str | None = None + ) -> str: payload = json.loads(approved_json) post_calls.append(payload) return json.dumps({"success": True, "posted": len(payload[0]["tweets"])}) @@ -86,7 +90,9 @@ def test_run_interactive_quit_stops_remaining_threads(monkeypatch) -> None: }, ) - async def fake_post(approved_json: str) -> str: + async def fake_post( + approved_json: str, twitter_credential_ref: str | None = None + ) -> str: payload = json.loads(approved_json) post_calls.append(payload) return json.dumps({"success": True, "posted": len(payload[0]["tweets"])}) @@ -115,6 +121,98 @@ def test_approve_threads_keeps_prior_approvals_before_quit(monkeypatch) -> None: assert approved == [threads[0]] -if __name__ == "__main__": - result = asyncio.run(run_module.run_interactive(max_articles=1)) - print(json.dumps(result, indent=2, default=str)) +def test_run_workflow_propagates_post_failure(monkeypatch) -> None: + articles = [{"title": "A", "link": "https://example.com/a", "summary": "Sum"}] + summaries = [ + { + "title": "A", + "url": "https://example.com/a", + "hook": "Hook", + "points": ["Point 1", "Point 2"], + "why_it_matters": "Why", + "hashtags": ["#Tech"], + } + ] + + monkeypatch.setattr(run_module, "fetch_rss", lambda **_: json.dumps(articles)) + monkeypatch.setattr( + run_module, "summarize_articles", lambda _: json.dumps(summaries) + ) + monkeypatch.setattr( + run_module, + "_generate_thread_for_article", + lambda summary: { + "title": summary["title"], + "tweets": ["tweet 1", "tweet 2", "tweet 3", "tweet 4"], + }, + ) + + async def fake_post( + approved_json: str, twitter_credential_ref: str | None = None + ) -> str: + return json.dumps({"success": False, "error": 
"session expired"}) + + monkeypatch.setattr(run_module, "post_to_twitter", fake_post) + monkeypatch.setattr("builtins.input", lambda _prompt="": "y") + + result = asyncio.run(run_module.run_workflow(max_articles=1)) + + assert result["success"] is False + assert result["threads_posted"] == 0 + assert "session expired" in result["error"] + + +def test_agent_preserves_explicit_zero_max_articles(monkeypatch) -> None: + captured: dict[str, int | None] = {"max_articles": None} + + async def fake_workflow( + feed_url: str = "https://news.ycombinator.com/rss", + max_articles: int = 3, + twitter_credential_ref: str | None = None, + ) -> dict[str, object]: + captured["max_articles"] = max_articles + return { + "success": True, + "articles_json": "[]", + "processed_json": "[]", + "threads_json": "[]", + "approved_json": "[]", + "results_json": "[]", + } + + monkeypatch.setattr(agent_module, "run_workflow", fake_workflow) + + result = asyncio.run( + agent_module.default_agent.trigger_and_wait("start", {"max_articles": 0}) + ) + + assert result.success is True + assert captured["max_articles"] == 0 + + +def test_post_threads_impl_reports_partial_failure(monkeypatch) -> None: + async def fake_post_thread( + thread: dict, credential_ref: str | None = None + ) -> dict[str, object]: + return { + "title": thread["title"], + "posted": 1, + "total": 4, + "error": "reply failed", + } + + monkeypatch.setattr( + twitter_module, "_post_thread_with_playwright", fake_post_thread + ) + + result = asyncio.run( + twitter_module.post_threads_impl( + json.dumps([{"title": "A", "tweets": ["1", "2", "3", "4"]}]), + None, + credential_ref="twitter/default", + ) + ) + + assert isinstance(result, dict) + assert result["success"] is False + assert "failed" in result["message"] diff --git a/examples/templates/rss_twitter_agent/twitter.py b/examples/templates/rss_twitter_agent/twitter.py index 3aea3cf647..6795ac564e 100644 --- a/examples/templates/rss_twitter_agent/twitter.py +++ 
b/examples/templates/rss_twitter_agent/twitter.py @@ -4,7 +4,6 @@ import asyncio import json -import os from pathlib import Path from typing import Any @@ -13,20 +12,43 @@ TwitterConfig = Any -def _session_dir() -> Path: - configured = resolve_twitter_session_dir(os.environ.get("TWITTER_CREDENTIAL_REF")) +def _session_dir(credential_ref: str | None = None) -> Path: + configured = resolve_twitter_session_dir(credential_ref) return Path(configured).expanduser() -def _session_marker() -> Path: - return _session_dir() / ".logged_in" +def _session_marker(credential_ref: str | None = None) -> Path: + return _session_dir(credential_ref) / ".logged_in" -def _is_logged_in() -> bool: - return _session_marker().exists() +def _is_logged_in(credential_ref: str | None = None) -> bool: + return _session_marker(credential_ref).exists() -async def _post_thread_with_playwright(thread: dict) -> dict: +async def _capture_posted_tweet_url(page: Any, tweet_text: str) -> str | None: + """Best-effort permalink lookup for the freshly posted tweet/reply.""" + if "status/" in page.url: + return page.url + + snippet = tweet_text.strip().replace("\n", " ")[:80] + if not snippet: + return None + + try: + article = page.locator("article").filter(has_text=snippet).first + status_link = article.locator("a[href*='/status/']").first + href = await status_link.get_attribute("href", timeout=5000) + except Exception: + return None + + if not href: + return None + return href if href.startswith("http") else f"https://x.com{href}" + + +async def _post_thread_with_playwright( + thread: dict, credential_ref: str | None = None +) -> dict: from playwright.async_api import TimeoutError as PlaywrightTimeout from playwright.async_api import async_playwright @@ -36,10 +58,10 @@ async def _post_thread_with_playwright(thread: dict) -> dict: if not tweets: return {"title": title, "posted": 0, "error": "No tweets in thread"} - session_dir = _session_dir() - session_marker = _session_marker() + session_dir = 
_session_dir(credential_ref) + session_marker = _session_marker(credential_ref) session_dir.mkdir(parents=True, exist_ok=True) - first_run = not _is_logged_in() + first_run = not _is_logged_in(credential_ref) print(f"\n{'=' * 60}") print(f"Thread: {title[:55]}{'...' if len(title) > 55 else ''}") @@ -87,7 +109,7 @@ async def _post_thread_with_playwright(thread: dict) -> dict: print("Session saved โ€” future runs will be fully automatic.\n") posted = 0 - first_tweet_url = None + current_tweet_url = None for i, tweet_text in enumerate(tweets): if isinstance(tweet_text, dict): @@ -132,13 +154,25 @@ async def _post_thread_with_playwright(thread: dict) -> dict: await post_btn.click() await asyncio.sleep(2.5) - first_tweet_url = page.url + current_tweet_url = await _capture_posted_tweet_url( + page, tweet_text + ) + if not current_tweet_url: + await context.close() + return { + "title": title, + "posted": posted, + "total": len(tweets), + "error": "Tweet posted but could not resolve its permalink for threading.", + } posted += 1 print(" Posted tweet 1") else: - if first_tweet_url and "status" in first_tweet_url: - await page.goto(first_tweet_url, wait_until="domcontentloaded") + if current_tweet_url and "status" in current_tweet_url: + await page.goto( + current_tweet_url, wait_until="domcontentloaded" + ) await asyncio.sleep(1.5) reply_btn = page.locator("[data-testid='reply']").first @@ -146,10 +180,13 @@ async def _post_thread_with_playwright(thread: dict) -> dict: await reply_btn.click() await asyncio.sleep(1.0) else: - await page.goto( - "https://x.com/compose/tweet", wait_until="domcontentloaded" - ) - await asyncio.sleep(1.5) + await context.close() + return { + "title": title, + "posted": posted, + "total": len(tweets), + "error": "Cannot continue thread because the previous tweet URL was not available.", + } textarea = page.locator( "div[data-testid='tweetTextarea_0'], div[role='textbox']" @@ -167,6 +204,17 @@ async def _post_thread_with_playwright(thread: dict) 
-> dict: await post_btn.click() await asyncio.sleep(2.5) + current_tweet_url = await _capture_posted_tweet_url( + page, tweet_text + ) + if not current_tweet_url: + await context.close() + return { + "title": title, + "posted": posted, + "total": len(tweets), + "error": "Reply posted but could not resolve its permalink for the next step.", + } posted += 1 print(f" Posted tweet {i + 1}") @@ -181,7 +229,7 @@ async def _post_thread_with_playwright(thread: dict) -> dict: "title": title, "posted": posted, "total": len(tweets), - "url": first_tweet_url, + "url": current_tweet_url, } @@ -190,9 +238,6 @@ async def post_threads_impl( config: TwitterConfig, credential_ref: str | None = None, ) -> str | dict[str, Any]: - if credential_ref: - os.environ["TWITTER_CREDENTIAL_REF"] = credential_ref - try: threads = json.loads(threads_json) except (json.JSONDecodeError, TypeError) as e: @@ -210,14 +255,27 @@ async def post_threads_impl( for thread in threads: if not isinstance(thread, dict): continue - result = await _post_thread_with_playwright(thread) + result = await _post_thread_with_playwright( + thread, credential_ref=credential_ref + ) results.append(result) total_posted += result.get("posted", 0) + success = bool(results) and all( + result.get("posted", 0) >= result.get("total", 0) and not result.get("error") + for result in results + ) + message = f"Posted {total_posted} tweets across {len(results)} threads" + if not success: + message = ( + f"Posted {total_posted} tweets across {len(results)} threads, " + "but one or more threads failed." 
+ ) + return { - "success": True, + "success": success, "threads": results, - "message": f"Posted {total_posted} tweets across {len(results)} threads", + "message": message, } From 19ae3e55fc83f706db030aa7734d51aebddd0827 Mon Sep 17 00:00:00 2001 From: Vasu Bansal Date: Sat, 28 Mar 2026 18:18:39 +0530 Subject: [PATCH 3/4] fix: address additional rss review findings --- examples/templates/rss_twitter_agent/agent.py | 16 ++- .../rss_twitter_agent/credentials.py | 11 ++- examples/templates/rss_twitter_agent/fetch.py | 99 +++++++++++++++++-- .../templates/rss_twitter_agent/test_flow.py | 40 ++++++++ .../templates/rss_twitter_agent/twitter.py | 12 +++ 5 files changed, 166 insertions(+), 12 deletions(-) diff --git a/examples/templates/rss_twitter_agent/agent.py b/examples/templates/rss_twitter_agent/agent.py index 3232c41650..11fb07ae92 100644 --- a/examples/templates/rss_twitter_agent/agent.py +++ b/examples/templates/rss_twitter_agent/agent.py @@ -2,6 +2,8 @@ from __future__ import annotations +import asyncio + from framework.graph import Constraint, EdgeCondition, EdgeSpec, Goal, SuccessCriterion from framework.graph.executor import ExecutionResult @@ -126,13 +128,25 @@ async def trigger_and_wait( raw_max_articles = input_data.get("max_articles") max_articles = 3 if raw_max_articles in (None, "") else int(raw_max_articles) twitter_credential_ref = input_data.get("twitter_credential_ref") - workflow = await run_workflow( + workflow_coro = run_workflow( feed_url=feed_url, max_articles=max_articles, twitter_credential_ref=( str(twitter_credential_ref) if twitter_credential_ref else None ), ) + try: + workflow = ( + await asyncio.wait_for(workflow_coro, timeout=timeout) + if timeout is not None + else await workflow_coro + ) + except asyncio.TimeoutError: + return ExecutionResult( + success=False, + error=f"RSS-to-Twitter workflow timed out after {timeout} seconds.", + steps_executed=0, + ) return ExecutionResult( success=bool(workflow.get("success", True)), diff --git 
a/examples/templates/rss_twitter_agent/credentials.py b/examples/templates/rss_twitter_agent/credentials.py index 759d9ae910..d94f33dab6 100644 --- a/examples/templates/rss_twitter_agent/credentials.py +++ b/examples/templates/rss_twitter_agent/credentials.py @@ -19,8 +19,9 @@ def _extract_session_dir(payload: Any) -> str | None: "value", ): value = payload.get(key) - if isinstance(value, str) and value.strip(): - return value.strip() + resolved = _extract_session_dir(value) + if resolved: + return resolved elif isinstance(payload, str): raw = payload.strip() if not raw: @@ -44,9 +45,9 @@ def _extract_session_dir(payload: Any) -> str | None: ): getter = getattr(payload, "get_key", None) if callable(getter): - resolved = getter(key) - if isinstance(resolved, str) and resolved.strip(): - return resolved.strip() + resolved = _extract_session_dir(getter(key)) + if resolved: + return resolved default_getter = getattr(payload, "get_default_key", None) if callable(default_getter): return _extract_session_dir(default_getter()) diff --git a/examples/templates/rss_twitter_agent/fetch.py b/examples/templates/rss_twitter_agent/fetch.py index 8c544c2ee8..1dc9fd0b56 100644 --- a/examples/templates/rss_twitter_agent/fetch.py +++ b/examples/templates/rss_twitter_agent/fetch.py @@ -2,24 +2,107 @@ from __future__ import annotations +import ipaddress import json import os +import socket import xml.etree.ElementTree as ET +from urllib.parse import urljoin, urlparse import httpx from .config import get_ollama_model, get_ollama_url +def _is_public_ip(host: str) -> bool: + """Reject loopback/private/link-local targets to avoid local network fetches.""" + try: + address = ipaddress.ip_address(host) + except ValueError: + return False + return not ( + address.is_private + or address.is_loopback + or address.is_link_local + or address.is_multicast + or address.is_reserved + or address.is_unspecified + ) + + +def _validate_public_feed_url(feed_url: str) -> str: + """Allow only http(s) URLs 
that resolve to public destinations.""" + parsed = urlparse(feed_url) + if parsed.scheme not in {"http", "https"} or not parsed.hostname: + raise ValueError("Feed URL must use http(s) and include a hostname.") + + try: + addrinfo = socket.getaddrinfo( + parsed.hostname, parsed.port or None, proto=socket.IPPROTO_TCP + ) + except OSError as exc: + raise ValueError(f"Could not resolve feed host '{parsed.hostname}'.") from exc + + if not addrinfo: + raise ValueError(f"Could not resolve feed host '{parsed.hostname}'.") + + for family, *_rest, sockaddr in addrinfo: + host = sockaddr[0] if family in (socket.AF_INET, socket.AF_INET6) else None + if not host or not _is_public_ip(host): + raise ValueError("Feed URL must resolve to a public network destination.") + + return feed_url + + +def _fetch_public_feed(feed_url: str, max_redirects: int = 5) -> str: + """Fetch RSS while validating every redirect target against SSRF-prone destinations.""" + current_url = _validate_public_feed_url(feed_url) + with httpx.Client(follow_redirects=False) as client: + for _ in range(max_redirects + 1): + resp = client.get(current_url, timeout=10.0) + if resp.status_code in {301, 302, 303, 307, 308}: + location = resp.headers.get("location") + if not location: + break + current_url = _validate_public_feed_url( + urljoin(str(resp.url), location) + ) + continue + resp.raise_for_status() + return resp.text + raise ValueError("Too many redirects while fetching feed URL.") + + +def _valid_summary_item(item: object) -> bool: + return ( + isinstance(item, dict) + and isinstance(item.get("title"), str) + and isinstance(item.get("url"), str) + and isinstance(item.get("hook"), str) + and isinstance(item.get("why_it_matters"), str) + and isinstance(item.get("points"), list) + and all(isinstance(point, str) for point in item["points"]) + and isinstance(item.get("hashtags"), list) + and all(isinstance(tag, str) for tag in item["hashtags"]) + ) + + +def _valid_thread_payload(item: object) -> bool: + return 
( + isinstance(item, dict) + and isinstance(item.get("title"), str) + and isinstance(item.get("tweets"), list) + and len(item["tweets"]) >= 3 + and all(isinstance(tweet, str) and tweet.strip() for tweet in item["tweets"]) + ) + + def fetch_rss( feed_url: str = "https://news.ycombinator.com/rss", max_articles: int = 3 ) -> str: """Fetch RSS feed and return parsed articles as JSON string.""" try: - with httpx.Client() as client: - resp = client.get(feed_url, timeout=10.0, follow_redirects=True) - resp.raise_for_status() - xml_content = resp.text + xml_content = _fetch_public_feed(feed_url) except Exception: return json.dumps([]) @@ -101,7 +184,11 @@ def summarize_articles(articles_json: str) -> str: if start >= 0 and end > start: try: parsed = json.loads(text[start:end]) - if isinstance(parsed, list) and parsed: + if ( + isinstance(parsed, list) + and parsed + and all(_valid_summary_item(item) for item in parsed) + ): return json.dumps(parsed) except json.JSONDecodeError: pass @@ -157,7 +244,7 @@ def _generate_thread_for_article(summary: dict) -> dict | None: if start >= 0 and end > start: try: obj = json.loads(text[start:end]) - if isinstance(obj, dict) and "tweets" in obj and len(obj["tweets"]) >= 3: + if _valid_thread_payload(obj): return obj except json.JSONDecodeError: pass diff --git a/examples/templates/rss_twitter_agent/test_flow.py b/examples/templates/rss_twitter_agent/test_flow.py index 71fee6b924..f93f463a82 100644 --- a/examples/templates/rss_twitter_agent/test_flow.py +++ b/examples/templates/rss_twitter_agent/test_flow.py @@ -8,6 +8,8 @@ from .fetch import approve_threads from . import agent as agent_module +from . import credentials as credentials_module +from . import fetch as fetch_module from . import run as run_module from . 
import twitter as twitter_module @@ -216,3 +218,41 @@ async def fake_post_thread( assert isinstance(result, dict) assert result["success"] is False assert "failed" in result["message"] + + +def test_trigger_and_wait_honors_timeout(monkeypatch) -> None: + async def slow_workflow(**kwargs) -> dict[str, object]: + await asyncio.sleep(0.05) + return {"success": True} + + monkeypatch.setattr(agent_module, "run_workflow", slow_workflow) + + result = asyncio.run( + agent_module.default_agent.trigger_and_wait("start", {}, timeout=0.001) + ) + + assert result.success is False + assert "timed out" in (result.error or "") + + +def test_extract_session_dir_recurses_into_json_value() -> None: + payload = {"value": '{"session_dir": "/tmp/twitter-profile"}'} + + assert credentials_module._extract_session_dir(payload) == "/tmp/twitter-profile" + + +def test_fetch_rss_rejects_localhost_target() -> None: + assert fetch_module.fetch_rss("http://127.0.0.1/rss") == "[]" + + +def test_summarize_articles_falls_back_when_model_shape_is_invalid(monkeypatch) -> None: + articles = [{"title": "A", "link": "https://example.com/a", "summary": "Sum"}] + monkeypatch.setattr( + fetch_module, "_call_ollama", lambda prompt, max_tokens=800: '["oops"]' + ) + + result = json.loads(fetch_module.summarize_articles(json.dumps(articles))) + + assert isinstance(result, list) + assert isinstance(result[0], dict) + assert result[0]["title"] == "A" diff --git a/examples/templates/rss_twitter_agent/twitter.py b/examples/templates/rss_twitter_agent/twitter.py index 6795ac564e..76d139a353 100644 --- a/examples/templates/rss_twitter_agent/twitter.py +++ b/examples/templates/rss_twitter_agent/twitter.py @@ -220,8 +220,20 @@ async def _post_thread_with_playwright( except PlaywrightTimeout as e: print(f" Timeout on tweet {i + 1}: {e}") + return { + "title": title, + "posted": posted, + "total": len(tweets), + "error": f"Timeout while posting tweet {i + 1}: {e}", + } except Exception as e: print(f" Error on tweet {i + 
1}: {e}") + return { + "title": title, + "posted": posted, + "total": len(tweets), + "error": f"Error while posting tweet {i + 1}: {e}", + } await context.close() From 4931ff2cea5c32842c573f5f95ec1385d43421d6 Mon Sep 17 00:00:00 2001 From: Vasu Bansal Date: Sat, 28 Mar 2026 18:28:43 +0530 Subject: [PATCH 4/4] fix: validate rss max_articles input --- examples/templates/rss_twitter_agent/agent.py | 11 ++++++++++- examples/templates/rss_twitter_agent/test_flow.py | 9 +++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/examples/templates/rss_twitter_agent/agent.py b/examples/templates/rss_twitter_agent/agent.py index 11fb07ae92..cb22eb87b7 100644 --- a/examples/templates/rss_twitter_agent/agent.py +++ b/examples/templates/rss_twitter_agent/agent.py @@ -126,7 +126,16 @@ async def trigger_and_wait( ) -> ExecutionResult: feed_url = str(input_data.get("feed_url") or "https://news.ycombinator.com/rss") raw_max_articles = input_data.get("max_articles") - max_articles = 3 if raw_max_articles in (None, "") else int(raw_max_articles) + try: + max_articles = ( + 3 if raw_max_articles in (None, "") else int(raw_max_articles) + ) + except (TypeError, ValueError): + return ExecutionResult( + success=False, + error="max_articles must be an integer.", + steps_executed=0, + ) twitter_credential_ref = input_data.get("twitter_credential_ref") workflow_coro = run_workflow( feed_url=feed_url, diff --git a/examples/templates/rss_twitter_agent/test_flow.py b/examples/templates/rss_twitter_agent/test_flow.py index f93f463a82..f5480a4af1 100644 --- a/examples/templates/rss_twitter_agent/test_flow.py +++ b/examples/templates/rss_twitter_agent/test_flow.py @@ -192,6 +192,15 @@ async def fake_workflow( assert captured["max_articles"] == 0 +def test_agent_rejects_non_numeric_max_articles() -> None: + result = asyncio.run( + agent_module.default_agent.trigger_and_wait("start", {"max_articles": "abc"}) + ) + + assert result.success is False + assert result.error == "max_articles 
must be an integer." + + def test_post_threads_impl_reports_partial_failure(monkeypatch) -> None: async def fake_post_thread( thread: dict, credential_ref: str | None = None