diff --git a/docs/modules/components/pages/processors/aws_bedrock_chat.adoc b/docs/modules/components/pages/processors/aws_bedrock_chat.adoc new file mode 100644 index 0000000000..0db35ced20 --- /dev/null +++ b/docs/modules/components/pages/processors/aws_bedrock_chat.adoc @@ -0,0 +1,249 @@ += aws_bedrock_chat +:type: processor +:status: experimental +:categories: ["AI"] + + + +//// + THIS FILE IS AUTOGENERATED! + + To make changes, edit the corresponding source file under: + + https://github.com/redpanda-data/connect/tree/main/internal/impl/. + + And: + + https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl +//// + +// © 2024 Redpanda Data Inc. + + +component_type_dropdown::[] + + +Generates responses to messages in a chat conversation, using the AWS Bedrock API. + +Introduced in version 4.34.0. + + +[tabs] +====== +Common:: ++ +-- + +```yml +# Common config fields, showing default values +label: "" +aws_bedrock_chat: + model: amazon.titan-text-express-v1 # No default (required) + prompt: "" # No default (optional) + system_prompt: "" # No default (optional) + max_tokens: 0 # No default (optional) + stop: 0 # No default (optional) +``` + +-- +Advanced:: ++ +-- + +```yml +# All config fields, showing default values +label: "" +aws_bedrock_chat: + region: "" + endpoint: "" + credentials: + profile: "" + id: "" + secret: "" + token: "" + from_ec2_role: false + role: "" + role_external_id: "" + model: amazon.titan-text-express-v1 # No default (required) + prompt: "" # No default (optional) + system_prompt: "" # No default (optional) + max_tokens: 0 # No default (optional) + stop: 0 # No default (optional) + temperature: [] # No default (optional) + top_p: 0 # No default (optional) +``` + +-- +====== + +This processor sends prompts to your chosen large language model (LLM) and generates text from the responses, using the AWS Bedrock API. +For more information, see the https://docs.aws.amazon.com/bedrock/latest/userguide[AWS Bedrock documentation^]. + +== Fields + +=== `region` + +The AWS region to target. + + +*Type*: `string` + +*Default*: `""` + +=== `endpoint` + +Allows you to specify a custom endpoint for the AWS API. + + +*Type*: `string` + +*Default*: `""` + +=== `credentials` + +Optional manual configuration of AWS credentials to use. More information can be found in xref:guides:cloud/aws.adoc[]. + + +*Type*: `object` + + +=== `credentials.profile` + +A profile from `~/.aws/credentials` to use. + + +*Type*: `string` + +*Default*: `""` + +=== `credentials.id` + +The ID of credentials to use. + + +*Type*: `string` + +*Default*: `""` + +=== `credentials.secret` + +The secret for the credentials being used. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `credentials.token` + +The token for the credentials being used, required when using short term credentials. + + +*Type*: `string` + +*Default*: `""` + +=== `credentials.from_ec2_role` + +Use the credentials of a host EC2 machine configured to assume https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2.html[an IAM role associated with the instance^]. + + +*Type*: `bool` + +*Default*: `false` +Requires version 4.2.0 or newer + +=== `credentials.role` + +A role ARN to assume. + + +*Type*: `string` + +*Default*: `""` + +=== `credentials.role_external_id` + +An external ID to provide when assuming a role. + + +*Type*: `string` + +*Default*: `""` + +=== `model` + +The model ID to use. For a full list see the https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html[AWS Bedrock documentation^]. + + +*Type*: `string` + + +```yml +# Examples + +model: amazon.titan-text-express-v1 + +model: anthropic.claude-3-5-sonnet-20240620-v1:0 + +model: cohere.command-text-v14 + +model: meta.llama3-1-70b-instruct-v1:0 + +model: mistral.mistral-large-2402-v1:0 +``` + +=== `prompt` + +The prompt you want to generate a response for. By default, the processor submits the entire payload as a string. + + +*Type*: `string` + + +=== `system_prompt` + +The system prompt to submit to the AWS Bedrock LLM. + + +*Type*: `string` + + +=== `max_tokens` + +The maximum number of tokens to allow in the generated response. + + +*Type*: `int` + + +=== `stop` + +The likelihood of the model selecting higher-probability options while generating a response. A lower value makes the model omre likely to choose higher-probability options, while a higher value makes the model more likely to choose lower-probability options. + + +*Type*: `float` + + +=== `temperature` + +A list of stop sequences. A stop sequence is a sequence of characters that causes the model to stop generating the response. + + +*Type*: `array` + + +=== `top_p` + +The percentage of most-likely candidates that the model considers for the next token. For example, if you choose a value of 0.8, the model selects from the top 80% of the probability distribution of tokens that could be next in the sequence. + + +*Type*: `float` + + + diff --git a/go.mod b/go.mod index fa1611fcb8..90d264f2b7 100644 --- a/go.mod +++ b/go.mod @@ -26,11 +26,12 @@ require ( github.com/PaesslerAG/jsonpath v0.1.1 github.com/apache/pulsar-client-go v0.13.1 github.com/aws/aws-lambda-go v1.47.0 - github.com/aws/aws-sdk-go-v2 v1.30.3 + github.com/aws/aws-sdk-go-v2 v1.30.4 github.com/aws/aws-sdk-go-v2/config v1.27.27 github.com/aws/aws-sdk-go-v2/credentials v1.17.27 github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression v1.7.32 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 + github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.15.1 github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.40.3 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.34.4 github.com/aws/aws-sdk-go-v2/service/firehose v1.32.0 @@ -178,11 +179,11 @@ require ( github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/ardielle/ardielle-go v1.5.2 // indirect github.com/armon/go-metrics v0.3.4 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.14.10 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.22.3 // indirect @@ -193,7 +194,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect - github.com/aws/smithy-go v1.20.3 // indirect + github.com/aws/smithy-go v1.20.4 // indirect github.com/aymerick/douceur v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.4.0 // indirect diff --git a/go.sum b/go.sum index edc79670fb..59cc34130c 100644 --- a/go.sum +++ b/go.sum @@ -190,10 +190,10 @@ github.com/aws/aws-lambda-go v1.47.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7Rfg github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go-v2 v1.7.1/go.mod h1:L5LuPC1ZgDr2xQS7AmIec/Jlc7O/Y1u2KxJyNVab250= -github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY= -github.com/aws/aws-sdk-go-v2 v1.30.3/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 h1:tW1/Rkad38LA15X4UQtjXZXNKsCgkshC3EbmcUmghTg= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3/go.mod h1:UbnqO+zjqk3uIt9yCACHJ9IVNhyhOCnYk8yA19SAWrM= +github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= +github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw= github.com/aws/aws-sdk-go-v2/config v1.5.0/go.mod h1:RWlPOAW3E3tbtNAqTwvSW54Of/yP3oiZXMI0xfUdjyA= github.com/aws/aws-sdk-go-v2/config v1.27.27 h1:HdqgGt1OAP0HkEDDShEl0oSYa9ZZBSOmKpdpsDMdO90= github.com/aws/aws-sdk-go-v2/config v1.27.27/go.mod h1:MVYamCg76dFNINkZFu4n4RjDixhVr51HLj4ErWzrVwg= @@ -210,15 +210,17 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11/go.mod h1:SeSUYBLsMYFoRvH github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.2/go.mod h1:qaqQiHSrOUVOfKe6fhgQ6UzhxjwqVW8aHNegd6Ws4w4= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 h1:zeN9UtUlA6FTx0vFSayxSX32HDw73Yb6Hh2izDSFxXY= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10/go.mod h1:3HKuexPDcwLWPaqpW2UR/9n8N/u/3CKcGAzSs8p8u8g= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 h1:SoNJ4RlFEQEbtDcCEt+QG56MY4fm4W8rYirAmq+/DdU= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15/go.mod h1:U9ke74k1n2bf+RIgoX1SXFed1HLs51OgUSs+Ph0KJP8= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 h1:C6WHdGnTDIYETAm5iErQUiVNsclNx9qbJVPIt03B6bI= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15/go.mod h1:ZQLZqhcu+JhSrA9/NXRm8SkDvsycE+JkV3WGY41e+IM= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= github.com/aws/aws-sdk-go-v2/internal/ini v1.1.1/go.mod h1:Zy8smImhTdOETZqfyn01iNOe0CNggVbPjCajyaz6Gvg= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 h1:Z5r7SycxmSllHYmaAZPpmN8GviDrSGhMS6bldqtXZPw= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15/go.mod h1:CetW7bDE00QoGEmPUoZuRog07SGVAUVW6LFpNP0YfIg= +github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.15.1 h1:Xb5d44UWp+oHJMu6Aza2RG0iSDcOCc2L5fTh2wq80OE= +github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.15.1/go.mod h1:uI45a6i3xUAkx/xFegQ1SNnClz9OrfOixs96ZH4rca8= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.40.3 h1:VminN0bFfPQkaJ2MZOJh0d7+sVu0SKdZnO9FfyE1C18= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.40.3/go.mod h1:SxcxnimuI5pVps173h7VcyuFadgOFFfl2aUXUCswoY0= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.34.4 h1:utG3S4T+X7nONPIpRoi1tVcQdAdJxntiVS2yolPJyXc= @@ -260,8 +262,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.6.0/go.mod h1:q7o0j7d7HrJk/vr9uUt3BV github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 h1:ZsDKRLXGWHk8WdtyYMoGNO7bTudrvuKpDKgMVRlepGE= github.com/aws/aws-sdk-go-v2/service/sts v1.30.3/go.mod h1:zwySh8fpFyXp9yOr/KVzxOl8SRqgf/IDw5aUt9UKFcQ= github.com/aws/smithy-go v1.6.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= -github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= +github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/beanstalkd/go-beanstalk v0.2.0 h1:6UOJugnu47uNB2jJO/lxyDgeD1Yds7owYi1USELqexA= diff --git a/internal/impl/aws/processor_bedrock.go b/internal/impl/aws/processor_bedrock.go new file mode 100644 index 0000000000..516d3fe636 --- /dev/null +++ b/internal/impl/aws/processor_bedrock.go @@ -0,0 +1,221 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package aws + +import ( + "context" + "errors" + "fmt" + "unicode/utf8" + + "github.com/aws/aws-sdk-go-v2/service/bedrockruntime" + bedrocktypes "github.com/aws/aws-sdk-go-v2/service/bedrockruntime/types" + "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/connect/v4/internal/impl/aws/config" +) + +const ( + bedpFieldModel = "model" + bedpFieldUserPrompt = "prompt" + bedpFieldSystemPrompt = "system_prompt" + bedpFieldMaxTokens = "max_tokens" + bedpFieldTemp = "stop" + bedpFieldStop = "temperature" + bedpFieldTopP = "top_p" +) + +func init() { + err := service.RegisterProcessor("aws_bedrock_chat", newBedrockConfigSpec(), newBedrockProcessor) + if err != nil { + panic(err) + } +} + +func newBedrockConfigSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Summary("Generates responses to messages in a chat conversation, using the AWS Bedrock API."). + Description(`This processor sends prompts to your chosen large language model (LLM) and generates text from the responses, using the AWS Bedrock API. +For more information, see the https://docs.aws.amazon.com/bedrock/latest/userguide[AWS Bedrock documentation^].`). + Categories("AI"). + Version("4.34.0"). + Fields(config.SessionFields()...). + Field(service.NewStringField(bedpFieldModel). + Examples("amazon.titan-text-express-v1", "anthropic.claude-3-5-sonnet-20240620-v1:0", "cohere.command-text-v14", "meta.llama3-1-70b-instruct-v1:0", "mistral.mistral-large-2402-v1:0"). + Description("The model ID to use. For a full list see the https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html[AWS Bedrock documentation^].")). + Field(service.NewStringField(bedpFieldUserPrompt). + Description("The prompt you want to generate a response for. By default, the processor submits the entire payload as a string."). + Optional()). + Field(service.NewStringField(bedpFieldSystemPrompt). + Optional(). + Description("The system prompt to submit to the AWS Bedrock LLM.")). + Field(service.NewIntField(bedpFieldMaxTokens). + Optional(). + Description("The maximum number of tokens to allow in the generated response."). + LintRule(`root = this < 1 { ["field must be greater than or equal to 1"] }`)). + Field(service.NewFloatField(bedpFieldTemp). + Optional(). + Description("The likelihood of the model selecting higher-probability options while generating a response. A lower value makes the model omre likely to choose higher-probability options, while a higher value makes the model more likely to choose lower-probability options."). + LintRule(`root = if this < 0 || this > 1 { ["field must be between 0.0-1.0"] }`)). + Field(service.NewStringListField(bedpFieldStop). + Optional(). + Advanced(). + Description("A list of stop sequences. A stop sequence is a sequence of characters that causes the model to stop generating the response.")). + Field(service.NewFloatField(bedpFieldTopP). + Optional(). + Advanced(). + Description("The percentage of most-likely candidates that the model considers for the next token. For example, if you choose a value of 0.8, the model selects from the top 80% of the probability distribution of tokens that could be next in the sequence. "). + LintRule(`root = if this < 0 || this > 1 { ["field must be between 0.0-1.0"] }`)) +} + +func newBedrockProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + aconf, err := GetSession(context.Background(), conf) + if err != nil { + return nil, err + } + client := bedrockruntime.NewFromConfig(aconf) + model, err := conf.FieldString(bedpFieldModel) + if err != nil { + return nil, err + } + p := &bedrockProcessor{ + client: client, + model: model, + } + if conf.Contains(bedpFieldUserPrompt) { + pf, err := conf.FieldInterpolatedString(bedpFieldUserPrompt) + if err != nil { + return nil, err + } + p.userPrompt = pf + } + if conf.Contains(bedpFieldSystemPrompt) { + pf, err := conf.FieldInterpolatedString(bedpFieldSystemPrompt) + if err != nil { + return nil, err + } + p.systemPrompt = pf + } + if conf.Contains(bedpFieldMaxTokens) { + v, err := conf.FieldInt(bedpFieldMaxTokens) + if err != nil { + return nil, err + } + mt := int32(v) + p.maxTokens = &mt + } + if conf.Contains(bedpFieldTemp) { + v, err := conf.FieldFloat(bedpFieldTemp) + if err != nil { + return nil, err + } + t := float32(v) + p.temp = &t + } + if conf.Contains(bedpFieldStop) { + stop, err := conf.FieldStringList(bedpFieldStop) + if err != nil { + return nil, err + } + p.stop = stop + } + if conf.Contains(bedpFieldTopP) { + v, err := conf.FieldFloat(bedpFieldTopP) + if err != nil { + return nil, err + } + tp := float32(v) + p.topP = &tp + } + return p, nil +} + +type bedrockProcessor struct { + client *bedrockruntime.Client + model string + + userPrompt *service.InterpolatedString + systemPrompt *service.InterpolatedString + maxTokens *int32 + stop []string + temp *float32 + topP *float32 +} + +func (b *bedrockProcessor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) { + prompt, err := b.computePrompt(msg) + if err != nil { + return nil, err + } + input := &bedrockruntime.ConverseInput{ + Messages: []bedrocktypes.Message{ + { + Role: bedrocktypes.ConversationRoleUser, + Content: []bedrocktypes.ContentBlock{ + &bedrocktypes.ContentBlockMemberText{ + Value: prompt, + }, + }, + }, + }, + ModelId: &b.model, + InferenceConfig: &bedrocktypes.InferenceConfiguration{ + MaxTokens: b.maxTokens, + StopSequences: b.stop, + Temperature: b.temp, + TopP: b.topP, + }, + } + if b.systemPrompt != nil { + prompt, err := b.systemPrompt.TryString(msg) + if err != nil { + return nil, fmt.Errorf("unable to interpolate `%s`: %w", bedpFieldSystemPrompt, err) + } + input.System = []bedrocktypes.SystemContentBlock{ + &bedrocktypes.SystemContentBlockMemberText{Value: prompt}, + } + } + resp, err := b.client.Converse(ctx, input) + if err != nil { + return nil, err + } + respOut, ok := resp.Output.(*bedrocktypes.ConverseOutputMemberMessage) + if !ok { + return nil, fmt.Errorf("unexpected output: %T", resp) + } + content := respOut.Value.Content + if len(content) != 1 { + return nil, fmt.Errorf("unexpected number of response content: %d", len(content)) + } + out := msg.Copy() + switch c := content[0].(type) { + case *bedrocktypes.ContentBlockMemberText: + out.SetStructured(c.Value) + default: + return nil, fmt.Errorf("unsupported response content type: %T", content[0]) + } + return service.MessageBatch{out}, nil +} + +func (b *bedrockProcessor) computePrompt(msg *service.Message) (string, error) { + if b.userPrompt != nil { + return b.userPrompt.TryString(msg) + } + buf, err := msg.AsBytes() + if err != nil { + return "", err + } + if !utf8.Valid(buf) { + return "", errors.New("message payload contained invalid UTF8") + } + return string(buf), nil +} + +func (b *bedrockProcessor) Close(ctx context.Context) error { + return nil +}