feat: Adding Bedrock AgentCore Memory Store for long term memory #611
This PR adds the `AgentCoreMemorySaver` to be used as a LangGraph checkpointer. It implements the LangGraph [BaseCheckpointSaver](https://langchain-ai.github.io/langgraph/reference/checkpoints/#langgraph.checkpoint.base.BaseCheckpointSaver). Before reviewing, I highly recommend reading the LangGraph persistence documentation: https://langchain-ai.github.io/langgraph/concepts/persistence/#checkpoints. Based on #610 from @akshseh, without `branch` usage and with refactored serialization and helper classes.

## How it works

When initializing the `AgentCoreMemorySaver`, you must specify a `memory_id` for a preconfigured memory resource in AgentCore Memory. When running the agent, your `thread_id` (already set by LangGraph) and `actor_id` (set by the user for association in AgentCore Memory) must be present in the runtime config used to invoke the agent. This ensures checkpoints are saved to unique `actor`/`session` combinations and can be retrieved efficiently.

### Saving checkpoints

LangGraph saves two types of checkpoint data. `put` stores whole checkpoints, which contain a `checkpoint_id` and the channel values at that point in time. `put_writes` stores intermediate writes associated with a previous `checkpoint_id` and channel value. Each new checkpoint and intermediate write is serialized and saved, in order, as an event in AgentCore Memory.

### Loading checkpoints

When a thread is invoked, LangGraph calls `get_tuple` on the checkpointer to retrieve the latest checkpoint. To do this, AgentCore Memory events are listed, pending writes are associated with the latest full checkpoint, and the result is returned to the agent to resume execution. Agents may also request a specific checkpoint ID, or call `get_state` or `get_execution_history`; in that case, AgentCore Memory events are listed and processed to reconstruct the full history for the `actor_id`/`thread_id` combination. There is room for caching here which has not yet been implemented.
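To make the loading step concrete, here is an illustrative sketch (not the PR's actual code) of how pending writes can be re-associated with the latest full checkpoint when replaying events in order; the event shapes and field names below are hypothetical:

```python
# Hypothetical sketch of the event-replay logic described above: replay events
# in order, track the latest full checkpoint, and collect only the pending
# writes that reference that checkpoint's ID. Event dicts are illustrative.

def latest_checkpoint_with_writes(events):
    """Return (latest full checkpoint, its pending writes) from an ordered event list."""
    latest = None
    writes = []
    for event in events:
        if event["type"] == "checkpoint":
            latest = event   # a new full checkpoint supersedes prior state
            writes = []      # pending writes now belong to this checkpoint
        elif event["type"] == "write" and latest is not None:
            # keep only writes that reference the latest checkpoint
            if event["checkpoint_id"] == latest["checkpoint_id"]:
                writes.append(event)
    return latest, writes

events = [
    {"type": "checkpoint", "checkpoint_id": "c1"},
    {"type": "write", "checkpoint_id": "c1", "channel": "messages"},
    {"type": "checkpoint", "checkpoint_id": "c2"},
    {"type": "write", "checkpoint_id": "c2", "channel": "messages"},
]
ckpt, pending = latest_checkpoint_with_writes(events)
print(ckpt["checkpoint_id"], len(pending))  # c2 1
```

The same replay, run over the full event list rather than stopping at the latest checkpoint, is what would reconstruct history for `get_state` or `get_execution_history`.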
## New Files

- `saver.py` - the main `AgentCoreMemorySaver` class that implements `BaseCheckpointSaver`
- `helpers.py` - contains three main helper classes: `EventSerializer`, `EventProcessor`, and `CheckpointEventClient`. These helpers bridge the gap between serializing, storing, and loading LangGraph checkpoints via AgentCore Memory event operations
- `models.py` - contains the main Pydantic models for categorizing checkpoint writes, channel value writes, and pending writes
- `constants.py` - contains a few new exceptions for error handling and a constant, `_empty`, for empty channel values

## Checkpointer Demo

A demo notebook is included that shows how to use the checkpointer and how it works. Please run through it, as it ties everything together.

## What's not included yet

- Client-side caching. There is room in the implementation to cache the most recent checkpoint and the pending writes associated with it. For distributed AgentCore deployments this may need to be properly designed, as it would require the assumption that `actor_id`/`checkpoint_id` combinations are handled by the same AgentCore runtime instance; otherwise, the cached data may be outdated. TBD
- Storing messages as plain text for long-term memory processing. This feature is more analogous to LangGraph's `BaseStore`, for which there is an implementation here: #611. However, it would be useful to identify on pending writes that a channel value was for `messages` and then store the plain text for embedding and long-term memory retrieval later, perhaps through an initialization parameter like `process_messages_for_long_term` or something similar. Will require more discussion.

Thanks for reviewing!

---------

Co-authored-by: Jack Gordley <[email protected]>
Co-authored-by: Piyush Jain <[email protected]>
Ran integration tests locally:

```
➜ langgraph-checkpoint-aws git:(agentcore-memory-store-implementation) ✗ poetry run pytest tests/integration_tests/agentcore/test_store.py
tests/integration_tests/agentcore/test_store.py ................ [100%]
========================================= slowest 5 durations ==========================================
```
LangGraph memory stores are intended to be used hand in hand with checkpointers to process conversations and create long-term memories from them. This fits well into the Bedrock AgentCore Memory framework, where chat events are saved and processed asynchronously into long-term memories.
https://langchain-ai.github.io/langgraph/concepts/persistence/#memory-store
So for this implementation, the `BedrockAgentCoreMemoryStore` was created as an alternative to `InMemoryStore`. It overrides the `put`, `search`, `get`, and `delete` methods to interact with Bedrock AgentCore Memory instead of storing the messages in a list and performing the embedding directly.

There are some nuances with Bedrock AgentCore namespaces and the difference between short-term events and long-term memory records. In LangGraph, long-term memories are stored directly and managed through CRUD operations (with the help of an embedding model if the user chooses). For this implementation, I settled on Bedrock AgentCore events being saved with the `put` operation, while the `search` operation queries long-term memories, or simply lists them if no query is provided.

A sample notebook walks through usage of this implementation and how it fits into a LangGraph agent.
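The `put`/`search` split described above can be sketched with a minimal in-memory stand-in. This is illustrative only: the real store delegates to Bedrock AgentCore Memory (which performs semantic search over long-term records), and the class name, method shapes, and naive substring matching here are assumptions for demonstration:

```python
# Hypothetical sketch of the described store semantics: `put` records an item
# under a namespace, and `search` either lists a namespace's records (no
# query) or filters them by a query (naive substring match stands in for the
# real service's semantic retrieval).

class SketchMemoryStore:
    def __init__(self):
        self._records = []

    def put(self, namespace, key, value):
        # store the item; the real implementation creates an AgentCore event
        self._records.append({"namespace": namespace, "key": key, "value": value})

    def search(self, namespace, query=None):
        items = [r for r in self._records if r["namespace"] == namespace]
        if query is None:
            return items  # no query: list the namespace's long-term records
        # with a query: semantic search in the real store; substring match here
        return [r for r in items if query in str(r["value"])]

store = SketchMemoryStore()
store.put(("user-1", "memories"), "m1", {"text": "prefers Python"})
store.put(("user-1", "memories"), "m2", {"text": "lives in Seattle"})
print(len(store.search(("user-1", "memories"))))                  # 2
print(store.search(("user-1", "memories"), "Python")[0]["key"])   # m1
```

The key design point mirrored here is that writes and reads are asymmetric: `put` captures raw conversational events, while `search` operates over the (asynchronously derived) long-term records.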