Add ElastiCache (Valkey) message queue template #1309
Open

vasigorc wants to merge 4 commits into vercel:main from Bit-Quill:message-queue-elasticache-clean
Changes from 1 commit

Commits (4):
- 368b2aa feat: Added ElastiCache (Valkey) message queue template (vasigorc)
- 7a63f56 fix: Updated README.md (vasigorc)
- 5526840 chore: Improved response destructuring based on PR feedback (vasigorc)
- df1ffdb improvement: Replaced customCommand with specific SDK method calls (vasigorc)
@@ -0,0 +1,4 @@
{
  "root": true,
  "extends": "next/core-web-vitals"
}
@@ -0,0 +1,42 @@
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.

# Dependencies
/node_modules
/.pnp
.pnp.js

# Testing
/coverage

# Next.js
/.next/
/out/
next-env.d.ts

# Production
build
dist

# Misc
.DS_Store
*.pem

# Debug
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# Local ENV files
.env.local
.env.development.local
.env.test.local
.env.production.local

# Vercel
.vercel

# Turborepo
.turbo

# typescript
*.tsbuildinfo
@@ -0,0 +1,242 @@
---
name: Message Queue with AWS ElastiCache and Next.js
slug: message-queue-elasticache
description: Learn to use AWS ElastiCache (Valkey) with Next.js API Routes for reliable message queue processing using streams.
framework: Next.js
deployUrl: https://vercel.com/new/clone?repository-url=https://github.com/vercel/examples/tree/main/solutions/message-queue-elasticache&project-name=message-queue-elasticache&repository-name=message-queue-elasticache&env=VALKEY_ENDPOINT&envDescription=Valkey%20endpoint%20URL
---

# Next.js + AWS ElastiCache Message Queue

This is an example of a Next.js application that uses AWS ElastiCache (Valkey) to implement a reliable message queue with streams. The template demonstrates a contact form processor where messages are queued, consumed, and acknowledged using Valkey's streaming capabilities.

## How to Use

This template demonstrates the code pattern for implementing message queues with Valkey streams. It's designed to work with AWS ElastiCache in production environments.

### Local Development

Execute [`create-next-app`](https://github.com/vercel/next.js/tree/canary/packages/create-next-app) with [pnpm](https://pnpm.io/installation) to bootstrap the example:

```bash
pnpm create next-app --example https://github.com/vercel/examples/tree/main/solutions/message-queue-elasticache
```

**Run Valkey locally:**

Using Docker:

```bash
docker run -d -p 6379:6379 valkey/valkey-bundle:latest
```

Or install Valkey directly following the [official installation guide](https://valkey.io/download/).

**Configure environment:**

Create a `.env.local` file:

```bash
VALKEY_ENDPOINT=localhost:6379
```

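The application reads `VALKEY_ENDPOINT` at runtime to open its connection. As a rough sketch of what that wiring can look like, assuming an ioredis-compatible client and a hypothetical `lib/valkey.ts` module (the template itself may use a different Valkey SDK, as the commit message about SDK method calls suggests):

```ts
// lib/valkey.ts (hypothetical path): create one shared client from VALKEY_ENDPOINT.
// Sketch only; it assumes an ioredis-compatible client (e.g. ioredis or the
// iovalkey fork). The template's actual client setup may differ.
import Redis from "ioredis";

const endpoint = process.env.VALKEY_ENDPOINT ?? "localhost:6379";
const [host, port] = endpoint.split(":");

// Reuse a single connection across API route invocations.
export const valkey = new Redis({ host, port: Number(port) });
```
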
**Start the development server:**

```bash
pnpm dev
```

Visit <http://localhost:3000> to see the application.

### Production Deployment with AWS ElastiCache

AWS ElastiCache clusters run within a VPC (private network), which requires network connectivity setup for production deployments on Vercel.

#### Networking Requirements

**For Vercel Enterprise customers**, connectivity to AWS ElastiCache is available through [Vercel Secure Compute](https://vercel.com/docs/connectivity/secure-compute), which enables private network access between Vercel Functions and AWS VPC resources.

**High-level setup steps:**

**AWS Side:**

1. Create an ElastiCache for Valkey cluster (version 7.0+) in your AWS VPC
2. Configure security groups to allow traffic from Vercel's CIDR block
3. Set up VPC peering or AWS PrivateLink based on your architecture
4. Note the cluster endpoint URL

**Vercel Side:**

1. Contact Vercel to enable Secure Compute for your Enterprise account
2. Coordinate with Vercel to receive your dedicated CIDR block
3. Add the ElastiCache endpoint to your project environment variables:

   ```bash
   VALKEY_ENDPOINT=your-cluster.cache.amazonaws.com:6379
   ```

4. Deploy your application

[](https://vercel.com/new/clone?repository-url=https://github.com/vercel/examples/tree/main/solutions/message-queue-elasticache&project-name=message-queue-elasticache&repository-name=message-queue-elasticache&env=VALKEY_ENDPOINT&envDescription=Valkey%20endpoint%20URL)

For detailed networking configuration, refer to the [Vercel Secure Compute documentation](https://vercel.com/docs/connectivity/secure-compute).

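One additional connection detail, noted here as an assumption rather than something this template configures: if in-transit encryption (TLS) is enabled on the ElastiCache cluster, the client has to connect over TLS. With an ioredis-compatible client that is a one-line option:

```ts
// Sketch: connecting to an ElastiCache endpoint that enforces in-transit encryption.
// Assumes an ioredis-compatible client; adapt to whichever client the app uses.
import Redis from "ioredis";

const [host, port] = (process.env.VALKEY_ENDPOINT ?? "").split(":");

export const valkey = new Redis({
  host,                       // e.g. your-cluster.cache.amazonaws.com
  port: Number(port) || 6379,
  tls: {},                    // enable TLS when the cluster requires it
});
```
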
## How It Works

This template demonstrates a reliable serverless message queue workflow (see the code sketch after these steps):

1. **Producer**: A visitor submits a contact form, and the message is immediately written to a Valkey stream
2. **Consumer**: A reviewer opens the processing view, which reads the next unhandled message from the consumer group
3. **Acknowledgment**: When the reviewer confirms, the app acknowledges the message and removes it from the pending list

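To make these steps concrete, here is a condensed sketch of the underlying stream commands. It assumes an ioredis-compatible client and reuses the stream and group names from the CLI examples later in this README (`contact-messages`, `contact-processors`); the template's actual helpers may be organized differently:

```ts
// Producer / consumer / acknowledgment against a Valkey stream (sketch).
// Assumes the consumer group already exists (see the XGROUP examples in Troubleshooting).
import Redis from "ioredis";

const valkey = new Redis({ host: "localhost", port: 6379 });
const STREAM = "contact-messages";
const GROUP = "contact-processors";

// 1. Producer: append the form submission as a stream entry (XADD).
export async function enqueue(name: string, email: string, message: string) {
  return valkey.xadd(STREAM, "*", "name", name, "email", email, "message", message);
}

// 2. Consumer: read the next undelivered entry for this group (XREADGROUP).
export async function readNext(consumer = "worker-1") {
  const res = (await valkey.xreadgroup(
    "GROUP", GROUP, consumer,
    "COUNT", 1,
    "STREAMS", STREAM, ">"
  )) as [string, [string, string[]][]][] | null;
  // res is null when there is nothing new to deliver.
  return res?.[0]?.[1]?.[0] ?? null; // [streamMessageId, [field, value, ...]]
}

// 3. Acknowledgment: remove the entry from the Pending Entries List (XACK).
export async function acknowledge(streamMessageId: string) {
  return valkey.xack(STREAM, GROUP, streamMessageId);
}
```
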
**Key Features:**

- Single consumer group prevents message duplication
- Message acknowledgment ensures reliable processing
- Refreshing the page won't cause duplicate processing
- Demonstrates how ElastiCache Streams support reliable serverless workflows

> Review comment: I would include the 60 second claim timeout here

## API Endpoints

The application provides a single API route (`/api/messages`) with three HTTP methods demonstrating the message queue pattern (see the `fetch` sketch after the list):

### Message Operations

- `POST /api/messages` - Add a new message to the queue (contact form submission)
- `GET /api/messages` - Read the next unprocessed message from the consumer group
- `DELETE /api/messages?messageId=<id>` - Acknowledge and remove a message from the pending list

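The Testing section below walks through this flow with curl; the same sequence can also be exercised from TypeScript with `fetch`. This is a usage sketch against a local dev server, following the request and response shapes documented below:

```ts
// POST (produce) -> GET (consume) -> DELETE (acknowledge) against the dev server.
const BASE = "http://localhost:3000/api/messages";

async function runOnce() {
  // Produce: submit a contact-form payload.
  const posted = await fetch(BASE, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ name: "Test User", email: "test@example.com", message: "Hello!" }),
  }).then((r) => r.json());
  console.log("queued as", posted.streamMessageId);

  // Consume: read the next unprocessed message from the consumer group.
  const { message } = await fetch(BASE).then((r) => r.json());
  if (!message) {
    console.log("queue is empty");
    return;
  }

  // Acknowledge: must use the streamMessageId returned by GET.
  await fetch(`${BASE}?messageId=${message.streamMessageId}`, { method: "DELETE" });
  console.log("acknowledged", message.streamMessageId);
}

runOnce().catch(console.error);
```
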
## Testing

**Important Notes:**

- Consumer groups track which messages have been delivered. Once a message is consumed (via GET), it moves to the Pending Entries List (PEL) and won't appear in subsequent GET requests until it is acknowledged or reclaimed after the 60-second idle timeout described below.
- The `streamMessageId` returned by GET is Valkey's unique stream entry ID and **must be used** for the DELETE operation.
- Always complete the full flow: POST → GET → DELETE.

### Complete Message Flow Example

**1. Produce Message (Add to Queue)**

```bash
curl -X POST http://localhost:3000/api/messages \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Test User",
    "email": "[email protected]",
    "message": "Hello from local dev!"
  }'
```

Response:

```json
{
  "streamMessageId": "1764009314892-0",
  "timestamp": "2024-11-24T18:35:14.890Z"
}
```

**2. Consume Message (Read from Queue)**

```bash
curl http://localhost:3000/api/messages
```

Response with message:

```json
{
  "message": {
    "streamMessageId": "1764009314892-0",
    "name": "Test User",
    "email": "[email protected]",
    "message": "Hello from local dev!",
    "timestamp": "2024-11-24T18:35:14.890Z",
    "claimed": true
  }
}
```

Response when queue is empty:

```json
{ "message": null }
```

**Note**: The `claimed` field indicates whether this message was recovered from the Pending Entries List (a previously delivered but unacknowledged message). Messages idle for more than 60 seconds are automatically reclaimed.

**3. Acknowledge Message (Mark as Processed)**

**Critical**: Use the `streamMessageId` from step 2 for the DELETE operation:

```bash
curl -X DELETE "http://localhost:3000/api/messages?messageId=1764009314892-0"
```

Response:

```json
{ "success": true }
```

### User Interface Flow

This template consists of two UI views that demonstrate the complete message queue workflow:

#### Submitting Messages (`/`)

The home page features a contact form where visitors can submit their name, email, and message. Upon submission, the message is immediately added to the Valkey stream and a confirmation is displayed.

#### Processing Messages

The processing view allows reviewers to consume and acknowledge messages from the queue. It fetches the next message on page load and displays:

- Message details (name, email, message content)
- Timestamp when the message was submitted
- A warning banner if the message was recovered from the pending entries list

Clicking **Acknowledge** confirms the message and removes it from the queue. A success message appears with a **Next Message** button to load the next message in the queue.

When the queue is empty, a "No message to process" indicator appears.

### Troubleshooting

**Message Recovery**: If you GET a message but don't DELETE (acknowledge) it, the message stays in the Pending Entries List. After 60 seconds of idle time, subsequent GET requests will **automatically reclaim** that message (indicated by `"claimed": true` in the response). This is a reliability feature that handles consumer failures.

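The reclaim behavior described above maps to the `XAUTOCLAIM` command. As a hedged sketch of how a consumer can pick up entries that have been idle in the PEL for more than 60 seconds (assuming an ioredis-compatible client; the template's implementation may differ):

```ts
// Claim pending entries idle for more than 60 seconds for this consumer.
// Requires XAUTOCLAIM support (Valkey 7.x / Redis 6.2+).
import Redis from "ioredis";

const valkey = new Redis({ host: "localhost", port: 6379 });

export async function reclaimIdle(consumer = "worker-1") {
  const res = (await valkey.xautoclaim(
    "contact-messages",
    "contact-processors",
    consumer,
    60_000, // minimum idle time, in milliseconds
    "0-0"   // scan the Pending Entries List from the beginning
  )) as unknown as [string, [string, string[]][]];
  return res[1]; // each entry is [streamMessageId, [field, value, ...]]
}
```
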
**For clean testing**, if you want to reset and start fresh:

```bash
# Access your Valkey instance
docker exec -it <container_id> valkey-cli

# Delete the entire stream (removes consumer group too)
DEL contact-messages

# Or just delete the consumer group
XGROUP DESTROY contact-messages contact-processors

# The consumer group will be recreated automatically on next GET
```

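The last comment in the block above relies on the consumer group being recreated lazily. A common way to do that (a sketch under the same ioredis-compatible assumption; the template may handle it differently) is an idempotent `XGROUP CREATE ... MKSTREAM` call that tolerates the BUSYGROUP error:

```ts
// Ensure the stream and consumer group exist before the first XREADGROUP.
// Safe to call on every request: BUSYGROUP simply means the group already exists.
import Redis from "ioredis";

const valkey = new Redis({ host: "localhost", port: 6379 });

export async function ensureGroup() {
  try {
    // "0" delivers entries added before the group existed;
    // MKSTREAM creates the stream itself if it is missing.
    await valkey.xgroup("CREATE", "contact-messages", "contact-processors", "0", "MKSTREAM");
  } catch (err) {
    if (!(err as Error).message.includes("BUSYGROUP")) throw err;
  }
}
```
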
**Checking Pending Messages**: To see what's currently in the Pending Entries List:

```bash
# Access Valkey
docker exec -it <container_id> valkey-cli

# View pending messages
XPENDING contact-messages contact-processors

# View all messages in the stream
XRANGE contact-messages - +
```