Skip to content

[core] Add user-facing API for Streaming Lambda functions that receive JSON events #532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
# We pass the list of examples here, but we can't pass an array as argument
# Instead, we pass a String with a valid JSON array.
# The workaround is mentioned here https://github.com/orgs/community/discussions/11692
examples: "[ 'APIGateway', 'APIGateway+LambdaAuthorizer', 'BackgroundTasks', 'HelloJSON', 'HelloWorld', 'ResourcesPackaging', 'S3EventNotifier', 'S3_AWSSDK', 'S3_Soto', 'Streaming', 'Testing', 'Tutorial' ]"
examples: "[ 'APIGateway', 'APIGateway+LambdaAuthorizer', 'BackgroundTasks', 'HelloJSON', 'HelloWorld', 'ResourcesPackaging', 'S3EventNotifier', 'S3_AWSSDK', 'S3_Soto', 'Streaming', 'StreamingFromEvent', 'Testing', 'Tutorial' ]"
archive_plugin_examples: "[ 'HelloWorld', 'ResourcesPackaging' ]"
archive_plugin_enabled: true

Expand Down
4 changes: 3 additions & 1 deletion Examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ This directory contains example code for Lambda functions.

- **[S3_Soto](S3_Soto/README.md)**: a Lambda function that uses [Soto](https://github.com/soto-project/soto) to invoke an [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) API (requires [AWS SAM](https://aws.amazon.com/serverless/sam/)).

- **[Streaming]**: create a Lambda function exposed as an URL. The Lambda function streams its response over time. (requires [AWS SAM](https://aws.amazon.com/serverless/sam/)).
- **[Streaming](Streaming/README.md)**: create a Lambda function exposed as an URL. The Lambda function streams its response over time. (requires [AWS SAM](https://aws.amazon.com/serverless/sam/)).

- **[StreamingFromEvent](StreamingFromEvent/README.md)**: a Lambda function that combines JSON input decoding with response streaming capabilities, demonstrating the new streaming codable interface (requires [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)).

- **[Testing](Testing/README.md)**: a test suite for Lambda functions.

Expand Down
50 changes: 50 additions & 0 deletions Examples/StreamingFromEvent/Package.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// swift-tools-version: 6.0

import PackageDescription

// needed for CI to test the local version of the library
import struct Foundation.URL

let package = Package(
name: "StreamingFromEvent",
platforms: [.macOS(.v15)],
dependencies: [
// during CI, the dependency on local version of swift-aws-lambda-runtime is added dynamically below
.package(url: "https://github.com/swift-server/swift-aws-lambda-runtime.git", branch: "main")
],
targets: [
.executableTarget(
name: "StreamingFromEvent",
dependencies: [
.product(name: "AWSLambdaRuntime", package: "swift-aws-lambda-runtime")
]
)
]
)

if let localDepsPath = Context.environment["LAMBDA_USE_LOCAL_DEPS"],
localDepsPath != "",
let v = try? URL(fileURLWithPath: localDepsPath).resourceValues(forKeys: [.isDirectoryKey]),
v.isDirectory == true
{
// when we use the local runtime as deps, let's remove the dependency added above
let indexToRemove = package.dependencies.firstIndex { dependency in
if case .sourceControl(
name: _,
location: "https://github.com/swift-server/swift-aws-lambda-runtime.git",
requirement: _
) = dependency.kind {
return true
}
return false
}
if let indexToRemove {
package.dependencies.remove(at: indexToRemove)
}

// then we add the dependency on LAMBDA_USE_LOCAL_DEPS' path (typically ../..)
print("[INFO] Compiling against swift-aws-lambda-runtime located at \(localDepsPath)")
package.dependencies += [
.package(name: "swift-aws-lambda-runtime", path: localDepsPath)
]
}
128 changes: 128 additions & 0 deletions Examples/StreamingFromEvent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Streaming Codable Lambda function

This example demonstrates how to use the new `StreamingLambdaHandlerWithEvent` protocol to create Lambda functions that:

1. **Receive JSON input**: Automatically decode JSON events into Swift structs
2. **Stream responses**: Send data incrementally as it becomes available
3. **Execute background work**: Perform additional processing after the response is sent

The example uses the new streaming codable interface that combines the benefits of:
- Type-safe JSON input decoding (like regular `LambdaHandler`)
- Response streaming capabilities (like `StreamingLambdaHandler`)
- Background work execution after response completion

Streaming responses incurs a cost. For more information, see [AWS Lambda Pricing](https://aws.amazon.com/lambda/pricing/).

You can stream responses through [Lambda function URLs](https://docs.aws.amazon.com/lambda/latest/dg/urls-configuration.html), the AWS SDK, or using the Lambda [InvokeWithResponseStream](https://docs.aws.amazon.com/lambda/latest/dg/API_InvokeWithResponseStream.html) API.

## Code

The sample code creates a `StreamingFromEventHandler` struct that conforms to the `StreamingLambdaHandlerWithEvent` protocol provided by the Swift AWS Lambda Runtime.

The `handle(...)` method of this protocol receives incoming events as a decoded Swift struct (`StreamingRequest`) and returns the output through a `LambdaResponseStreamWriter`.

The Lambda function expects a JSON payload with the following structure:

```json
{
"count": 5,
"message": "Hello from streaming Lambda!",
"delayMs": 1000
}
```

Where:
- `count`: Number of messages to stream (1-100)
- `message`: The message content to repeat
- `delayMs`: Optional delay between messages in milliseconds (defaults to 500ms)

The response is streamed through the `LambdaResponseStreamWriter`, which is passed as an argument in the `handle` function. The code calls the `write(_:)` function of the `LambdaResponseStreamWriter` with partial data repeatedly written before finally closing the response stream by calling `finish()`. Developers can also choose to return the entire output and not stream the response by calling `writeAndFinish(_:)`.

An error is thrown if `finish()` is called multiple times or if it is called after having called `writeAndFinish(_:)`.

The `handle(...)` method is marked as `mutating` to allow handlers to be implemented with a `struct`.

Once the struct is created and the `handle(...)` method is defined, the sample code creates a `LambdaRuntime` struct and initializes it with the handler just created. Then, the code calls `run()` to start the interaction with the AWS Lambda control plane.

Key features demonstrated:
- **JSON Input Decoding**: The function automatically parses the JSON input into a `StreamingRequest` struct
- **Input Validation**: Validates the count parameter and returns an error message if invalid
- **Progressive Streaming**: Sends messages one by one with configurable delays
- **Timestamped Output**: Each message includes an ISO8601 timestamp
- **Background Processing**: Performs cleanup and logging after the response is complete
- **Error Handling**: Gracefully handles invalid input with descriptive error messages

## Build & Package

To build & archive the package, type the following commands.

```bash
swift package archive --allow-network-connections docker
```

If there is no error, there is a ZIP file ready to deploy.
The ZIP file is located at `.build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingFromEvent/StreamingFromEvent.zip`

## Test locally

You can test the function locally before deploying:

```bash
swift run &

# In another terminal, test with curl:
curl -v \
--header "Content-Type: application/json" \
--data '{"count": 3, "message": "Hello World!", "delayMs": 1000}' \
http://127.0.0.1:7000/invoke
```

## Deploy with the AWS CLI

Here is how to deploy using the `aws` command line.

### Step 1: Create the function

```bash
# Replace with your AWS Account ID
AWS_ACCOUNT_ID=012345678901
aws lambda create-function \
--function-name StreamingFromEvent \
--zip-file fileb://.build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingFromEvent/StreamingFromEvent.zip \
--runtime provided.al2 \
--handler provided \
--architectures arm64 \
--role arn:aws:iam::${AWS_ACCOUNT_ID}:role/lambda_basic_execution
```

The `--architectures` flag is only required when you build the binary on an Apple Silicon machine (Apple M1 or more recent). It defaults to `x64`.

Be sure to set `AWS_ACCOUNT_ID` with your actual AWS account ID (for example: 012345678901).

### Invoke your Lambda function

To invoke the Lambda function, use the AWS CLI:

```bash
aws lambda invoke \
--function-name StreamingFromEvent \
--payload $(echo '{"count": 5, "message": "Streaming from AWS!", "delayMs": 500}' | base64) \
response.txt && cat response.txt
```

This should output the following result, with configurable delays between each message:

```
[2024-07-15T05:00:00Z] Message 1/3: Hello World!
[2024-07-15T05:00:01Z] Message 2/3: Hello World!
[2024-07-15T05:00:02Z] Message 3/3: Hello World!
✅ Successfully sent 3 messages
```

### Undeploy

When done testing, you can delete the Lambda function with this command.

```bash
aws lambda delete-function --function-name StreamingFromEvent
```
71 changes: 71 additions & 0 deletions Examples/StreamingFromEvent/Sources/main.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import AWSLambdaRuntime
import NIOCore

#if canImport(FoundationEssentials)
import FoundationEssentials
#else
import Foundation
#endif

// Define your input event structure
struct StreamingRequest: Decodable {
let count: Int
let message: String
let delayMs: Int?

// Provide default values for optional fields
var delay: Int {
delayMs ?? 500
}
}

// Use the new streaming handler with JSON decoding
let runtime = LambdaRuntime { (event: StreamingRequest, responseWriter, context: LambdaContext) in
context.logger.info("Received request to send \(event.count) messages: '\(event.message)'")

// Validate input
guard event.count > 0 && event.count <= 100 else {
let errorMessage = "Count must be between 1 and 100, got: \(event.count)"
context.logger.error("\(errorMessage)")
try await responseWriter.writeAndFinish(ByteBuffer(string: "Error: \(errorMessage)\n"))
return
}

// Stream the messages
for i in 1...event.count {
let response = "[\(Date().ISO8601Format())] Message \(i)/\(event.count): \(event.message)\n"
try await responseWriter.write(ByteBuffer(string: response))

// Optional delay between messages
if event.delay > 0 {
try await Task.sleep(for: .milliseconds(event.delay))
}
}

// Send completion message and finish the stream
let completionMessage = "✅ Successfully sent \(event.count) messages\n"
try await responseWriter.writeAndFinish(ByteBuffer(string: completionMessage))

// Optional: Do background work here after response is sent
context.logger.info("Background work: cleaning up resources and logging metrics")

// Simulate some background processing
try await Task.sleep(for: .milliseconds(100))
context.logger.info("Background work completed")
}

try await runtime.run()
5 changes: 5 additions & 0 deletions Examples/StreamingFromEvent/events/sample-request.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"count": 5,
"message": "Hello from streaming Lambda!",
"delayMs": 1000
}
25 changes: 25 additions & 0 deletions Examples/StreamingFromEvent/template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: SAM Template for Streaming Example

Resources:
# Lambda function
StreamingNumbers:
Type: AWS::Serverless::Function
Properties:
CodeUri: .build/plugins/AWSLambdaPackager/outputs/AWSLambdaPackager/StreamingNumbers/StreamingNumbers.zip
Timeout: 15
Handler: swift.bootstrap # ignored by the Swift runtime
Runtime: provided.al2
MemorySize: 128
Architectures:
- arm64
FunctionUrlConfig:
AuthType: AWS_IAM
InvokeMode: RESPONSE_STREAM

Outputs:
# print Lambda function URL
LambdaURL:
Description: Lambda URL
Value: !GetAtt StreamingNumbersUrl.FunctionUrl
Loading