Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,29 @@ public ListTasksResult list(ListTasksParams params) {
countQueryBuilder.append(" AND t.state = :state");
}

// Apply pagination cursor (tasks after pageToken)
// Apply lastUpdatedAfter filter using denormalized timestamp column
if (params.lastUpdatedAfter() != null) {
queryBuilder.append(" AND t.statusTimestamp > :lastUpdatedAfter");
countQueryBuilder.append(" AND t.statusTimestamp > :lastUpdatedAfter");
}

// Apply pagination cursor using keyset pagination for composite sort (timestamp DESC, id ASC)
// PageToken format: "timestamp_millis:taskId" (e.g., "1699999999000:task-123")
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
queryBuilder.append(" AND t.id > :pageToken");
String[] tokenParts = params.pageToken().split(":", 2);
if (tokenParts.length == 2) {
// Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId)
// All tasks have timestamps (TaskStatus canonical constructor ensures this)
queryBuilder.append(" AND (t.statusTimestamp < :tokenTimestamp OR (t.statusTimestamp = :tokenTimestamp AND t.id > :tokenId))");
} else {
// Legacy ID-only pageToken format is not supported with timestamp-based sorting
// Throw error to prevent incorrect pagination results
throw new io.a2a.spec.InvalidParamsError(null, "Invalid pageToken format: expected 'timestamp:id'", null);
}
}
Comment on lines 238 to 249
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for parsing and validating the pageToken is duplicated here and in the block for setting query parameters (lines 267-286). This increases the maintenance burden and risk of inconsistencies. Consider refactoring to parse the token once at the beginning of the method, store the timestamp and ID in local variables, and then reuse them for building the query and setting parameters. This would centralize the validation and parsing logic.


// Sort by task ID for consistent pagination
queryBuilder.append(" ORDER BY t.id");
// Sort by status timestamp descending (most recent first), then by ID for stable ordering
queryBuilder.append(" ORDER BY t.statusTimestamp DESC, t.id ASC");

// Create and configure the main query
TypedQuery<JpaTask> query = em.createQuery(queryBuilder.toString(), JpaTask.class);
Expand All @@ -245,8 +261,28 @@ public ListTasksResult list(ListTasksParams params) {
if (params.status() != null) {
query.setParameter("state", params.status().asString());
}
if (params.lastUpdatedAfter() != null) {
query.setParameter("lastUpdatedAfter", params.lastUpdatedAfter());
}
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
query.setParameter("pageToken", params.pageToken());
String[] tokenParts = params.pageToken().split(":", 2);
if (tokenParts.length == 2) {
// Parse keyset pagination parameters
try {
long timestampMillis = Long.parseLong(tokenParts[0]);
String tokenId = tokenParts[1];

// All tasks have timestamps (TaskStatus canonical constructor ensures this)
Instant tokenTimestamp = Instant.ofEpochMilli(timestampMillis);
query.setParameter("tokenTimestamp", tokenTimestamp);
query.setParameter("tokenId", tokenId);
} catch (NumberFormatException e) {
// Malformed timestamp in pageToken
throw new io.a2a.spec.InvalidParamsError(null,
"Invalid pageToken format: timestamp must be numeric milliseconds", null);
}
}
// Note: Legacy ID-only format already rejected in query building phase
}

// Apply page size limit (+1 to check for next page)
Expand All @@ -270,6 +306,9 @@ public ListTasksResult list(ListTasksParams params) {
if (params.status() != null) {
countQuery.setParameter("state", params.status().asString());
}
if (params.lastUpdatedAfter() != null) {
countQuery.setParameter("lastUpdatedAfter", params.lastUpdatedAfter());
}
int totalSize = countQuery.getSingleResult().intValue();

// Deserialize tasks from JSON
Expand All @@ -283,10 +322,14 @@ public ListTasksResult list(ListTasksParams params) {
}
}

// Determine next page token (ID of last task if there are more results)
// Determine next page token (timestamp:ID of last task if there are more results)
// Format: "timestamp_millis:taskId" for keyset pagination
String nextPageToken = null;
if (hasMore && !tasks.isEmpty()) {
nextPageToken = tasks.get(tasks.size() - 1).getId();
Task lastTask = tasks.get(tasks.size() - 1);
// All tasks have timestamps (TaskStatus canonical constructor ensures this)
long timestampMillis = lastTask.getStatus().timestamp().toInstant().toEpochMilli();
nextPageToken = timestampMillis + ":" + lastTask.getId();
}

// Apply post-processing transformations (history limiting, artifact removal)
Expand All @@ -302,9 +345,13 @@ public ListTasksResult list(ListTasksParams params) {
}

private Task transformTask(Task task, int historyLength, boolean includeArtifacts) {
// Limit history if needed (keep most recent N messages)
// Limit history based on historyLength parameter
List<Message> history = task.getHistory();
if (historyLength > 0 && history != null && history.size() > historyLength) {
if (historyLength == 0) {
// historyLength=0 means no history should be included
history = List.of();
} else if (historyLength > 0 && history != null && history.size() > historyLength) {
// Keep most recent N messages
history = history.subList(history.size() - historyLength, history.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class JpaTask {
@Column(name = "state")
private String state;

@Column(name = "status_timestamp")
private Instant statusTimestamp;

@Column(name = "task_data", columnDefinition = "TEXT", nullable = false)
private String taskJson;

Expand Down Expand Up @@ -67,6 +70,14 @@ public void setState(String state) {
this.state = state;
}

public Instant getStatusTimestamp() {
return statusTimestamp;
}

public void setStatusTimestamp(Instant statusTimestamp) {
this.statusTimestamp = statusTimestamp;
}

public String getTaskJson() {
return taskJson;
}
Expand Down Expand Up @@ -123,7 +134,7 @@ static JpaTask createFromTask(Task task) throws JsonProcessingException {
}

/**
* Updates denormalized fields (contextId, state) from the task object.
* Updates denormalized fields (contextId, state, statusTimestamp) from the task object.
* These fields are duplicated from the JSON to enable efficient querying.
*
* @param task the task to extract fields from
Expand All @@ -133,8 +144,14 @@ private void updateDenormalizedFields(Task task) {
if (task.getStatus() != null) {
io.a2a.spec.TaskState taskState = task.getStatus().state();
this.state = (taskState != null) ? taskState.asString() : null;
// Extract status timestamp for efficient querying and sorting
// Truncate to milliseconds for keyset pagination consistency (pageToken uses millis)
this.statusTimestamp = (task.getStatus().timestamp() != null)
? task.getStatus().timestamp().toInstant().truncatedTo(java.time.temporal.ChronoUnit.MILLIS)
: null;
} else {
this.state = null;
this.statusTimestamp = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -418,12 +419,14 @@ public void testListTasksCombinedFilters() {
@Test
@Transactional
public void testListTasksPagination() {
// Create 5 tasks
// Create 5 tasks with same timestamp to ensure ID-based pagination works
// (With timestamp DESC sorting, same timestamps allow ID ASC tie-breaking)
OffsetDateTime sameTimestamp = OffsetDateTime.now(java.time.ZoneOffset.UTC);
for (int i = 1; i <= 5; i++) {
Task task = new Task.Builder()
.id("task-page-" + i)
.contextId("context-pagination")
.status(new TaskStatus(TaskState.SUBMITTED))
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
.build();
taskStore.save(task);
}
Expand Down Expand Up @@ -465,6 +468,122 @@ public void testListTasksPagination() {
assertNull(result3.nextPageToken(), "Last page should have no next page token");
}

@Test
@Transactional
public void testListTasksPaginationWithDifferentTimestamps() {
// Create tasks with different timestamps to verify keyset pagination
// with composite sort (timestamp DESC, id ASC)
OffsetDateTime now = OffsetDateTime.now(java.time.ZoneOffset.UTC);

// Task 1: 10 minutes ago, ID="task-diff-a"
Task task1 = new Task.Builder()
.id("task-diff-a")
.contextId("context-diff-timestamps")
.status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(10)))
.build();
taskStore.save(task1);

// Task 2: 5 minutes ago, ID="task-diff-b"
Task task2 = new Task.Builder()
.id("task-diff-b")
.contextId("context-diff-timestamps")
.status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(5)))
.build();
taskStore.save(task2);

// Task 3: 5 minutes ago, ID="task-diff-c" (same timestamp as task2, tests ID tie-breaker)
Task task3 = new Task.Builder()
.id("task-diff-c")
.contextId("context-diff-timestamps")
.status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(5)))
.build();
taskStore.save(task3);

// Task 4: Now, ID="task-diff-d"
Task task4 = new Task.Builder()
.id("task-diff-d")
.contextId("context-diff-timestamps")
.status(new TaskStatus(TaskState.WORKING, null, now))
.build();
taskStore.save(task4);

// Task 5: 1 minute ago, ID="task-diff-e"
Task task5 = new Task.Builder()
.id("task-diff-e")
.contextId("context-diff-timestamps")
.status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(1)))
.build();
taskStore.save(task5);

// Expected order (timestamp DESC, id ASC):
// 1. task-diff-d (now)
// 2. task-diff-e (1 min ago)
// 3. task-diff-b (5 min ago, ID 'b')
// 4. task-diff-c (5 min ago, ID 'c')
// 5. task-diff-a (10 min ago)

// Page 1: Get first 2 tasks
ListTasksParams params1 = new ListTasksParams.Builder()
.contextId("context-diff-timestamps")
.pageSize(2)
.build();
ListTasksResult result1 = taskStore.list(params1);

assertEquals(5, result1.totalSize());
assertEquals(2, result1.pageSize());
assertNotNull(result1.nextPageToken(), "Should have next page token");

// Verify first page order
assertEquals("task-diff-d", result1.tasks().get(0).getId(), "First task should be most recent");
assertEquals("task-diff-e", result1.tasks().get(1).getId(), "Second task should be 1 min ago");

// Verify pageToken format: "timestamp_millis:taskId"
assertTrue(result1.nextPageToken().contains(":"), "PageToken should have format timestamp:id");
String[] tokenParts = result1.nextPageToken().split(":", 2);
assertEquals(2, tokenParts.length, "PageToken should have exactly 2 parts");
assertEquals("task-diff-e", tokenParts[1], "PageToken should contain last task ID");

// Page 2: Get next 2 tasks
ListTasksParams params2 = new ListTasksParams.Builder()
.contextId("context-diff-timestamps")
.pageSize(2)
.pageToken(result1.nextPageToken())
.build();
ListTasksResult result2 = taskStore.list(params2);

assertEquals(5, result2.totalSize());
assertEquals(2, result2.pageSize());
assertNotNull(result2.nextPageToken(), "Should have next page token");

// Verify second page order (tasks with same timestamp, sorted by ID)
assertEquals("task-diff-b", result2.tasks().get(0).getId(), "Third task should be 5 min ago, ID 'b'");
assertEquals("task-diff-c", result2.tasks().get(1).getId(), "Fourth task should be 5 min ago, ID 'c'");

// Page 3: Get last task
ListTasksParams params3 = new ListTasksParams.Builder()
.contextId("context-diff-timestamps")
.pageSize(2)
.pageToken(result2.nextPageToken())
.build();
ListTasksResult result3 = taskStore.list(params3);

assertEquals(5, result3.totalSize());
assertEquals(1, result3.pageSize());
assertNull(result3.nextPageToken(), "Last page should have no next page token");

// Verify last task
assertEquals("task-diff-a", result3.tasks().get(0).getId(), "Last task should be oldest");

// Verify no duplicates across all pages
List<String> allTaskIds = new ArrayList<>();
allTaskIds.addAll(result1.tasks().stream().map(Task::getId).toList());
allTaskIds.addAll(result2.tasks().stream().map(Task::getId).toList());
allTaskIds.addAll(result3.tasks().stream().map(Task::getId).toList());

assertEquals(5, allTaskIds.size(), "Should have exactly 5 tasks across all pages");
assertEquals(5, allTaskIds.stream().distinct().count(), "Should have no duplicate tasks");
}

@Test
@Transactional
public void testListTasksHistoryLimiting() {
Expand Down Expand Up @@ -573,34 +692,80 @@ public void testListTasksDefaultPageSize() {
assertNotNull(result.nextPageToken(), "Should have next page");
}

@Test
@Transactional
public void testListTasksInvalidPageTokenFormat() {
// Create a task
Task task = new Task.Builder()
.id("task-invalid-token")
.contextId("context-invalid-token")
.status(new TaskStatus(TaskState.WORKING))
.build();
taskStore.save(task);

// Test 1: Legacy ID-only pageToken should throw InvalidParamsError
ListTasksParams params1 = new ListTasksParams.Builder()
.contextId("context-invalid-token")
.pageToken("task-invalid-token") // ID-only format (legacy)
.build();

try {
taskStore.list(params1);
throw new AssertionError("Expected InvalidParamsError for legacy ID-only pageToken");
} catch (io.a2a.spec.InvalidParamsError e) {
// Expected - legacy format not supported
assertTrue(e.getMessage().contains("Invalid pageToken format"),
"Error message should mention invalid format");
}

// Test 2: Malformed timestamp in pageToken should throw InvalidParamsError
ListTasksParams params2 = new ListTasksParams.Builder()
.contextId("context-invalid-token")
.pageToken("not-a-number:task-id") // Invalid timestamp
.build();

try {
taskStore.list(params2);
throw new AssertionError("Expected InvalidParamsError for malformed timestamp");
} catch (io.a2a.spec.InvalidParamsError e) {
// Expected - malformed timestamp
assertTrue(e.getMessage().contains("timestamp must be numeric"),
"Error message should mention numeric timestamp requirement");
}
}


@Test
@Transactional
public void testListTasksOrderingById() {
// Create tasks with IDs that will sort in specific order
// Create tasks with same timestamp to test ID-based tie-breaking
// (spec requires sorting by timestamp DESC, then ID ASC)
OffsetDateTime sameTimestamp = OffsetDateTime.now(java.time.ZoneOffset.UTC);

Task task1 = new Task.Builder()
.id("task-order-a")
.contextId("context-order")
.status(new TaskStatus(TaskState.SUBMITTED))
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
.build();

Task task2 = new Task.Builder()
.id("task-order-b")
.contextId("context-order")
.status(new TaskStatus(TaskState.SUBMITTED))
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
.build();

Task task3 = new Task.Builder()
.id("task-order-c")
.contextId("context-order")
.status(new TaskStatus(TaskState.SUBMITTED))
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
.build();

// Save in reverse order
taskStore.save(task3);
taskStore.save(task1);
taskStore.save(task2);

// List should return in ID order
// List should return sorted by timestamp DESC (all same), then by ID ASC
ListTasksParams params = new ListTasksParams.Builder()
.contextId("context-order")
.build();
Expand Down
Loading