-
Notifications
You must be signed in to change notification settings - Fork 98
refactor: Separate task state building from persistence #508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Summary of ChangesHello @jmesnil, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant refactoring to the task state management system. The primary goal is to optimize performance and improve reliability by decoupling the in-memory building of task state from its persistence to the database. By centralizing in-memory state management in a new Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
This refactoring introduces TaskStateProcessor as a global singleton to manage task state lifecycle and addresses a critical architectural issue in task state management. Problem: - Task state was persisted multiple times during event propagation, causing redundant database writes and tight coupling between event processing and persistence Solution: - Created TaskStateProcessor as @ApplicationScoped singleton to maintain in-flight tasks globally - Separated state building (TaskStateProcessor) from persistence (TaskStore) - Tasks are now persisted once at appropriate lifecycle points instead of for each event - Tasks are explicitly removed from TaskStateProcessor after final persistence, ensuring bounded memory usage Changes: - New: TaskStateProcessor - global singleton managing task state in memory - Modified: DefaultRequestHandler - injects TaskStateProcessor and ensures cleanup after task completion - Modified: ResultAggregator - uses TaskStateProcessor for state building, removes tasks after background consumption - Modified: TaskManager - delegates state building to TaskStateProcessor - Updated: All test files to inject TaskStateProcessor instances - Fixed: Test isolation issues with defensive cleanup Impact: - Performance: Significantly reduced I/O operations (one write vs multiple) - Memory: Bounded usage that scales with concurrent tasks, not total tasks - Reliability: Improved test isolation and clearer task lifecycle See doc/adr/0001_task_state_management_refactoring.md for detailed architectural decision record.
Simplify the code execution that was using attributes whose value were always the same. Signed-off-by: Jeff Mesnil <[email protected]>
Signed-off-by: Jeff Mesnil <[email protected]>
078d868 to
bae253a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant and beneficial refactoring by separating task state building from persistence through the new TaskStateProcessor. This change effectively reduces I/O operations and improves the task lifecycle management. The implementation is solid, and the accompanying Architectural Decision Record (ADR) is very helpful. I've provided a couple of suggestions: one to improve clarity in the ADR and another to simplify and fix a potentially buggy logic in DefaultRequestHandler. Overall, this is a great improvement to the architecture.
server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
Show resolved
Hide resolved
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant and valuable refactoring of task state management by introducing TaskStateProcessor. This change effectively separates in-memory state building from persistence, which should reduce I/O operations and improve performance as described in the ADR. The introduction of TaskStateProcessor and the delegation of state management from TaskManager and ResultAggregator are well-executed.
My review includes two main points:
- An architectural inconsistency in
onMessageSendStreamwhere state is still persisted on every event, which seems to contradict the main goal of this refactoring. - A suggestion to improve the robustness of
ResultAggregatorby enforcing its dependency onTaskStateProcessorvia a null check in the constructor.
Overall, this is a great improvement to the architecture. Addressing these points will enhance consistency and robustness.
| // For streaming: persist task state after each event before propagating | ||
| // This ensures state is saved BEFORE the event is sent to the client | ||
| Task currentTaskState = mss.taskManager.getTask(); | ||
| if (currentTaskState != null) { | ||
| mss.taskManager.saveTask(currentTaskState); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change introduces task state persistence on every event for streaming responses. While the comment explains the rationale is to save state before sending an event to the client, this seems to contradict the primary goal of this refactoring, which is to reduce I/O and persist the task state only once at the end of its lifecycle.
The PR description states: "Tasks are now persisted once at appropriate lifecycle points instead of for each event". This change for streaming calls deviates from that principle.
If this per-event persistence is indeed necessary for streaming to handle client disconnects and allow for resubscription, this trade-off should be explicitly documented in the Architectural Decision Record (ADR) to clarify why streaming responses are treated differently from blocking/non-blocking calls. The current ADR gives the impression that all task processing now follows the "persist once" model.
| public ResultAggregator(TaskManager taskManager, Message message, Executor executor, TaskStateProcessor stateProcessor) { | ||
| this.taskManager = taskManager; | ||
| this.message = message; | ||
| this.executor = executor; | ||
| this.stateProcessor = stateProcessor; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TaskStateProcessor seems to be a required dependency for the ResultAggregator. However, the constructor allows it to be null, which forces a null check later in the code (lines 221-224).
To improve robustness and clarify the contract of this class, I suggest making stateProcessor a non-null dependency. You can enforce this by adding a null check in the constructor, similar to how it's done in TaskManager. This would also make the class more consistent with others that have required dependencies.
This change would also require updating ResultAggregatorTest to provide a mock or real TaskStateProcessor instance instead of null.
public ResultAggregator(TaskManager taskManager, Message message, Executor executor, TaskStateProcessor stateProcessor) {
io.a2a.util.Assert.checkNotNullParam("taskManager", taskManager);
io.a2a.util.Assert.checkNotNullParam("executor", executor);
io.a2a.util.Assert.checkNotNullParam("stateProcessor", stateProcessor);
this.taskManager = taskManager;
this.message = message;
this.executor = executor;
this.stateProcessor = stateProcessor;
}…tor is never null Signed-off-by: Jeff Mesnil <[email protected]>
720d1cb to
b8215d2
Compare
|
superseded by #515 |
This refactoring introduces TaskStateProcessor as a global singleton to
manage task state lifecycle and addresses a critical architectural issue
in task state management.
Problem:
causing redundant database writes and tight coupling between event
processing and persistence
Solution:
in-flight tasks globally
for each event
persistence, ensuring bounded memory usage
Changes:
cleanup after task completion
removes tasks after background consumption
Impact:
See doc/adr/0001_task_state_management_refactoring.md for detailed
architectural decision record.
CONTRIBUTINGGuide.