From 1ff088f00a67ee2362e908da461ac931d45de62c Mon Sep 17 00:00:00 2001 From: "helen@cloud" Date: Fri, 13 Feb 2026 07:10:15 -0500 Subject: [PATCH 1/6] =?UTF-8?q?[bug]=20=E4=BF=AE=E5=A4=8D=20streaming=20?= =?UTF-8?q?=E7=94=A8=E6=88=B7=E5=9B=9E=E6=98=BE=E3=80=81=E9=80=9A=E9=81=93?= =?UTF-8?q?=E6=B7=B7=E6=9D=82=E4=B8=8E=E9=87=8D=E5=A4=8D=E5=BF=AB=E7=85=A7?= =?UTF-8?q?=20#72?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/opencode_a2a_serve/agent.py | 363 +++++++++++++++++++++--- tests/test_streaming_output_contract.py | 235 +++++++++++++++ 2 files changed, 557 insertions(+), 41 deletions(-) create mode 100644 tests/test_streaming_output_contract.py diff --git a/src/opencode_a2a_serve/agent.py b/src/opencode_a2a_serve/agent.py index 8d8a125..c2a21c6 100644 --- a/src/opencode_a2a_serve/agent.py +++ b/src/opencode_a2a_serve/agent.py @@ -7,7 +7,9 @@ import uuid from collections.abc import Mapping from contextlib import suppress +from dataclasses import dataclass, field from pathlib import Path +from typing import Any from a2a.server.agent_execution import AgentExecutor, RequestContext from a2a.server.events.event_queue import EventQueue @@ -27,6 +29,77 @@ logger = logging.getLogger(__name__) +_STREAM_CHANNEL_REASONING = "reasoning" +_STREAM_CHANNEL_TOOL_CALL = "tool_call" +_STREAM_CHANNEL_FINAL_ANSWER = "final_answer" + + +@dataclass(frozen=True) +class _NormalizedStreamChunk: + text: str + append: bool + channel: str + source: str + event_type: str + message_id: str | None + role: str | None + + +@dataclass +class _StreamOutputState: + user_text: str + response_message_id: str | None = None + channel_buffers: dict[str, str] = field(default_factory=dict) + saw_final_answer_chunk: bool = False + saw_any_chunk: bool = False + + def set_response_message_id(self, message_id: str | None) -> None: + if not isinstance(message_id, str): + self.response_message_id = None + return + value = message_id.strip() + self.response_message_id = value or None + + def matches_expected_message(self, message_id: str | None) -> bool: + if not self.response_message_id: + return True + if not message_id: + return True + return message_id == self.response_message_id + + def should_drop_initial_user_echo(self, text: str, *, channel: str, role: str | None) -> bool: + if role is not None: + return False + if channel != _STREAM_CHANNEL_FINAL_ANSWER: + return False + if self.saw_any_chunk: + return False + user_text = self.user_text.strip() + return bool(user_text) and text.strip() == user_text + + def register_chunk(self, *, channel: str, text: str, append: bool) -> tuple[bool, bool]: + previous = self.channel_buffers.get(channel, "") + effective_append = append if previous else False + next_value = f"{previous}{text}" if effective_append else text + if next_value == previous: + return False, effective_append + self.channel_buffers[channel] = next_value + self.saw_any_chunk = True + if channel == _STREAM_CHANNEL_FINAL_ANSWER and next_value.strip(): + self.saw_final_answer_chunk = True + return True, effective_append + + def should_emit_final_snapshot(self, text: str) -> bool: + if not text.strip(): + return False + existing = self.channel_buffers.get(_STREAM_CHANNEL_FINAL_ANSWER, "") + if existing.strip() == text.strip(): + return False + self.channel_buffers[_STREAM_CHANNEL_FINAL_ANSWER] = text + self.saw_any_chunk = True + self.saw_final_answer_chunk = True + return True + class _TTLCache: """Bounded TTL cache for hashable key -> string value. @@ -116,6 +189,7 @@ def __init__( self._pending_session_claims: dict[str, str] = {} self._lock = asyncio.Lock() self._inflight_session_creates: dict[tuple[str, str], asyncio.Task[str]] = {} + self._session_locks: dict[str, asyncio.Lock] = {} def _resolve_and_validate_directory(self, requested: str | None) -> str | None: """Normalizes and validates the directory parameter against workspace boundaries. @@ -230,9 +304,11 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non ) stream_artifact_id = f"{task_id}:stream" + stream_state = _StreamOutputState(user_text=user_text) stop_event = asyncio.Event() stream_task: asyncio.Task[None] | None = None pending_preferred_claim = False + session_lock: asyncio.Lock | None = None session_id = "" try: @@ -243,6 +319,8 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non preferred_session_id=bound_session_id, directory=directory, ) + session_lock = await self._get_session_lock(session_id) + await session_lock.acquire() if streaming_request: stream_task = asyncio.create_task( @@ -251,6 +329,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non task_id=task_id, context_id=context_id, artifact_id=stream_artifact_id, + stream_state=stream_state, event_queue=event_queue, stop_event=stop_event, directory=directory, @@ -282,7 +361,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non ) pending_preferred_claim = False - response_text = response.text or "(No text content returned by OpenCode.)" + response_text = response.text or "" logger.debug( "OpenCode response task_id=%s session_id=%s message_id=%s text=%s", task_id, @@ -290,22 +369,26 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non response.message_id, response_text, ) - assistant_message = _build_assistant_message( - task_id=task_id, - context_id=context_id, - text=response_text, - message_id=response.message_id, - ) if streaming_request: - await _enqueue_artifact_update( - event_queue=event_queue, - task_id=task_id, - context_id=context_id, - artifact_id=stream_artifact_id, - text=response_text, - append=False, - last_chunk=True, - ) + stream_state.set_response_message_id(response.message_id) + if stream_state.should_emit_final_snapshot(response_text): + await _enqueue_artifact_update( + event_queue=event_queue, + task_id=task_id, + context_id=context_id, + artifact_id=_artifact_id_for_stream_channel( + stream_artifact_id, _STREAM_CHANNEL_FINAL_ANSWER + ), + text=response_text, + append=False, + last_chunk=True, + artifact_metadata=_build_stream_artifact_metadata( + channel=_STREAM_CHANNEL_FINAL_ANSWER, + source="final_snapshot", + event_type="message.finalized", + message_id=response.message_id, + ), + ) await event_queue.enqueue_event( TaskStatusUpdateEvent( task_id=task_id, @@ -323,6 +406,13 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non ) ) else: + response_text = response_text or "(No text content returned by OpenCode.)" + assistant_message = _build_assistant_message( + task_id=task_id, + context_id=context_id, + text=response_text, + message_id=response.message_id, + ) artifact = Artifact( artifact_id=str(uuid.uuid4()), name="response", @@ -366,6 +456,8 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non stream_task.cancel() with suppress(asyncio.CancelledError): await stream_task + if session_lock and session_lock.locked(): + session_lock.release() async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: task_id = context.task_id @@ -514,6 +606,14 @@ async def _release_preferred_session_claim(self, *, identity: str, session_id: s if self._pending_session_claims.get(session_id) == identity: self._pending_session_claims.pop(session_id, None) + async def _get_session_lock(self, session_id: str) -> asyncio.Lock: + async with self._lock: + lock = self._session_locks.get(session_id) + if lock is None: + lock = asyncio.Lock() + self._session_locks[session_id] = lock + return lock + async def _emit_error( self, event_queue: EventQueue, @@ -576,14 +676,14 @@ async def _consume_opencode_stream( task_id: str, context_id: str, artifact_id: str, + stream_state: _StreamOutputState, event_queue: EventQueue, stop_event: asyncio.Event, directory: str | None = None, ) -> None: - buffered_text = "" + buffered_text: dict[str, str] = {} backoff = 0.5 max_backoff = 5.0 - sent_chunk = False try: while not stop_event.is_set(): try: @@ -595,53 +695,87 @@ async def _consume_opencode_stream( event_type = event.get("type") if event_type != "message.part.updated": continue - props = event.get("properties", {}) - part = props.get("part") or {} - if part.get("sessionID") != session_id: + props = event.get("properties") + if not isinstance(props, Mapping): + continue + part = props.get("part") + if not isinstance(part, Mapping): + part = {} + if _extract_stream_session_id(part, props) != session_id: continue - role = part.get("role") or props.get("role") - if role is None: - message = props.get("message") - if isinstance(message, dict): - role = message.get("role") - if isinstance(role, str) and role.lower() in {"user", "system"}: + role = _extract_stream_role(part, props) + if role in {"user", "system"}: continue + channel = _classify_stream_channel(part, props) delta = props.get("delta") chunk_text: str | None = None append = True + source = "delta" + message_id = _extract_stream_message_id(part, props) + buffer_key = f"{channel}:{message_id or 'unknown'}" + previous = buffered_text.get(buffer_key, "") if isinstance(delta, str) and delta: chunk_text = delta - buffered_text += delta + buffered_text[buffer_key] = f"{previous}{delta}" elif part.get("type") == "text" and isinstance(part.get("text"), str): next_text = part["text"] - if next_text != buffered_text: - if next_text.startswith(buffered_text): - chunk_text = next_text[len(buffered_text) :] + if next_text != previous: + if next_text.startswith(previous): + chunk_text = next_text[len(previous) :] append = True + source = "part_text_diff" else: chunk_text = next_text append = False - buffered_text = next_text + source = "part_text_reset" + buffered_text[buffer_key] = next_text if not chunk_text: continue - if not sent_chunk: - append = False - sent_chunk = True + chunk = _NormalizedStreamChunk( + text=chunk_text, + append=append, + channel=channel, + source=source, + event_type=event_type, + message_id=message_id, + role=role, + ) + if not stream_state.matches_expected_message(chunk.message_id): + continue + if stream_state.should_drop_initial_user_echo( + chunk.text, channel=chunk.channel, role=chunk.role + ): + continue + should_emit, effective_append = stream_state.register_chunk( + channel=chunk.channel, + text=chunk.text, + append=chunk.append, + ) + if not should_emit: + continue await _enqueue_artifact_update( event_queue=event_queue, task_id=task_id, context_id=context_id, - artifact_id=artifact_id, - text=chunk_text, - append=append, + artifact_id=_artifact_id_for_stream_channel(artifact_id, chunk.channel), + text=chunk.text, + append=effective_append, last_chunk=False, + artifact_metadata=_build_stream_artifact_metadata( + channel=chunk.channel, + source=chunk.source, + event_type=chunk.event_type, + message_id=chunk.message_id, + role=chunk.role, + ), ) logger.debug( - "Stream chunk task_id=%s session_id=%s append=%s text=%s", + "Stream chunk task_id=%s session_id=%s channel=%s append=%s text=%s", task_id, session_id, - append, - chunk_text, + chunk.channel, + effective_append, + chunk.text, ) break except Exception: @@ -679,10 +813,13 @@ async def _enqueue_artifact_update( text: str, append: bool | None, last_chunk: bool | None, + artifact_metadata: Mapping[str, Any] | None = None, + event_metadata: Mapping[str, Any] | None = None, ) -> None: artifact = Artifact( artifact_id=artifact_id, parts=[TextPart(text=text)], + metadata=dict(artifact_metadata) if artifact_metadata else None, ) await event_queue.enqueue_event( TaskArtifactUpdateEvent( @@ -691,10 +828,154 @@ async def _enqueue_artifact_update( artifact=artifact, append=append, last_chunk=last_chunk, + metadata=dict(event_metadata) if event_metadata else None, ) ) +def _artifact_id_for_stream_channel(base_artifact_id: str, channel: str) -> str: + if channel == _STREAM_CHANNEL_FINAL_ANSWER: + return base_artifact_id + return f"{base_artifact_id}:{channel}" + + +def _build_stream_artifact_metadata( + *, + channel: str, + source: str, + event_type: str, + message_id: str | None = None, + role: str | None = None, +) -> dict[str, Any]: + opencode_meta: dict[str, Any] = { + "channel": channel, + "source": source, + "event_type": event_type, + } + if message_id: + opencode_meta["message_id"] = message_id + if role: + opencode_meta["role"] = role + return {"opencode": opencode_meta} + + +def _normalize_role(role: Any) -> str | None: + if not isinstance(role, str): + return None + value = role.strip().lower() + if not value: + return None + if value.startswith("role_"): + value = value[5:] + if value in {"assistant", "agent", "model", "ai"}: + return "agent" + if value in {"user", "human"}: + return "user" + if value == "system": + return "system" + return value + + +def _extract_stream_role(part: Mapping[str, Any], props: Mapping[str, Any]) -> str | None: + role = part.get("role") or props.get("role") + if role is None: + message = props.get("message") + if isinstance(message, Mapping): + role = message.get("role") + return _normalize_role(role) + + +def _extract_stream_session_id(part: Mapping[str, Any], props: Mapping[str, Any]) -> str | None: + session_keys = ("sessionID", "sessionId", "session_id") + for key in session_keys: + value = part.get(key) + if isinstance(value, str) and value: + return value + message = props.get("message") + if isinstance(message, Mapping): + for key in session_keys: + value = message.get(key) + if isinstance(value, str) and value: + return value + return None + + +def _extract_stream_message_id(part: Mapping[str, Any], props: Mapping[str, Any]) -> str | None: + message_keys = ("messageID", "messageId", "message_id", "id") + for key in message_keys: + value = part.get(key) + if isinstance(value, str): + normalized = value.strip() + if normalized: + return normalized + for key in message_keys: + value = props.get(key) + if isinstance(value, str): + normalized = value.strip() + if normalized: + return normalized + message = props.get("message") + if isinstance(message, Mapping): + for key in message_keys: + value = message.get(key) + if isinstance(value, str): + normalized = value.strip() + if normalized: + return normalized + info = message.get("info") + if isinstance(info, Mapping): + for key in message_keys: + value = info.get(key) + if isinstance(value, str): + normalized = value.strip() + if normalized: + return normalized + return None + + +def _classify_stream_channel(part: Mapping[str, Any], props: Mapping[str, Any]) -> str: + def _iter_candidates() -> list[str]: + candidates: list[str] = [] + for value in ( + part.get("channel"), + props.get("channel"), + part.get("kind"), + props.get("kind"), + part.get("type"), + props.get("type"), + props.get("deltaType"), + props.get("contentType"), + props.get("phase"), + props.get("name"), + ): + if isinstance(value, str) and value.strip(): + candidates.append(value.strip().lower()) + return candidates + + candidates = _iter_candidates() + if any( + any(keyword in candidate for keyword in ("reason", "thinking", "thought")) + for candidate in candidates + ): + return _STREAM_CHANNEL_REASONING + if any( + any( + keyword in candidate + for keyword in ( + "tool", + "function_call", + "functioncall", + "tool_call", + "toolcall", + "action", + ) + ) + for candidate in candidates + ): + return _STREAM_CHANNEL_TOOL_CALL + return _STREAM_CHANNEL_FINAL_ANSWER + + def _build_history(context: RequestContext) -> list[Message]: if context.current_task and context.current_task.history: history = list(context.current_task.history) diff --git a/tests/test_streaming_output_contract.py b/tests/test_streaming_output_contract.py new file mode 100644 index 0000000..7f04899 --- /dev/null +++ b/tests/test_streaming_output_contract.py @@ -0,0 +1,235 @@ +import asyncio + +import pytest +from a2a.server.agent_execution import RequestContext +from a2a.types import Message, MessageSendParams, Role, TaskArtifactUpdateEvent, TextPart + +from opencode_a2a_serve.agent import OpencodeAgentExecutor +from opencode_a2a_serve.config import Settings +from opencode_a2a_serve.opencode_client import OpencodeMessage + + +class DummyEventQueue: + def __init__(self) -> None: + self.events = [] + + async def enqueue_event(self, event) -> None: # noqa: ANN001 + self.events.append(event) + + async def close(self) -> None: + return None + + +class DummyStreamingClient: + def __init__( + self, + *, + stream_events_payload: list[dict], + response_text: str, + response_message_id: str = "msg-1", + send_delay: float = 0.02, + ) -> None: + self._stream_events_payload = stream_events_payload + self._response_text = response_text + self._response_message_id = response_message_id + self._send_delay = send_delay + self._in_flight_send = 0 + self.max_in_flight_send = 0 + self.stream_timeout = None + self.directory = None + self.settings = Settings( + A2A_BEARER_TOKEN="test", + OPENCODE_BASE_URL="http://localhost", + ) + + async def create_session( + self, + title: str | None = None, + *, + directory: str | None = None, + ) -> str: + del title, directory + return "ses-1" + + async def send_message( + self, + session_id: str, + text: str, + *, + directory: str | None = None, + timeout_override=None, # noqa: ANN001 + ) -> OpencodeMessage: + del text, directory, timeout_override + self._in_flight_send += 1 + self.max_in_flight_send = max(self.max_in_flight_send, self._in_flight_send) + await asyncio.sleep(self._send_delay) + self._in_flight_send -= 1 + return OpencodeMessage( + text=self._response_text, + session_id=session_id, + message_id=self._response_message_id, + raw={}, + ) + + async def stream_events(self, stop_event=None, *, directory: str | None = None): # noqa: ANN001 + del directory + for event in self._stream_events_payload: + if stop_event and stop_event.is_set(): + break + await asyncio.sleep(0) + yield event + + +def _context( + *, task_id: str, context_id: str, text: str, metadata: dict | None = None +) -> RequestContext: + message = Message( + message_id="req-1", + role=Role.user, + parts=[TextPart(text=text)], + ) + params = MessageSendParams(message=message, metadata=metadata) + return RequestContext(request=params, task_id=task_id, context_id=context_id) + + +def _event(*, session_id: str, role: str | None, part_type: str, delta: str) -> dict: + properties: dict = { + "part": { + "sessionID": session_id, + "type": part_type, + }, + "delta": delta, + } + if role is not None: + properties["part"]["role"] = role + return { + "type": "message.part.updated", + "properties": properties, + } + + +def _artifact_updates(queue: DummyEventQueue) -> list[TaskArtifactUpdateEvent]: + return [event for event in queue.events if isinstance(event, TaskArtifactUpdateEvent)] + + +def _part_text(event: TaskArtifactUpdateEvent) -> str: + part = event.artifact.parts[0] + return getattr(part, "text", None) or getattr(part.root, "text", "") + + +@pytest.mark.asyncio +async def test_streaming_filters_user_echo_and_emits_structured_channels() -> None: + user_text = "who are you" + client = DummyStreamingClient( + stream_events_payload=[ + _event(session_id="ses-1", role="ROLE_USER", part_type="text", delta=user_text), + _event(session_id="ses-1", role="assistant", part_type="reasoning", delta="thinking"), + _event( + session_id="ses-1", + role="assistant", + part_type="tool_call", + delta='{"tool":"search"}', + ), + _event(session_id="ses-1", role="assistant", part_type="text", delta="final answer"), + ], + response_text="final answer", + ) + executor = OpencodeAgentExecutor(client, streaming_enabled=True) + executor._should_stream = lambda context: True # type: ignore[method-assign] + queue = DummyEventQueue() + + await executor.execute(_context(task_id="task-1", context_id="ctx-1", text=user_text), queue) + + updates = _artifact_updates(queue) + assert updates + texts = [_part_text(event) for event in updates] + assert user_text not in texts + channels = [event.artifact.metadata["opencode"]["channel"] for event in updates] + assert _unique(channels) == ["reasoning", "tool_call", "final_answer"] + + +@pytest.mark.asyncio +async def test_streaming_does_not_send_duplicate_final_snapshot_when_chunks_exist() -> None: + client = DummyStreamingClient( + stream_events_payload=[ + _event( + session_id="ses-1", + role="assistant", + part_type="text", + delta="stable final answer", + ), + ], + response_text="stable final answer", + ) + executor = OpencodeAgentExecutor(client, streaming_enabled=True) + executor._should_stream = lambda context: True # type: ignore[method-assign] + queue = DummyEventQueue() + + await executor.execute(_context(task_id="task-2", context_id="ctx-2", text="hi"), queue) + + final_updates = [ + event + for event in _artifact_updates(queue) + if event.artifact.metadata["opencode"]["channel"] == "final_answer" + ] + assert len(final_updates) == 1 + assert _part_text(final_updates[0]) == "stable final answer" + assert final_updates[0].artifact.metadata["opencode"]["source"] != "final_snapshot" + + +@pytest.mark.asyncio +async def test_streaming_emits_final_snapshot_only_when_stream_has_no_final_answer() -> None: + client = DummyStreamingClient( + stream_events_payload=[ + _event(session_id="ses-1", role="assistant", part_type="reasoning", delta="plan step"), + ], + response_text="final answer from send_message", + ) + executor = OpencodeAgentExecutor(client, streaming_enabled=True) + executor._should_stream = lambda context: True # type: ignore[method-assign] + queue = DummyEventQueue() + + await executor.execute(_context(task_id="task-3", context_id="ctx-3", text="hello"), queue) + + final_updates = [ + event + for event in _artifact_updates(queue) + if event.artifact.metadata["opencode"]["channel"] == "final_answer" + ] + assert len(final_updates) == 1 + final_event = final_updates[0] + assert _part_text(final_event) == "final answer from send_message" + assert final_event.artifact.metadata["opencode"]["source"] == "final_snapshot" + assert final_event.append is False + assert final_event.last_chunk is True + + +@pytest.mark.asyncio +async def test_execute_serializes_send_message_per_session() -> None: + client = DummyStreamingClient( + stream_events_payload=[], + response_text="ok", + send_delay=0.05, + ) + executor = OpencodeAgentExecutor(client, streaming_enabled=False) + queue_1 = DummyEventQueue() + queue_2 = DummyEventQueue() + metadata = {"opencode_session_id": "ses-shared"} + + await asyncio.gather( + executor.execute(_context(task_id="task-4", context_id="ctx-4", text="hello", metadata=metadata), queue_1), + executor.execute(_context(task_id="task-5", context_id="ctx-5", text="world", metadata=metadata), queue_2), + ) + + assert client.max_in_flight_send == 1 + + +def _unique(items: list[str]) -> list[str]: + seen: set[str] = set() + ordered: list[str] = [] + for item in items: + if item in seen: + continue + seen.add(item) + ordered.append(item) + return ordered From 269c76d72f04cb9eba6858d73abc9e151250c7b9 Mon Sep 17 00:00:00 2001 From: "helen@cloud" Date: Fri, 13 Feb 2026 07:17:35 -0500 Subject: [PATCH 2/6] =?UTF-8?q?[bug]=20=E4=B8=A5=E6=A0=BC=E8=BF=87?= =?UTF-8?q?=E6=BB=A4=E7=BC=BA=E5=A4=B1=20message=5Fid=20=E7=9A=84=E6=B5=81?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E5=B9=B6=E5=90=8C=E6=AD=A5=E6=96=87=E6=A1=A3?= =?UTF-8?q?=20#72?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 +++- docs/deployment.md | 7 ++-- docs/guide.md | 10 ++++-- src/opencode_a2a_serve/agent.py | 4 +-- tests/test_streaming_output_contract.py | 47 +++++++++++++++++++++++-- 5 files changed, 63 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 51c0f61..c208310 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,11 @@ Additional notes: - Standard A2A chat: forwards `message:send` / `message:stream` to OpenCode. - SSE streaming: `/v1/message:stream` emits incremental - `TaskArtifactUpdateEvent`, then `TaskStatusUpdateEvent(final=true)`. + `TaskArtifactUpdateEvent` with channel metadata + (`reasoning` / `tool_call` / `final_answer`), filters user/system + role updates, and only emits a final snapshot when stream chunks did + not already produce the same final answer. Stream closes with + `TaskStatusUpdateEvent(final=true)`. - Re-subscribe after disconnect: `GET /v1/tasks/{task_id}:subscribe` (available while the task is not in a terminal state). - Session continuation contract: clients can explicitly bind to an existing diff --git a/docs/deployment.md b/docs/deployment.md index eb29ed8..dac984f 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -396,5 +396,8 @@ Application-level safeguards: `GET /v1/tasks/{task_id}:subscribe` - service subscribes to OpenCode `/event` stream and forwards filtered per-session updates -- stream emits incremental `TaskArtifactUpdateEvent` (`append=true`) and closes - with `TaskStatusUpdateEvent(final=true)` +- stream emits incremental `TaskArtifactUpdateEvent` with channel metadata + (`reasoning` / `tool_call` / `final_answer`) +- events without `message_id` are discarded to avoid ambiguous correlation +- final snapshot is emitted only when stream chunks did not already produce + the same final answer; stream then closes with `TaskStatusUpdateEvent(final=true)` diff --git a/docs/guide.md b/docs/guide.md index 8816d4e..391af27 100644 --- a/docs/guide.md +++ b/docs/guide.md @@ -55,9 +55,13 @@ This guide covers configuration, authentication, API behavior, streaming re-subs - The service forwards A2A `message:send` to OpenCode session/message calls. - Task state defaults to `input-required` to support multi-turn interactions. - Streaming (`/v1/message:stream`) emits incremental - `TaskArtifactUpdateEvent` (`append=true`) and then - `TaskStatusUpdateEvent(final=true)`. Full output content is carried in - artifacts; non-streaming requests return a `Task` directly. + `TaskArtifactUpdateEvent` and then + `TaskStatusUpdateEvent(final=true)`. Stream artifacts carry + `artifact.metadata.opencode.channel` with values + `reasoning` / `tool_call` / `final_answer`. Events without + `message_id` are dropped. A final snapshot is only emitted when stream + chunks did not already produce the same final answer text. + Non-streaming requests return a `Task` directly. - Requests require `Authorization: Bearer `; otherwise `401` is returned. Agent Card endpoints are public. - Within one `opencode-a2a-serve` instance, all consumers share the same diff --git a/src/opencode_a2a_serve/agent.py b/src/opencode_a2a_serve/agent.py index c2a21c6..8fd50f9 100644 --- a/src/opencode_a2a_serve/agent.py +++ b/src/opencode_a2a_serve/agent.py @@ -61,9 +61,9 @@ def set_response_message_id(self, message_id: str | None) -> None: self.response_message_id = value or None def matches_expected_message(self, message_id: str | None) -> bool: - if not self.response_message_id: - return True if not message_id: + return False + if not self.response_message_id: return True return message_id == self.response_message_id diff --git a/tests/test_streaming_output_contract.py b/tests/test_streaming_output_contract.py index 7f04899..55a9822 100644 --- a/tests/test_streaming_output_contract.py +++ b/tests/test_streaming_output_contract.py @@ -92,7 +92,14 @@ def _context( return RequestContext(request=params, task_id=task_id, context_id=context_id) -def _event(*, session_id: str, role: str | None, part_type: str, delta: str) -> dict: +def _event( + *, + session_id: str, + role: str | None, + part_type: str, + delta: str, + message_id: str | None = "msg-1", +) -> dict: properties: dict = { "part": { "sessionID": session_id, @@ -102,6 +109,8 @@ def _event(*, session_id: str, role: str | None, part_type: str, delta: str) -> } if role is not None: properties["part"]["role"] = role + if message_id is not None: + properties["part"]["messageID"] = message_id return { "type": "message.part.updated", "properties": properties, @@ -217,13 +226,45 @@ async def test_execute_serializes_send_message_per_session() -> None: metadata = {"opencode_session_id": "ses-shared"} await asyncio.gather( - executor.execute(_context(task_id="task-4", context_id="ctx-4", text="hello", metadata=metadata), queue_1), - executor.execute(_context(task_id="task-5", context_id="ctx-5", text="world", metadata=metadata), queue_2), + executor.execute( + _context(task_id="task-4", context_id="ctx-4", text="hello", metadata=metadata), queue_1 + ), + executor.execute( + _context(task_id="task-5", context_id="ctx-5", text="world", metadata=metadata), queue_2 + ), ) assert client.max_in_flight_send == 1 +@pytest.mark.asyncio +async def test_streaming_drops_events_without_message_id_and_falls_back_to_snapshot() -> None: + client = DummyStreamingClient( + stream_events_payload=[ + _event( + session_id="ses-1", + role="assistant", + part_type="text", + delta="stream chunk without id", + message_id=None, + ), + ], + response_text="final answer from send_message", + ) + executor = OpencodeAgentExecutor(client, streaming_enabled=True) + executor._should_stream = lambda context: True # type: ignore[method-assign] + queue = DummyEventQueue() + + await executor.execute(_context(task_id="task-6", context_id="ctx-6", text="hello"), queue) + + updates = _artifact_updates(queue) + assert len(updates) == 1 + update = updates[0] + assert _part_text(update) == "final answer from send_message" + assert update.artifact.metadata["opencode"]["source"] == "final_snapshot" + assert update.artifact.metadata["opencode"]["channel"] == "final_answer" + + def _unique(items: list[str]) -> list[str]: seen: set[str] = set() ordered: list[str] = [] From ce1a933aaf53c7436bde455e2da4d64a5a25a561 Mon Sep 17 00:00:00 2001 From: "helen@cloud" Date: Fri, 13 Feb 2026 07:19:56 -0500 Subject: [PATCH 3/6] =?UTF-8?q?[docs]=20=E6=94=B6=E6=95=9B=20README=20?= =?UTF-8?q?=E6=B5=81=E5=BC=8F=E7=BB=86=E8=8A=82=E5=B9=B6=E7=BB=9F=E4=B8=80?= =?UTF-8?q?=E6=8C=87=E5=90=91=20guide=20#72?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index c208310..67982f0 100644 --- a/README.md +++ b/README.md @@ -38,12 +38,9 @@ Additional notes: ## Capabilities - Standard A2A chat: forwards `message:send` / `message:stream` to OpenCode. -- SSE streaming: `/v1/message:stream` emits incremental - `TaskArtifactUpdateEvent` with channel metadata - (`reasoning` / `tool_call` / `final_answer`), filters user/system - role updates, and only emits a final snapshot when stream chunks did - not already produce the same final answer. Stream closes with - `TaskStatusUpdateEvent(final=true)`. +- SSE streaming: `/v1/message:stream` emits incremental updates and then + closes with `TaskStatusUpdateEvent(final=true)`. For detailed streaming + contract and event semantics, see `docs/guide.md`. - Re-subscribe after disconnect: `GET /v1/tasks/{task_id}:subscribe` (available while the task is not in a terminal state). - Session continuation contract: clients can explicitly bind to an existing From f06bae6ca5066ff83ba9d0049dcfc780f17495df Mon Sep 17 00:00:00 2001 From: "helen@cloud" Date: Sat, 14 Feb 2026 00:24:10 -0500 Subject: [PATCH 4/6] =?UTF-8?q?[docs]=20=E6=B7=BB=E5=8A=A0=20elevator=20pi?= =?UTF-8?q?tch=E3=80=81=E6=8A=80=E6=9C=AF=E6=A0=88=E6=A6=82=E8=A7=88?= =?UTF-8?q?=E4=B8=8E=E5=89=8D=E7=BD=AE=E4=BE=9D=E8=B5=96=E6=8F=90=E7=A4=BA?= =?UTF-8?q?=20#72?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 67982f0..65248cc 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ # opencode-a2a-serve +> **Turning OpenCode into a production-ready, stateful Agent API with REST/JSON-RPC endpoints, authentication, streaming, and session management.** +> +> **Tech Stack:** Python 3.11+ | FastAPI | A2A SDK | `uv` | `pytest` + `opencode-a2a-serve` is an adapter layer that exposes OpenCode as an A2A service (FastAPI + A2A SDK). It provides: - A2A HTTP+JSON (REST): `/v1/message:send`, `/v1/message:stream`, @@ -58,6 +62,8 @@ Additional notes: ## Quick Start +*Prerequisites: Ensure you have [`uv`](https://github.com/astral-sh/uv) installed and an accessible LLM provider configured in your OpenCode environment.* + 1. Start OpenCode: ```bash From 782498e9e76bda0a56987986aacfb4e2f47103b3 Mon Sep 17 00:00:00 2001 From: "helen@cloud" Date: Sat, 14 Feb 2026 00:26:44 -0500 Subject: [PATCH 5/6] =?UTF-8?q?[docs]=20=E6=9B=B4=E6=AD=A3=E5=89=8D?= =?UTF-8?q?=E7=BD=AE=E4=BE=9D=E8=B5=96=E6=8F=90=E7=A4=BA=EF=BC=9A=E9=87=87?= =?UTF-8?q?=E7=94=A8=E7=8E=AF=E5=A2=83=E5=8F=98=E9=87=8F=E6=B3=A8=E5=85=A5?= =?UTF-8?q?=20LLM=20API=20Keys=20#72?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 65248cc..2b6a058 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ Additional notes: ## Quick Start -*Prerequisites: Ensure you have [`uv`](https://github.com/astral-sh/uv) installed and an accessible LLM provider configured in your OpenCode environment.* +*Prerequisites: Ensure you have [`uv`](https://github.com/astral-sh/uv) installed and your LLM provider API keys exported as environment variables (e.g., `GOOGLE_GENERATIVE_AI_API_KEY`).* 1. Start OpenCode: From a57afc1920a4368c15354b9ddf41a55625282eca Mon Sep 17 00:00:00 2001 From: "helen@cloud" Date: Sat, 14 Feb 2026 00:28:41 -0500 Subject: [PATCH 6/6] =?UTF-8?q?[docs]=20=E7=A7=BB=E9=99=A4=20Quick=20Start?= =?UTF-8?q?=20=E4=B8=AD=E7=9A=84=E5=89=8D=E6=8F=90=E4=BE=9D=E8=B5=96?= =?UTF-8?q?=E8=AF=B4=E6=98=8E=20#72?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index 2b6a058..6bbc4be 100644 --- a/README.md +++ b/README.md @@ -62,8 +62,6 @@ Additional notes: ## Quick Start -*Prerequisites: Ensure you have [`uv`](https://github.com/astral-sh/uv) installed and your LLM provider API keys exported as environment variables (e.g., `GOOGLE_GENERATIVE_AI_API_KEY`).* - 1. Start OpenCode: ```bash