Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/source/start/agent_interaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@ The inference and verification scripts for this page live under `examples/agent_
| Qwen3-Coder-Next | temp=0.8, topp=0.9, tp=16, 300 turns, 128K context | **67.6** (Avg@4) |
| Qwen3.5-4B | temp=0.8, topp=0.9, tp=4, 100 turns, 64K context | **45.2** (Avg@1) |
| Qwen3.5-9B | temp=1.0, topp=0.7, tp=4, 100 turns, 64K context | **53.8** (Avg@1) |
| Qwen3.5-9B | temp=1.0, topp=0.95, tp=4, 200 turns, 128K context | **65.6** (Avg@1) |
| Qwen3.5-35B-A3B | temp=1.0, topp=0.7, tp=4, 300 turns, 128K context | **68.4** (Avg@1) |

**Reference results on SWE-bench Multilingual with Uni-Agent:**

| **Model** | Inference Config | **Uni-Agent** |
| ---------------------------- | ----------------------- |:-------------:|
| Qwen3-Coder-30B-A3B-Instruct | 200 turns, 128K context | **32.3** (Avg@1) |

**Reference results on Terminal-Bench v2 with Uni-Agent:**

| **Model** | Inference Config | **Uni-Agent** |
Expand Down
6 changes: 6 additions & 0 deletions examples/agent_env/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
"deployment": deployment_config,
"env_variables": {
"PIP_PROGRESS_BAR": "off",
"PIP_CACHE_DIR": "~/.cache/pip",
"PAGER": "cat",
"MANPAGER": "cat",
"LESS": "-R",
"TQDM_DISABLE": "1",
"GIT_PAGER": "cat",
},
}
env_config = AgentEnvConfig(**env_config)
Expand Down
136 changes: 92 additions & 44 deletions examples/agent_interaction/parallel_verify_swe.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,40 @@

import ray
from datasets import load_dataset
from tqdm import tqdm

from uni_agent.async_logging import add_file_handler, cleanup_handlers
from uni_agent.interaction import AgentEnv, AgentEnvConfig
from uni_agent.reward import load_reward_spec

# Configure root logging once so this script's INFO-level messages are emitted
# with a timestamp and level prefix.
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel("INFO")

# Runtime knobs, each overridable through the environment variable of the same
# name. Defaults are given as strings because environment values are strings.
GLOBAL_CONCURRENCY = int(os.getenv("GLOBAL_CONCURRENCY", "512"))
# NUM_WORKERS is used as a divisor elsewhere in this file, so clamp it to at
# least 1 instead of failing with ZeroDivisionError when it is set to 0.
NUM_WORKERS = max(1, int(os.getenv("NUM_WORKERS", "8")))
DATA_PATH = os.getenv("DATA_PATH", "/home/tiger/data/swe_agent/swe_bench_multilingual_modal.parquet")


async def run_sample(sample):
run_id = str(uuid.uuid4())
instance = sample["extra_info"]["tools_kwargs"]
impl = os.getenv("DEPLOYMENT", "vefaas").lower()

# SWE preprocessors emit ``env.deployment.image`` (nested, matching
# ``AgentEnvConfig`` / ``DeployConfig``). Older parquets used flat
# ``env.image``; accept both so a stale parquet doesn't silently break.
instance_image = instance["env"].get("deployment", {}).get("image") or instance["env"].get("image")
if instance_image is None:
case_deployment = dict(instance["env"].get("deployment", {}))
if not case_deployment.get("image"):
raise KeyError("No image found in instance.env.deployment.image or instance.env.image")

if impl == "vefaas":
deployment_config = {
defaults = {
"type": "vefaas",
"image": instance_image,
"command": "curl -fsSL https://vefaas-swe.tos-cn-beijing.ivolces.com/swe-rex/install_1.4.0.sh | bash -s -- {token}",
"timeout": 600.0,
"startup_timeout": 180.0,
"function_id": os.getenv("VEFAAS_FUNCTION_ID"),
"function_route": os.getenv("VEFAAS_FUNCTION_ROUTE"),
}
elif impl == "modal":
deployment_config = {
defaults = {
"type": "modal",
"image": instance_image,
"startup_timeout": 600.0,
"runtime_timeout": 600.0,
"deployment_timeout": 3600.0,
Expand All @@ -52,6 +51,9 @@ async def run_sample(sample):
else:
raise ValueError(f"Invalid environment implementation: {impl}")

# Case config wins; defaults fill in whatever the case didn't specify.
deployment_config = {**defaults, **case_deployment}

env_config = {
"deployment": deployment_config,
"env_variables": {
Expand All @@ -78,73 +80,119 @@ async def run_sample(sample):
reward_spec = load_reward_spec(reward_config)
add_file_handler(Path(f"/tmp/eval_gold_patch/{run_id}.log"), run_id)

await env.start()
await reward_spec.apply_gold_patch()
_, result = await reward_spec.compute_reward()
await env.close()
cleanup_handlers(run_id)
try:
await env.start()
await reward_spec.apply_gold_patch()
_, result = await reward_spec.compute_reward()
except Exception as e:
logger.error(f"Error running sample {run_id}: {e}")
result = {"resolved": False, "eval_completed": False, "eval_execution_time": None}
finally:
await env.close()
cleanup_handlers(run_id)
return result


@ray.remote
class TestEvalActor:
    """Ray actor that runs ``run_sample`` with bounded per-actor concurrency.

    Each actor admits at most ``max(1, GLOBAL_CONCURRENCY // NUM_WORKERS)``
    concurrent ``run_sample`` calls, enforced by an ``asyncio.Semaphore``.
    """

    def __init__(self):
        # The semaphore is created lazily inside ``run_single`` so that it is
        # constructed from within a running coroutine of this actor rather
        # than at class-definition time, where it would be shared class state.
        self._semaphore = None

    async def run_batch(self, samples):
        """Evaluate every sample in ``samples`` concurrently.

        Returns the per-sample results in the same order as ``samples``
        (``asyncio.gather`` preserves argument order).
        """
        tasks = [self.run_single(sample) for sample in samples]
        return await asyncio.gather(*tasks)

    async def run_single(self, sample):
        """Evaluate one sample, waiting for a free semaphore slot first."""
        # No ``await`` sits between the check and the assignment, so two
        # coroutines on the same loop cannot both create a semaphore.
        if self._semaphore is None:
            self._semaphore = asyncio.Semaphore(max(1, GLOBAL_CONCURRENCY // NUM_WORKERS))
        async with self._semaphore:
            return await run_sample(sample)
Comment on lines +98 to 102

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Instantiating `asyncio.Semaphore` at the class level (outside of an active event loop) will raise a `RuntimeError: no running event loop` in Python 3.11+ during module import or class definition. Additionally, because Ray actors run in their own event loops, class-level semaphores can be bound to the wrong event loop, leading to `RuntimeError: Task got Future attached to a different loop`.

To fix this, lazily initialize the semaphore inside the actor's methods or inside the actor's initialization context.

    def __init__(self):
        self._semaphore = None

    async def run_single(self, sample):
        if self._semaphore is None:
            self._semaphore = asyncio.Semaphore(max(1, GLOBAL_CONCURRENCY // NUM_WORKERS))
        async with self._semaphore:
            return await run_sample(sample)



def _rule(text: str = "", width: int = 50, ch: str = "─") -> str:
    """Return a horizontal rule, optionally with a centered title.

    Args:
        text: Title to embed in the rule. When empty, the rule is just ``ch``
            repeated ``width`` times.
        width: Target length of the rule, measured with ``len()`` (code
            points, not terminal display columns).
        ch: Fill string repeated on either side of the title.

    Returns:
        ``ch * width`` when ``text`` is empty; otherwise ``text`` surrounded by
        one space on each side and padded with ``ch``. If the title does not
        fit in ``width``, no fill is added and the result is ``" text "``.
    """
    if not text:
        return ch * width
    # Two columns are reserved for the spaces around the title; clamp at zero
    # so an over-long title never yields a negative repeat count.
    pad = max(0, width - len(text) - 2)
    # With an odd amount of padding the extra fill character goes on the right.
    return f"{ch * (pad // 2)} {text} {ch * (pad - pad // 2)}"


def main():
ray.init()
# data_path = "/home/tiger/data/swe_agent/swe_rebench_filtered.parquet"
# data_path = "/home/tiger/data/swe_agent/r2e_gym_subset.parquet"
data_path = "/home/tiger/data/swe_agent/swe_bench_verified_modal.parquet"
dataset = load_dataset("parquet", data_files=data_path, split="train")
dataset = load_dataset("parquet", data_files=DATA_PATH, split="train")
samples = dataset.to_list()
workers = [TestEvalActor.remote() for _ in range(8)]
futures = []
chunk_size = (len(samples) - 1) // len(workers) + 1
for i in range(len(workers)):
chunk = samples[i * chunk_size : (i + 1) * chunk_size]
futures.append(workers[i].run_batch.remote(chunk))
# each future returns a list of per-sample results (one chunk per worker)
logger.info(f"loaded {len(samples)} samples from {DATA_PATH}")
logger.info(
f"deployment={os.getenv('DEPLOYMENT', 'vefaas')} workers={NUM_WORKERS} concurrency={GLOBAL_CONCURRENCY}"
)

workers = [TestEvalActor.remote() for _ in range(NUM_WORKERS)]
# one future per sample (round-robin across workers) so we can stream
# per-sample progress; the actor semaphore still bounds real concurrency.
futures = [workers[i % len(workers)].run_single.remote(s) for i, s in enumerate(samples)]
fut_to_idx = {f: i for i, f in enumerate(futures)}

begin_time = time.time()
results_chunk = ray.get(futures)
results: list = [None] * len(futures)
ok = wa = tle = 0
remaining = list(futures)
with tqdm(
total=len(futures),
desc="🚀 eval",
colour="green",
unit="inst",
dynamic_ncols=True,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]{postfix}",
) as pbar:
while remaining:
done, remaining = ray.wait(remaining, num_returns=1)
for d in done:
res = ray.get(d)
results[fut_to_idx[d]] = res
if res.get("resolved"):
ok += 1
elif res.get("eval_completed"):
wa += 1
else:
tle += 1
rate = ok / pbar.n * 100 if pbar.n else 0.0
pbar.set_postfix_str(f"✅{ok} ❌WA{wa} ⏱TLE{tle} | {rate:.0f}% pass")
pbar.update(1)
end_time = time.time()
logger.info(f"time cost: {end_time - begin_time:.2f}s")
results = [item for chunk in results_chunk for item in chunk]

all_num = len(results)
success_num = len([item for item in results if item["resolved"]])
fail_wa_num = len([item for item in results if not item["resolved"] and item["eval_completed"]])
fail_tle_num = len([item for item in results if not item["resolved"] and not item["eval_completed"]])
success_num = len([r for r in results if r.get("resolved")])
fail_wa_num = len([r for r in results if not r.get("resolved") and r.get("eval_completed")])
fail_tle_num = len([r for r in results if not r.get("resolved") and not r.get("eval_completed")])

def instance_name(sample):
return sample["extra_info"]["tools_kwargs"]["reward"]["metadata"]["instance_id"]

fail_wa_names = [
instance_name(sample)
for sample, item in zip(samples, results, strict=False)
if not item["resolved"] and item["eval_completed"]
if not item.get("resolved") and item.get("eval_completed")
]
fail_tle_names = [
instance_name(sample)
for sample, item in zip(samples, results, strict=False)
if not item["resolved"] and not item["eval_completed"]
if not item.get("resolved") and not item.get("eval_completed")
]

exec_times = [r["eval_execution_time"] for r in results if r.get("eval_execution_time") is not None]
avg_exec_time = sum(exec_times) / len(exec_times)

logger.info(
f"all_num: {all_num}, success_num: {success_num}, fail_wa_num: {fail_wa_num}, fail_tle_num: {fail_tle_num}"
avg_exec_time = sum(exec_times) / len(exec_times) if exec_times else 0.0
pass_rate = success_num / all_num * 100 if all_num else 0.0
wall = end_time - begin_time

summary = "\n".join(
[
"",
_rule("🧪 eval summary"),
f" ✅ resolved {success_num:>4} ({pass_rate:.1f}%)",
f" ❌ wrong-ans {fail_wa_num:>4}",
f" ⏱ timeout {fail_tle_num:>4}",
f" Σ total {all_num:>4}",
_rule(f"avg {avg_exec_time:.1f}s · wall {wall:.1f}s · n={len(exec_times)}"),
"",
]
)
logger.info(f"avg_execution_time: {avg_exec_time:.2f}s (n={len(exec_times)})")
print(summary)

logger.info(f"fail_wa instance names: {fail_wa_names}")
logger.info(f"fail_tle instance names: {fail_tle_names}")
Expand Down
Loading
Loading