
Conversation

@jgordley
Contributor

@jgordley jgordley commented Sep 15, 2025

This PR adds the AgentCoreMemorySaver to be used as a LangGraph checkpointer. It implements the LangGraph BaseCheckpointSaver. Before reviewing, I highly recommend reading the LangGraph checkpointer documentation on persistence: https://langchain-ai.github.io/langgraph/concepts/persistence/#checkpoints.

Based on #610 from @akshseh, without branch usage and with refactored serialization and helper classes.

How it works

When initializing the AgentCoreMemorySaver, you must specify a memory_id for a preconfigured memory resource in AgentCore memory. Then, when running the agent, your thread_id (already set by LangGraph) and actor_id (set by the user to scope events in AgentCore memory) must both be present in the runtime config for invoking the agent. This ensures checkpoints are saved under unique actor/session combinations and can be retrieved efficiently.
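
For illustration, a minimal usage sketch (the import path and placeholder IDs are assumptions, not verbatim from this PR):

from langgraph_checkpoint_aws import AgentCoreMemorySaver  # import path is an assumption

# memory_id identifies a preconfigured AgentCore memory resource (placeholder value)
checkpointer = AgentCoreMemorySaver(memory_id="MEMORY_ID")

# graph = builder.compile(checkpointer=checkpointer)

# Both IDs must be present in the runtime config when invoking the agent:
config = {
    "configurable": {
        "thread_id": "session-001",  # standard LangGraph thread identifier
        "actor_id": "user-42",       # scopes events in AgentCore memory
    }
}
# graph.invoke({"messages": [("user", "Hi there!")]}, config)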

Saving checkpoints

In LangGraph, two kinds of checkpoint data are saved. put stores whole checkpoints, which contain a checkpoint_id and the channel values at that point in time. put_writes stores intermediate writes associated with a previous checkpoint_id and channel value.

Each new checkpoint and intermediate write is serialized and saved, in order, to an event in AgentCore memory.
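
As a rough illustration of the write path (the put/put_writes names come from LangGraph's BaseCheckpointSaver; the event call below mirrors the payload shape visible later in this diff, but the client wiring is a sketch, not this PR's exact code):

from datetime import datetime, timezone
import boto3

client = boto3.client("bedrock-agentcore")

def append_checkpoint_event(memory_id, actor_id, session_id, serialized):
    # Each serialized checkpoint or intermediate write becomes one AgentCore
    # memory event, appended in order so that listing events replays history.
    client.create_event(
        memoryId=memory_id,
        actorId=actor_id,
        sessionId=session_id,
        eventTimestamp=datetime.now(timezone.utc),
        payload=[{"blob": serialized}],
    )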

Loading checkpoints

When a new thread is invoked, LangGraph calls get_tuple on the checkpointer to retrieve the latest checkpoint. To serve this, AgentCore memory events are listed, pending writes are associated with the latest full checkpoint, and the result is returned to the agent so execution can resume.

Agents may also request a specific checkpoint ID, or call get_state or get_state_history. To serve these, AgentCore memory events are listed and processed to construct a full history of the actor_id/thread_id combination. There is room for caching here, which has not yet been implemented.
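
In practice this surfaces through the standard LangGraph state APIs, e.g. (a sketch; graph stands for any compiled graph using this checkpointer):

config = {"configurable": {"thread_id": "session-001", "actor_id": "user-42"}}

# Latest checkpoint for this actor/thread (get_tuple under the hood)
state = graph.get_state(config)

# Full reconstructed history of the actor_id/thread_id combination
history = list(graph.get_state_history(config))

# A specific checkpoint can be pinned by ID
pinned = graph.get_state({
    "configurable": {
        "thread_id": "session-001",
        "actor_id": "user-42",
        "checkpoint_id": history[-1].config["configurable"]["checkpoint_id"],
    }
})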

New Files

  • saver.py - the main AgentCoreMemorySaver class that implements BaseCheckpointSaver
  • helpers.py - contains three main helper classes: EventSerializer, EventProcessor, and CheckpointEventClient. These helpers bridge serialization, storage, and loading between LangGraph checkpoints and AgentCore Memory event operations
  • models.py - contains the main pydantic objects for categorizing checkpoint writes, channel value writes, and pending writes
  • constants.py - contains a few new exceptions for error handling and a constant, _empty, for empty channel values

Checkpointer Demo

A demo notebook has been included that shows how to use the checkpointer and how it works. Please run through it, as it ties everything together.

What's not included yet

  • Client-side caching. There is room in the implementation to cache the most recent checkpoint and the pending writes associated with it. For distributed AgentCore deployments this needs careful design, since it assumes a given actor_id/checkpoint_id combination is always handled by the same AgentCore runtime instance; otherwise the cached data may be stale. TBD
  • Storing messages as plain text for long-term memory processing. This feature is more analogous to LangGraph's BaseStore, for which there is an implementation here: feat: Adding Bedrock AgentCore Memory Store for long term memory #611. However, it would be useful to detect on pending writes that a channel value was for messages and then store the plaintext for embedding and long-term memory retrieval later, perhaps through an initialization parameter like process_messages_for_long_term. This will require more discussion.

Thanks for reviewing!

payload=[{"blob": serialized}],
)

def store_events_batch(


what happens if any of the events are too large? (~10mb should be upper bound for a CreateEvent request)
and what happens if there are more than 100 events here? (we should make sure this integration has some way of handling both of those cases instead of just failing)
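
A hedged sketch of one way to handle both cases client-side (the ~10 MB and 100-event figures come from the comment above; the helper itself is hypothetical):

from datetime import datetime, timezone

MAX_EVENT_BYTES = 10 * 1024 * 1024  # approximate CreateEvent payload cap

def store_events_batch(client, memory_id, actor_id, session_id, blobs):
    # One CreateEvent call per blob sidesteps any per-request batch cap
    # (e.g. more than 100 events); oversized payloads fail fast with a
    # clear error instead of an opaque service rejection.
    for blob in blobs:
        if len(blob) > MAX_EVENT_BYTES:
            raise ValueError(
                f"serialized event is {len(blob)} bytes, above the ~10MB limit"
            )
        client.create_event(
            memoryId=memory_id,
            actorId=actor_id,
            sessionId=session_id,
            eventTimestamp=datetime.now(timezone.utc),
            payload=[{"blob": blob}],
        )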

if not next_token or len(all_events) >= max_results:
    break

return all_events[:max_results]


nit: I think we don't need to do this array slice since we're stopping when we have max_results, right? (the performance difference would be super negligible so only a nit)

"memoryId": self.memory_id,
"actorId": actor_id,
"sessionId": session_id,
"maxResults": min(100, max_results - len(all_events)),


I'm wondering if this is the right behavior. Is there any reason not to make a single call to ListEvents and just use max_results from the server side? (Per the AWS API standards, max_results is a target, not a guarantee, so by moving this pagination to the client we're sort of hacking around that standard; I think that only makes sense if we have a ListAllEvents, in which case you'd have to paginate until there's no NextToken.)

Contributor Author


Agreed. I'll remove this logic, let the server handle the max events for listing, and break if we surpass the limit in the results.
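
That simplification might look roughly like this (a sketch; the function name is hypothetical, the ListEvents parameters mirror the snippet above):

def list_all_events(client, memory_id, actor_id, session_id, limit=None):
    # Let the server choose its page size; paginate until NextToken runs out
    # or we have gathered enough events to satisfy the caller's limit.
    all_events, next_token = [], None
    while True:
        kwargs = {"memoryId": memory_id, "actorId": actor_id, "sessionId": session_id}
        if next_token:
            kwargs["nextToken"] = next_token
        response = client.list_events(**kwargs)
        all_events.extend(response.get("events", []))
        next_token = response.get("nextToken")
        if not next_token or (limit is not None and len(all_events) >= limit):
            break
    return all_events if limit is None else all_events[:limit]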

checkpoint_event, writes, channel_data, checkpoint_config
)

count += 1


do we do anything with this count?

Contributor Author


Yes, it's part of the limit logic in this function; i.e., if someone calls list(..., limit=3), it will break and stop yielding after 3 loops.
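
A simplified sketch of that pattern (the iteration helper is hypothetical):

def list(self, config, *, limit=None, **kwargs):
    count = 0
    for checkpoint_tuple in self._iter_checkpoint_tuples(config):  # hypothetical helper
        yield checkpoint_tuple
        count += 1
        if limit is not None and count >= limit:
            break  # stop yielding once the caller's limit is reached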

Collaborator

@3coins 3coins left a comment


@jgordley
Thanks for submitting these changes. Left some suggestions on reorganizing the modules to simplify the setup, along with some updates to the sample notebook.

I will do some testing to check the event size with a long conversation and report back. As discussed, one option to explore would be to save the checkpoint event and the values separately.

"Once you have the Memory enabled and in a `ACTIVE` state, take note of the `memoryId`, we will need it later."
]
},
{
Collaborator


We need another cell before the imports that installs LangChain: %pip install langchain langchain-aws.

@jgordley
Contributor Author

Added unit tests and integration tests and ran them locally:

➜  langgraph-checkpoint-aws git:(agentcore-memory-checkpointer) ✗ make integration_test TEST_FILE=tests/integration_tests/agentcore/

poetry run pytest tests/integration_tests/
========================================== test session starts ===========================================
platform darwin -- Python 3.11.10, pytest-8.4.1, pluggy-1.6.0
rootdir: /Users/jgordle/projects/jgordle-forks/langchain-aws/libs/langgraph-checkpoint-aws
configfile: pyproject.toml
plugins: anyio-4.10.0, cov-6.2.1, langsmith-0.4.13
collected 10 items

tests/integration_tests/agentcore/test_saver.py ......                                             [ 60%]
tests/integration_tests/saver/test_saver.py ...                                                    [ 90%]
tests/integration_tests/test_compile.py .                                                          [100%]

========================================== slowest 5 durations ===========================================
25.86s call     tests/integration_tests/agentcore/test_saver.py::TestAgentCoreMemorySaver::test_checkpoint_listing_with_limit
8.89s call     tests/integration_tests/agentcore/test_saver.py::TestAgentCoreMemorySaver::test_multiple_sessions_isolation
8.60s call     tests/integration_tests/agentcore/test_saver.py::TestAgentCoreMemorySaver::test_math_agent_with_checkpointing
8.11s call     tests/integration_tests/saver/test_saver.py::TestBedrockMemorySaver::test_weather_query_and_checkpointing
5.98s call     tests/integration_tests/agentcore/test_saver.py::TestAgentCoreMemorySaver::test_weather_query_with_checkpointing
===================================== 10 passed in 61.18s (0:01:01) ====================================

logger.warning(f"Failed to decode event: {e}")

next_token = response.get("nextToken")
if not next_token or (limit and len(all_events) >= limit):


does anything bad happen if limit is set to 0?

Contributor Author


Good call. I'll switch the check to != 0, in the rare event that someone does submit limit=0.
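
One way to guard it explicitly, so limit=0 yields nothing rather than everything (a sketch against the snippet above):

if limit == 0:
    return []  # guard: the truthiness check below would otherwise list everything

next_token = response.get("nextToken")
if not next_token or (limit is not None and len(all_events) >= limit):
    break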

Collaborator

@3coins 3coins left a comment


@jgordley
Left some minor comments on the included samples. Looks good otherwise.

Collaborator

@3coins 3coins left a comment


@jgordley
LGTM!

@3coins 3coins merged commit 5941622 into langchain-ai:main Sep 24, 2025
12 checks passed
murilosimao pushed a commit to murilosimao/langchain-aws that referenced this pull request Oct 29, 2025