Skip to content
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions res/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ Apify input-schema spec semantics (required vs default vs prefill), the root cau
Notes on keeping widget bundles small (narrow `@apify/ui-library` imports, markdown stack cost).
- **Use case**: When changing widget dependencies or markdown rendering, re-measure bundle impact

### [task_status_workaround.md](./task_status_workaround.md)
Why all task results (including errors) are stored as `'completed'` instead of `'failed'`.
- SDK `requestStream()` discards stored results for `'failed'` tasks
- `[error]` prefix convention in `statusMessage` to signal the real outcome
- Upstream fixes needed in the MCP SDK and mcpc
- **Use case**: Reference when touching task status logic or updating the MCP SDK

### [chatgpt-app-submission.md](./chatgpt-app-submission.md)
Checklist and notes for ChatGPT MCP Apps store submission (verify line references against current source before relying on them).
- **Use case**: Submission prep and audits
Expand Down
97 changes: 97 additions & 0 deletions res/task_status_workaround.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Task status workaround: why errors are stored as 'completed'

## The problem

The MCP SDK's `requestStream()` (in `shared/protocol.js`) handles terminal task states differently:

- **`'completed'`** → calls `getTaskResult()` → yields `{ type: 'result', result }` → client gets the full `CallToolResult`
- **`'failed'`** → yields `{ type: 'error', error: "Task {id} failed" }` → client gets a generic error, **stored result is discarded**
- **`'cancelled'`** → same as failed, generic error

This means any result stored with `status: 'failed'` is never delivered to the client. The actual error text, x402 payment payload, structured content — all lost.

## What we do

We store **all** task results as `'completed'`, including errors. The error nature is conveyed through:

1. **`isError: true`** in the `CallToolResult` payload — this is what clients (mcpc, Claude, Cursor) use to determine success/failure
2. **`[error]` prefix** in `statusMessage` — so `tasks/list` and `tasks/get` polling clearly shows the task failed

### Affected paths in `executeToolAndUpdateTask()`

| Path | Result | Status | statusMessage |
|---|---|---|---|
| Success | tool output | `'completed'` | `tool-name: completed` |
| SOFT_FAIL (actor not found, validation) | error text + `isError: true` | `'completed'` | `[error] tool-name: Actor not found...` |
| Payment required (pre-flight) | x402 payload + `isError: true` | `'completed'` | `[error] tool-name: payment required` |
| Payment required (catch 402) | x402 payload + `isError: true` | `'completed'` | `[error] tool-name: payment required` |
| Hard error (5xx, network, etc.) | error text + `isError: true` | `'completed'` | `[error] tool-name: error text...` |
| Aborted (signal) | none | `'cancelled'` | `tool-name: aborted by client` |
Comment thread
jirispilka marked this conversation as resolved.

### Why x402 payment specifically requires 'completed'

The mcpc bridge's `handlePaymentRequiredRetry()` inspects the tool result for `isError: true` + `x402Version` + `accepts` fields. If the task is stored as `'failed'`, the SDK never delivers the result, and the auto-pay retry never triggers.

## What should be fixed upstream

### SDK: `requestStream()` should deliver results for 'failed' tasks

Location: `@modelcontextprotocol/sdk/shared/protocol.js`, lines ~566-583

```javascript
// Current behavior (broken):
if (task.status === 'failed') {
yield { type: 'error', error: new McpError(ErrorCode.InternalError, `Task ${taskId} failed`) };
}

// Desired behavior:
if (task.status === 'failed') {
const result = await this.getTaskResult({ taskId }, resultSchema, options);
yield { type: 'result', result }; // result has isError: true
}
```

The `'failed'` status was stored alongside a result via `storeTaskResult()`. The SDK should deliver that result, not discard it. The `isError` flag in the result already tells the client it's an error.

Alternatively, `requestStream()` could yield both the result and a status indicator, but the simplest fix is to treat `'failed'` the same as `'completed'` for result delivery.

### SDK: `requestStream()` 'failed' error should include statusMessage

Even without delivering the full result, the generic `"Task {id} failed"` message should at least include `task.statusMessage`:

```javascript
yield {
type: 'error',
error: new McpError(ErrorCode.InternalError, task.statusMessage || `Task ${taskId} failed`)
};
```

### SDK: `storeTaskResult()` should accept a statusMessage

Currently we use `storeTaskResultWithMessage()` which calls `updateTaskStatus('working', message)` then `storeTaskResult(status, result)` — two non-atomic calls. The SDK's `storeTaskResult()` should accept an optional `statusMessage` parameter to make this atomic.

### mcpc: `pollTask()` should fetch result for 'completed' tasks

Location: `mcp-cli/src/core/mcp-client.ts`, lines ~715-720

```javascript
// Current behavior (incomplete):
if (task.status === 'completed') {
return { content: [{ type: 'text', text: task.statusMessage || 'Task completed' }] };
}

// Desired behavior:
if (task.status === 'completed') {
const result = await this.getTaskResult(taskId);
return result;
}
```

The polling fallback returns the `statusMessage` as fake content instead of fetching the actual stored result.

## When to remove this workaround

Once the SDK's `requestStream()` delivers results for `'failed'` tasks, we can:
1. Store errors as `'failed'` (semantically correct)
2. Remove the `[error]` prefix from statusMessages
3. Remove `storeTaskResultWithMessage()` if `storeTaskResult()` accepts statusMessage
63 changes: 51 additions & 12 deletions src/mcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ import { getUserIdFromTokenCached } from '../utils/userid_cache.js';
import { getPackageVersion } from '../utils/version.js';
import { connectMCPClient } from './client.js';
import { EXTERNAL_TOOL_CALL_TIMEOUT_MSEC, LOG_LEVEL_MAP } from './const.js';
import { isTaskCancelled, processParamsGetTools } from './utils.js';
import { isTaskCancelled, processParamsGetTools, storeTaskResultWithMessage } from './utils.js';

/** Mode → actor executor. Add new modes here. */
const actorExecutorsByMode: Record<ServerMode, ActorExecutor> = {
Expand Down Expand Up @@ -603,7 +603,10 @@ export class ActorsMcpServer {
`Cannot cancel task "${taskId}" with status "${task.status}"`,
);
}
await this.taskStore.updateTaskStatus(taskId, 'cancelled', 'Cancelled by client', mcpSessionId);
// Extract tool/actor prefix from the last statusMessage (e.g. "apify/rag-web-browser: Starting...")
const toolPrefix = task.statusMessage?.split(':')?.[0];
Comment thread
jirispilka marked this conversation as resolved.
Outdated
const cancelMessage = toolPrefix ? `${toolPrefix}: cancelled by client` : 'Cancelled by client';
await this.taskStore.updateTaskStatus(taskId, 'cancelled', cancelMessage, mcpSessionId);
Comment thread
jirispilka marked this conversation as resolved.
const updatedTask = await this.taskStore.getTask(taskId, mcpSessionId);
log.debug('[CancelTaskRequestSchema] Task cancelled successfully', { taskId, mcpSessionId });
return updatedTask!;
Expand Down Expand Up @@ -1259,21 +1262,55 @@ export class ActorsMcpServer {
return;
Comment thread
jirispilka marked this conversation as resolved.
}

// Store the result in the task store
log.debug('[executeToolAndUpdateTask] Storing completed result', {
taskId,
mcpSessionId,
});
await this.taskStore.storeTaskResult(taskId, 'completed', result, mcpSessionId);
log.debug('Task completed successfully', { taskId, toolName: tool.name, mcpSessionId });
// Persist every non-aborted result. Task-supporting tools like call-actor can return
// isError/toolStatus without throwing, and task mode must store that payload the same
// way non-task mode returns it.
log.debug('Storing task result', { taskId, toolStatus, mcpSessionId });
if (toolStatus === TOOL_STATUS.ABORTED) {
// Guard: a concurrent tasks/cancel request may have already transitioned the task
// to 'cancelled' between the isTaskCancelled check above and here. Skip if already terminal.
if (!await isTaskCancelled(taskId, mcpSessionId, this.taskStore)) {
await this.taskStore.updateTaskStatus(taskId, 'cancelled', `${getToolFullName(tool)}: aborted by client`, mcpSessionId);
}
} else {
// Always store as 'completed' — even for SOFT_FAIL and paymentRequired.
// The MCP SDK's requestStream() only calls getTaskResult() for 'completed' tasks;
// 'failed' tasks yield a generic "Task X failed" error and discard the stored result.
// Error details are preserved in the result payload (isError: true, content text).
// See res/task_status_workaround.md for full context.
let statusMessage: string;
if (paymentRequiredResult) {
statusMessage = `[error] ${getToolFullName(tool)}: payment required`;
} else if (toolStatus === TOOL_STATUS.SOFT_FAIL) {
const fullText = (result as { content?: { text?: string }[] })?.content?.[0]?.text || '';
const errorText = fullText.length > 200 ? `${fullText.slice(0, 200)}… (truncated)` : fullText;
statusMessage = errorText
? `[error] ${getToolFullName(tool)}: ${errorText}`
: `[error] ${getToolFullName(tool)}: failed`;
} else {
statusMessage = `${getToolFullName(tool)}: completed`;
}
await storeTaskResultWithMessage(this.taskStore, taskId, 'completed', result, statusMessage, mcpSessionId);
Comment thread
jirispilka marked this conversation as resolved.
Outdated
}
log.debug('Task execution finished', { taskId, toolName: tool.name, toolStatus, mcpSessionId });

finishTaskTracking(toolStatus, callDiagnostics);
} catch (error) {
// Handle 402 Payment Required — return structured x402 result so clients can auto-pay
const httpStatus = getHttpStatusCode(error);
if (httpStatus === HTTP_PAYMENT_REQUIRED) {
logHttpError(error, 'Payment required while calling tool (task mode)', { toolName: tool.name });
await this.taskStore.storeTaskResult(taskId, 'completed', buildPaymentRequiredResponse(error), mcpSessionId);
// Guard: task may have been cancelled while the actor was running. The SDK throws
// if we try to transition from terminal 'cancelled' to 'working' (inside storeTaskResultWithMessage).
// Track as ABORTED — the cancellation is what matters, not the 402.
if (await isTaskCancelled(taskId, mcpSessionId, this.taskStore)) {
finishTaskTracking(TOOL_STATUS.ABORTED, { ...buildActorFields(actorName, actorId) });
return;
}
// Store as 'completed' — see comment above and res/task_status_workaround.md.
// The x402 payload must reach the client for auto-pay retry.
const paymentResult = buildPaymentRequiredResponse(error);
await storeTaskResultWithMessage(this.taskStore, taskId, 'completed', paymentResult, `[error] ${getToolFullName(tool)}: payment required`, mcpSessionId);
finishTaskTracking(TOOL_STATUS.SOFT_FAIL, {
failure_category: FAILURE_CATEGORY.INVALID_INPUT,
Comment thread
jirispilka marked this conversation as resolved.
failure_http_status: 402,
Expand Down Expand Up @@ -1337,14 +1374,16 @@ export class ActorsMcpServer {
taskId,
mcpSessionId,
});
Comment thread
jirispilka marked this conversation as resolved.
Outdated
await this.taskStore.storeTaskResult(taskId, 'failed', {
// Store as 'completed' so the SDK's requestStream() delivers the result to the client.
// See res/task_status_workaround.md for full context.
await storeTaskResultWithMessage(this.taskStore, taskId, 'completed', {
content: [{
type: 'text' as const,
text: userText,
}],
isError: true,
internalToolStatus: toolStatus,
}, mcpSessionId);
}, `[error] ${getToolFullName(tool)}: ${userText.length > 200 ? `${userText.slice(0, 200)}… (truncated)` : userText || 'failed'}`, mcpSessionId);
Comment thread
jirispilka marked this conversation as resolved.
Outdated

finishTaskTracking(toolStatus, callDiagnostics);
}
Expand Down
22 changes: 22 additions & 0 deletions src/mcp/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { parse } from 'node:querystring';

import type { TaskStore } from '@modelcontextprotocol/sdk/experimental/tasks/interfaces.js';
import type { Result } from '@modelcontextprotocol/sdk/types.js';
import type { ApifyClient } from 'apify-client';

import { processInput } from '../input.js';
Expand Down Expand Up @@ -47,3 +48,24 @@ export async function isTaskCancelled(
const task = await taskStore.getTask(taskId, mcpSessionId);
return task?.status === 'cancelled';
}

/**
* Stores a task result with a final statusMessage in one logical step.
*
* WARNING: This is NOT atomic. The SDK's storeTaskResult() does not accept a statusMessage,
* so we first call updateTaskStatus('working', message) then storeTaskResult(status, result).
* If the process crashes between the two calls, the task stays 'working' with the final message
* but never transitions to terminal state. This is acceptable because the same crash would leave
* the task stuck regardless — the TTL cleanup will eventually remove it.
*/
export async function storeTaskResultWithMessage(
taskStore: TaskStore,
taskId: string,
status: 'completed' | 'failed',
result: Result,
Comment thread
jirispilka marked this conversation as resolved.
statusMessage: string,
Comment thread
jirispilka marked this conversation as resolved.
sessionId?: string,
): Promise<void> {
await taskStore.updateTaskStatus(taskId, 'working', statusMessage, sessionId);
await taskStore.storeTaskResult(taskId, status, result, sessionId);
}
36 changes: 36 additions & 0 deletions tests/integration/suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2446,6 +2446,42 @@ export function createIntegrationTestsSuite(
await assertStatusMessagePropagated(client, stream);
});

// Regression test for #638: final statusMessage must be ": completed" after task finishes,
// not a stale intermediate progress message like "Starting the crawler."
it('should set final statusMessage to "completed" after task finishes successfully', async () => {
client = await createClientFn({ tools: [ACTOR_PYTHON_EXAMPLE] });

const stream = client.experimental.tasks.callToolStream(
{
name: actorNameToToolName(ACTOR_PYTHON_EXAMPLE),
arguments: {
first_number: 7,
second_number: 8,
},
},
CallToolResultSchema,
{
task: {
ttl: 60000,
},
},
);

let taskId: string | null = null;
for await (const message of stream) {
if (message.type === 'taskCreated') {
taskId = message.task.taskId;
} else if (message.type === 'error') {
throw message.error;
}
}

expect(taskId).not.toBeNull();
const task = await client.experimental.tasks.getTask(taskId!);
expect(task.status).toBe('completed');
expect(task.statusMessage).toMatch(/: completed$/);
});

it.runIf(options.transport === 'stdio')('should use UI_MODE env var when CLI arg is not provided', async () => {
client = await createClientFn({ useEnv: true, uiMode: 'openai' });
const tools = await client.listTools();
Expand Down
75 changes: 75 additions & 0 deletions tests/unit/server.task_execution.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { InMemoryTaskStore } from '@modelcontextprotocol/sdk/experimental/tasks/stores/in-memory.js';
import { describe, expect, it, vi } from 'vitest';

import { FAILURE_CATEGORY, TOOL_STATUS } from '../../src/const.js';
import { ActorsMcpServer } from '../../src/mcp/server.js';
import type { ToolEntry } from '../../src/types.js';

describe('ActorsMcpServer task execution', () => {
it('stores non-throwing soft-fail results for task-supporting tools like call-actor', async () => {
const taskStore = new InMemoryTaskStore();
const server = new ActorsMcpServer({
taskStore,
telemetry: { enabled: false },
transportType: 'stdio',
});

const task = await taskStore.createTask(
{ ttl: 60_000 },
'req-1',
{ method: 'tools/call', params: { name: 'soft-fail-tool' } },
);

const tool = {
type: 'internal',
name: 'call-actor',
description: 'Call Actor',
Comment thread
jirispilka marked this conversation as resolved.
inputSchema: { type: 'object', properties: {} },
ajvValidate: vi.fn(),
execution: {
taskSupport: 'optional',
},
call: vi.fn().mockResolvedValue({
content: [{
type: 'text',
text: 'Actor \'missing/actor\' was not found.',
}],
isError: true,
toolTelemetry: {
toolStatus: TOOL_STATUS.SOFT_FAIL,
failureCategory: FAILURE_CATEGORY.INVALID_INPUT,
},
}),
} as unknown as ToolEntry;

await (server as unknown as {
executeToolAndUpdateTask: (params: Record<string, unknown>) => Promise<void>;
}).executeToolAndUpdateTask({
taskId: task.taskId,
tool,
toolArgs: {},
logSafeArgs: {},
apifyClient: {} as never,
apifyToken: '',
progressToken: undefined,
extra: {
signal: new AbortController().signal,
},
mcpSessionId: undefined,
});

// Stored as 'completed' because the SDK's requestStream() only delivers getTaskResult()
// for 'completed' tasks. Error details are in the result payload (isError: true).
const storedTask = await taskStore.getTask(task.taskId);
expect(storedTask?.status).toBe('completed');

const storedResult = await taskStore.getTaskResult(task.taskId);
expect(storedResult).toMatchObject({
isError: true,
content: [{
type: 'text',
text: 'Actor \'missing/actor\' was not found.',
}],
});
});
});
Loading
Loading