diff --git a/docs/components/AWS.mdx b/docs/components/AWS.mdx index 2204b13964..503dcb3cdf 100644 --- a/docs/components/AWS.mdx +++ b/docs/components/AWS.mdx @@ -18,6 +18,7 @@ import { CardGrid, LinkCard } from "@astrojs/starlight/components"; ## Actions + @@ -94,6 +95,38 @@ Each alarm event includes: } ``` + + +## CloudWatch • Put Metric Data + +The Put Metric Data component publishes one or more custom metric data points to AWS CloudWatch. + +### Use Cases + +- **Application telemetry**: Publish request counts, latency, and error rates +- **Business KPIs**: Send domain metrics (orders, signups, revenue) to dashboards +- **Operational visibility**: Feed custom service health metrics into alarms and autoscaling + +### Configuration + +- **Region**: AWS region where the metric data will be published +- **Namespace**: CloudWatch namespace for your custom metrics (for example: `MyService/Production`) +- **Metric Data**: List of metric points to publish, each with: + - **Metric Name** and **Value** (required) + - Optional **Unit**, **Timestamp**, **Storage Resolution**, and **Dimensions** + +### Example Output + +```json +{ + "requestId": "b6a5cd4f-2a52-4f43-8b68-61f70a5cbb43", + "region": "us-east-1", + "namespace": "MyService/Production", + "metricCount": 2, + "metricNames": ["RequestCount", "LatencyMs"] +} +``` + ## CodeArtifact • On Package Version diff --git a/pkg/integrations/aws/aws.go b/pkg/integrations/aws/aws.go index 135915f9a5..536995c202 100644 --- a/pkg/integrations/aws/aws.go +++ b/pkg/integrations/aws/aws.go @@ -141,6 +141,7 @@ func (a *AWS) Components() []core.Component { &ecr.GetImage{}, &ecr.GetImageScanFindings{}, &ecr.ScanImage{}, + &cloudwatch.PutMetricData{}, &lambda.RunFunction{}, } } diff --git a/pkg/integrations/aws/cloudwatch/client.go b/pkg/integrations/aws/cloudwatch/client.go new file mode 100644 index 0000000000..33aea0a769 --- /dev/null +++ b/pkg/integrations/aws/cloudwatch/client.go @@ -0,0 +1,214 @@ +package cloudwatch + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/xml" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/pkg/integrations/aws/common" +) + +const ( + cloudWatchServiceName = "monitoring" + cloudWatchAPIVersion = "2010-08-01" + cloudWatchContentType = "application/x-www-form-urlencoded; charset=utf-8" +) + +type Client struct { + http core.HTTPContext + region string + credentials *aws.Credentials + signer *v4.Signer +} + +type PutMetricDataResponse struct { + RequestID string `xml:"ResponseMetadata>RequestId"` +} + +type MetricDatum struct { + MetricName string + Value float64 + Unit string + Timestamp *time.Time + StorageResolution *int + Dimensions []Dimension +} + +type Dimension struct { + Name string + Value string +} + +func NewClient(httpCtx core.HTTPContext, credentials *aws.Credentials, region string) *Client { + return &Client{ + http: httpCtx, + region: region, + credentials: credentials, + signer: v4.NewSigner(), + } +} + +func (c *Client) PutMetricData(namespace string, metricData []MetricDatum) (*PutMetricDataResponse, error) { + namespace = strings.TrimSpace(namespace) + if namespace == "" { + return nil, fmt.Errorf("namespace is required") + } + + if len(metricData) == 0 { + return nil, fmt.Errorf("at least one metric datum is required") + } + + values := url.Values{} + values.Set("Action", "PutMetricData") + values.Set("Version", cloudWatchAPIVersion) + values.Set("Namespace", namespace) + + for i, metric := range metricData { + metricIndex := i + 1 + metricName := strings.TrimSpace(metric.MetricName) + if metricName == "" { + return nil, fmt.Errorf("metricData[%d].metricName is required", i) + } + + values.Set(fmt.Sprintf("MetricData.member.%d.MetricName", metricIndex), metricName) + values.Set(fmt.Sprintf("MetricData.member.%d.Value", metricIndex), strconv.FormatFloat(metric.Value, 'f', -1, 64)) + + unit := strings.TrimSpace(metric.Unit) + if unit != "" { + values.Set(fmt.Sprintf("MetricData.member.%d.Unit", metricIndex), unit) + } + + if metric.Timestamp != nil { + values.Set( + fmt.Sprintf("MetricData.member.%d.Timestamp", metricIndex), + metric.Timestamp.UTC().Format(time.RFC3339), + ) + } + + if metric.StorageResolution != nil { + values.Set( + fmt.Sprintf("MetricData.member.%d.StorageResolution", metricIndex), + strconv.Itoa(*metric.StorageResolution), + ) + } + + for j, dimension := range metric.Dimensions { + dimensionIndex := j + 1 + name := strings.TrimSpace(dimension.Name) + value := strings.TrimSpace(dimension.Value) + if name == "" { + return nil, fmt.Errorf("metricData[%d].dimensions[%d].name is required", i, j) + } + if value == "" { + return nil, fmt.Errorf("metricData[%d].dimensions[%d].value is required", i, j) + } + + values.Set(fmt.Sprintf("MetricData.member.%d.Dimensions.member.%d.Name", metricIndex, dimensionIndex), name) + values.Set(fmt.Sprintf("MetricData.member.%d.Dimensions.member.%d.Value", metricIndex, dimensionIndex), value) + } + } + + response := PutMetricDataResponse{} + if err := c.postForm(values, &response); err != nil { + return nil, err + } + + return &response, nil +} + +func (c *Client) postForm(values url.Values, out any) error { + body := values.Encode() + endpoint := fmt.Sprintf("https://monitoring.%s.amazonaws.com/", c.region) + req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(body)) + if err != nil { + return fmt.Errorf("failed to build request: %w", err) + } + + req.Header.Set("Content-Type", cloudWatchContentType) + req.Header.Set("Accept", "application/xml") + + if err := c.signRequest(req, []byte(body)); err != nil { + return err + } + + res, err := c.http.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer res.Body.Close() + + responseBody, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response: %w", err) + } + + if res.StatusCode < 200 || res.StatusCode >= 300 { + if awsErr := parseError(responseBody); awsErr != nil { + return awsErr + } + + return fmt.Errorf("CloudWatch API request failed with %d: %s", res.StatusCode, string(responseBody)) + } + + if out == nil { + return nil + } + + if err := xml.Unmarshal(responseBody, out); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + + return nil +} + +func (c *Client) signRequest(req *http.Request, payload []byte) error { + hash := sha256.Sum256(payload) + payloadHash := hex.EncodeToString(hash[:]) + return c.signer.SignHTTP(context.Background(), *c.credentials, req, payloadHash, cloudWatchServiceName, c.region, time.Now()) +} + +func parseError(body []byte) *common.Error { + var payload struct { + Error struct { + Code string `xml:"Code"` + Message string `xml:"Message"` + } `xml:"Error"` + Errors struct { + Error struct { + Code string `xml:"Code"` + Message string `xml:"Message"` + } `xml:"Error"` + } `xml:"Errors"` + } + + if err := xml.Unmarshal(body, &payload); err != nil { + return nil + } + + code := strings.TrimSpace(payload.Error.Code) + message := strings.TrimSpace(payload.Error.Message) + if code == "" && message == "" { + code = strings.TrimSpace(payload.Errors.Error.Code) + message = strings.TrimSpace(payload.Errors.Error.Message) + } + + if code == "" && message == "" { + return nil + } + + return &common.Error{ + Code: code, + Message: message, + } +} diff --git a/pkg/integrations/aws/cloudwatch/example.go b/pkg/integrations/aws/cloudwatch/example.go index 5e5ad28fa0..fa8873f0c3 100644 --- a/pkg/integrations/aws/cloudwatch/example.go +++ b/pkg/integrations/aws/cloudwatch/example.go @@ -10,9 +10,23 @@ import ( //go:embed example_data_on_alarm.json var exampleDataOnAlarmBytes []byte +//go:embed example_output_put_metric_data.json +var exampleOutputPutMetricDataBytes []byte + var exampleDataOnAlarmOnce sync.Once var exampleDataOnAlarm map[string]any +var exampleOutputPutMetricDataOnce sync.Once +var exampleOutputPutMetricData map[string]any + func (t *OnAlarm) ExampleData() map[string]any { return utils.UnmarshalEmbeddedJSON(&exampleDataOnAlarmOnce, exampleDataOnAlarmBytes, &exampleDataOnAlarm) } + +func (c *PutMetricData) ExampleOutput() map[string]any { + return utils.UnmarshalEmbeddedJSON( + &exampleOutputPutMetricDataOnce, + exampleOutputPutMetricDataBytes, + &exampleOutputPutMetricData, + ) +} diff --git a/pkg/integrations/aws/cloudwatch/example_output_put_metric_data.json b/pkg/integrations/aws/cloudwatch/example_output_put_metric_data.json new file mode 100644 index 0000000000..d6fc8f4e80 --- /dev/null +++ b/pkg/integrations/aws/cloudwatch/example_output_put_metric_data.json @@ -0,0 +1,7 @@ +{ + "requestId": "b6a5cd4f-2a52-4f43-8b68-61f70a5cbb43", + "region": "us-east-1", + "namespace": "MyService/Production", + "metricCount": 2, + "metricNames": ["RequestCount", "LatencyMs"] +} diff --git a/pkg/integrations/aws/cloudwatch/put_metric_data.go b/pkg/integrations/aws/cloudwatch/put_metric_data.go new file mode 100644 index 0000000000..b9f86d10a2 --- /dev/null +++ b/pkg/integrations/aws/cloudwatch/put_metric_data.go @@ -0,0 +1,413 @@ +package cloudwatch + +import ( + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/pkg/integrations/aws/common" +) + +const ( + storageResolutionStandard = "60" + storageResolutionHighResolution = "1" + maxMetricDataPerRequest = 1000 + maxDimensionsPerMetric = 30 +) + +type PutMetricData struct{} + +type PutMetricDataConfiguration struct { + Region string `json:"region" mapstructure:"region"` + Namespace string `json:"namespace" mapstructure:"namespace"` + MetricData []PutMetricDatumConfigurationInput `json:"metricData" mapstructure:"metricData"` +} + +type PutMetricDatumConfigurationInput struct { + MetricName string `json:"metricName" mapstructure:"metricName"` + Value *float64 `json:"value" mapstructure:"value"` + Unit string `json:"unit" mapstructure:"unit"` + Timestamp string `json:"timestamp" mapstructure:"timestamp"` + StorageResolution string `json:"storageResolution" mapstructure:"storageResolution"` + Dimensions []PutMetricDimensionConfiguration `json:"dimensions" mapstructure:"dimensions"` +} + +type PutMetricDimensionConfiguration struct { + Name string `json:"name" mapstructure:"name"` + Value string `json:"value" mapstructure:"value"` +} + +type PutMetricDataOutput struct { + RequestID string `json:"requestId"` + Region string `json:"region"` + Namespace string `json:"namespace"` + MetricCount int `json:"metricCount"` + MetricNames []string `json:"metricNames"` +} + +func (c *PutMetricData) Name() string { + return "aws.cloudwatch.putMetricData" +} + +func (c *PutMetricData) Label() string { + return "CloudWatch • Put Metric Data" +} + +func (c *PutMetricData) Description() string { + return "Push custom metrics to AWS CloudWatch" +} + +func (c *PutMetricData) Documentation() string { + return `The Put Metric Data component publishes one or more custom metric data points to AWS CloudWatch. + +## Use Cases + +- **Application telemetry**: Publish request counts, latency, and error rates +- **Business KPIs**: Send domain metrics (orders, signups, revenue) to dashboards +- **Operational visibility**: Feed custom service health metrics into alarms and autoscaling + +## Configuration + +- **Region**: AWS region where the metric data will be published +- **Namespace**: CloudWatch namespace for your custom metrics (for example: `MyService/Production`) +- **Metric Data**: List of metric points to publish, each with: + - **Metric Name** and **Value** (required) + - Optional **Unit**, **Timestamp**, **Storage Resolution**, and **Dimensions**` +} + +func (c *PutMetricData) Icon() string { + return "aws" +} + +func (c *PutMetricData) Color() string { + return "gray" +} + +func (c *PutMetricData) OutputChannels(configuration any) []core.OutputChannel { + return []core.OutputChannel{core.DefaultOutputChannel} +} + +func (c *PutMetricData) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "region", + Label: "Region", + Type: configuration.FieldTypeSelect, + Required: true, + Default: "us-east-1", + TypeOptions: &configuration.TypeOptions{ + Select: &configuration.SelectTypeOptions{ + Options: common.AllRegions, + }, + }, + }, + { + Name: "namespace", + Label: "Namespace", + Type: configuration.FieldTypeString, + Required: true, + Placeholder: "MyService/Production", + Description: "CloudWatch namespace to publish metrics to", + }, + { + Name: "metricData", + Label: "Metric Data", + Type: configuration.FieldTypeList, + Required: true, + Description: "Metric data points to publish (up to 1000 per request)", + TypeOptions: &configuration.TypeOptions{ + List: &configuration.ListTypeOptions{ + ItemLabel: "Metric Data Point", + ItemDefinition: &configuration.ListItemDefinition{ + Type: configuration.FieldTypeObject, + Schema: []configuration.Field{ + { + Name: "metricName", + Label: "Metric Name", + Type: configuration.FieldTypeString, + Required: true, + Description: "Metric name", + }, + { + Name: "value", + Label: "Value", + Type: configuration.FieldTypeNumber, + Required: true, + Description: "Metric value", + }, + { + Name: "unit", + Label: "Unit", + Type: configuration.FieldTypeString, + Required: false, + Placeholder: "Count", + Description: "Optional CloudWatch unit (for example: Count, Percent, Milliseconds)", + }, + { + Name: "timestamp", + Label: "Timestamp", + Type: configuration.FieldTypeDateTime, + Required: false, + Description: "Optional timestamp for the data point", + }, + { + Name: "storageResolution", + Label: "Storage Resolution", + Type: configuration.FieldTypeSelect, + Required: false, + Description: "60 for standard metrics, 1 for high-resolution metrics", + TypeOptions: &configuration.TypeOptions{ + Select: &configuration.SelectTypeOptions{ + Options: []configuration.FieldOption{ + {Label: "Standard (60 seconds)", Value: storageResolutionStandard}, + {Label: "High Resolution (1 second)", Value: storageResolutionHighResolution}, + }, + }, + }, + }, + { + Name: "dimensions", + Label: "Dimensions", + Type: configuration.FieldTypeList, + Required: false, + Description: "Optional dimensions as name/value pairs", + TypeOptions: &configuration.TypeOptions{ + List: &configuration.ListTypeOptions{ + ItemLabel: "Dimension", + ItemDefinition: &configuration.ListItemDefinition{ + Type: configuration.FieldTypeObject, + Schema: []configuration.Field{ + { + Name: "name", + Label: "Name", + Type: configuration.FieldTypeString, + Required: true, + }, + { + Name: "value", + Label: "Value", + Type: configuration.FieldTypeString, + Required: true, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } +} + +func (c *PutMetricData) Setup(ctx core.SetupContext) error { + config := PutMetricDataConfiguration{} + if err := mapstructure.Decode(ctx.Configuration, &config); err != nil { + return fmt.Errorf("failed to decode configuration: %w", err) + } + + return validatePutMetricDataConfiguration(config) +} + +func (c *PutMetricData) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { + return ctx.DefaultProcessing() +} + +func (c *PutMetricData) Execute(ctx core.ExecutionContext) error { + config := PutMetricDataConfiguration{} + if err := mapstructure.Decode(ctx.Configuration, &config); err != nil { + return fmt.Errorf("failed to decode configuration: %w", err) + } + + if err := validatePutMetricDataConfiguration(config); err != nil { + return err + } + + credentials, err := common.CredentialsFromInstallation(ctx.Integration) + if err != nil { + return fmt.Errorf("failed to get AWS credentials: %w", err) + } + + metricData, metricNames, err := buildMetricData(config.MetricData) + if err != nil { + return err + } + + client := NewClient(ctx.HTTP, credentials, strings.TrimSpace(config.Region)) + response, err := client.PutMetricData(strings.TrimSpace(config.Namespace), metricData) + if err != nil { + return fmt.Errorf("failed to put metric data: %w", err) + } + + return ctx.ExecutionState.Emit(core.DefaultOutputChannel.Name, "aws.cloudwatch.metricData", []any{ + PutMetricDataOutput{ + RequestID: response.RequestID, + Region: strings.TrimSpace(config.Region), + Namespace: strings.TrimSpace(config.Namespace), + MetricCount: len(metricData), + MetricNames: metricNames, + }, + }) +} + +func (c *PutMetricData) Actions() []core.Action { + return []core.Action{} +} + +func (c *PutMetricData) HandleAction(ctx core.ActionContext) error { + return nil +} + +func (c *PutMetricData) HandleWebhook(ctx core.WebhookRequestContext) (int, error) { + return http.StatusOK, nil +} + +func (c *PutMetricData) Cancel(ctx core.ExecutionContext) error { + return nil +} + +func (c *PutMetricData) Cleanup(ctx core.SetupContext) error { + return nil +} + +func validatePutMetricDataConfiguration(config PutMetricDataConfiguration) error { + region := strings.TrimSpace(config.Region) + if region == "" { + return fmt.Errorf("region is required") + } + + namespace := strings.TrimSpace(config.Namespace) + if namespace == "" { + return fmt.Errorf("namespace is required") + } + + if len(config.MetricData) == 0 { + return fmt.Errorf("metric data is required") + } + + if len(config.MetricData) > maxMetricDataPerRequest { + return fmt.Errorf("metric data exceeds limit of %d entries", maxMetricDataPerRequest) + } + + for i, metric := range config.MetricData { + if strings.TrimSpace(metric.MetricName) == "" { + return fmt.Errorf("metricData[%d].metricName is required", i) + } + + if metric.Value == nil { + return fmt.Errorf("metricData[%d].value is required", i) + } + + if strings.TrimSpace(metric.Timestamp) != "" { + if _, err := parseMetricTimestamp(metric.Timestamp); err != nil { + return fmt.Errorf("metricData[%d].timestamp must be a valid datetime", i) + } + } + + storageResolution := strings.TrimSpace(metric.StorageResolution) + if storageResolution != "" && + storageResolution != storageResolutionStandard && + storageResolution != storageResolutionHighResolution { + return fmt.Errorf( + "metricData[%d].storageResolution must be %s or %s", + i, + storageResolutionHighResolution, + storageResolutionStandard, + ) + } + + if len(metric.Dimensions) > maxDimensionsPerMetric { + return fmt.Errorf("metricData[%d].dimensions exceeds limit of %d entries", i, maxDimensionsPerMetric) + } + + for j, dimension := range metric.Dimensions { + if strings.TrimSpace(dimension.Name) == "" { + return fmt.Errorf("metricData[%d].dimensions[%d].name is required", i, j) + } + if strings.TrimSpace(dimension.Value) == "" { + return fmt.Errorf("metricData[%d].dimensions[%d].value is required", i, j) + } + } + } + + return nil +} + +func buildMetricData(metricInputs []PutMetricDatumConfigurationInput) ([]MetricDatum, []string, error) { + metricData := make([]MetricDatum, 0, len(metricInputs)) + metricNames := make([]string, 0, len(metricInputs)) + + for i, metric := range metricInputs { + metricName := strings.TrimSpace(metric.MetricName) + timestamp, err := parseMetricTimestamp(metric.Timestamp) + if err != nil { + return nil, nil, fmt.Errorf("metricData[%d].timestamp must be a valid datetime", i) + } + + var storageResolution *int + if strings.TrimSpace(metric.StorageResolution) != "" { + value, err := strconv.Atoi(strings.TrimSpace(metric.StorageResolution)) + if err != nil { + return nil, nil, fmt.Errorf("metricData[%d].storageResolution must be numeric", i) + } + storageResolution = &value + } + + dimensions := make([]Dimension, 0, len(metric.Dimensions)) + for j, dimension := range metric.Dimensions { + name := strings.TrimSpace(dimension.Name) + value := strings.TrimSpace(dimension.Value) + if name == "" { + return nil, nil, fmt.Errorf("metricData[%d].dimensions[%d].name is required", i, j) + } + if value == "" { + return nil, nil, fmt.Errorf("metricData[%d].dimensions[%d].value is required", i, j) + } + + dimensions = append(dimensions, Dimension{Name: name, Value: value}) + } + + metricNames = append(metricNames, metricName) + metricData = append(metricData, MetricDatum{ + MetricName: metricName, + Value: *metric.Value, + Unit: strings.TrimSpace(metric.Unit), + Timestamp: timestamp, + StorageResolution: storageResolution, + Dimensions: dimensions, + }) + } + + return metricData, metricNames, nil +} + +func parseMetricTimestamp(value string) (*time.Time, error) { + value = strings.TrimSpace(value) + if value == "" { + return nil, nil + } + + layouts := []string{ + time.RFC3339Nano, + time.RFC3339, + "2006-01-02T15:04:05", + "2006-01-02T15:04", + } + + for _, layout := range layouts { + parsed, err := time.Parse(layout, value) + if err == nil { + return &parsed, nil + } + } + + return nil, fmt.Errorf("invalid timestamp") +} diff --git a/pkg/integrations/aws/cloudwatch/put_metric_data_test.go b/pkg/integrations/aws/cloudwatch/put_metric_data_test.go new file mode 100644 index 0000000000..db6d2bf187 --- /dev/null +++ b/pkg/integrations/aws/cloudwatch/put_metric_data_test.go @@ -0,0 +1,256 @@ +package cloudwatch + +import ( + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/test/support/contexts" +) + +func Test__PutMetricData__Setup(t *testing.T) { + component := &PutMetricData{} + + t.Run("invalid configuration -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: "invalid", + }) + + require.ErrorContains(t, err, "failed to decode configuration") + }) + + t.Run("missing region -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": " ", + "namespace": "MyService/Production", + "metricData": []any{ + map[string]any{ + "metricName": "RequestCount", + "value": 1, + }, + }, + }, + }) + + require.ErrorContains(t, err, "region is required") + }) + + t.Run("missing namespace -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "namespace": " ", + "metricData": []any{ + map[string]any{ + "metricName": "RequestCount", + "value": 1, + }, + }, + }, + }) + + require.ErrorContains(t, err, "namespace is required") + }) + + t.Run("missing metric data -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "namespace": "MyService/Production", + }, + }) + + require.ErrorContains(t, err, "metric data is required") + }) + + t.Run("missing metric value -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "namespace": "MyService/Production", + "metricData": []any{ + map[string]any{ + "metricName": "RequestCount", + }, + }, + }, + }) + + require.ErrorContains(t, err, "metricData[0].value is required") + }) + + t.Run("invalid storage resolution -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "namespace": "MyService/Production", + "metricData": []any{ + map[string]any{ + "metricName": "RequestCount", + "value": 1, + "storageResolution": "5", + }, + }, + }, + }) + + require.ErrorContains(t, err, "metricData[0].storageResolution must be 1 or 60") + }) + + t.Run("invalid dimension -> error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "namespace": "MyService/Production", + "metricData": []any{ + map[string]any{ + "metricName": "RequestCount", + "value": 1, + "dimensions": []any{ + map[string]any{"name": "Service", "value": " "}, + }, + }, + }, + }, + }) + + require.ErrorContains(t, err, "metricData[0].dimensions[0].value is required") + }) + + t.Run("valid configuration -> ok", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "namespace": "MyService/Production", + "metricData": []any{ + map[string]any{ + "metricName": "RequestCount", + "value": 42.0, + "unit": "Count", + "timestamp": "2026-02-10T12:00", + "storageResolution": "1", + "dimensions": []any{ + map[string]any{"name": "Service", "value": "api"}, + }, + }, + }, + }, + }) + + require.NoError(t, err) + }) +} + +func Test__PutMetricData__Execute(t *testing.T) { + component := &PutMetricData{} + + t.Run("invalid configuration -> error", func(t *testing.T) { + err := component.Execute(core.ExecutionContext{ + Configuration: "invalid", + ExecutionState: &contexts.ExecutionStateContext{KVs: map[string]string{}}, + }) + + require.ErrorContains(t, err, "failed to decode configuration") + }) + + t.Run("missing credentials -> error", func(t *testing.T) { + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "namespace": "MyService/Production", + "metricData": []any{ + map[string]any{ + "metricName": "RequestCount", + "value": 1, + }, + }, + }, + Integration: &contexts.IntegrationContext{Secrets: map[string]core.IntegrationSecret{}}, + ExecutionState: &contexts.ExecutionStateContext{KVs: map[string]string{}}, + }) + + require.ErrorContains(t, err, "AWS session credentials are missing") + }) + + t.Run("valid request -> emits output", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(` + + + req-123 + + + `)), + }, + }, + } + + execState := &contexts.ExecutionStateContext{KVs: map[string]string{}} + err := component.Execute(core.ExecutionContext{ + Configuration: map[string]any{ + "region": "us-east-1", + "namespace": "MyService/Production", + "metricData": []any{ + map[string]any{ + "metricName": "RequestCount", + "value": 42.0, + "unit": "Count", + "timestamp": "2026-02-10T12:00", + "storageResolution": "1", + "dimensions": []any{ + map[string]any{"name": "Service", "value": "api"}, + }, + }, + map[string]any{ + "metricName": "LatencyMs", + "value": 18.5, + "unit": "Milliseconds", + }, + }, + }, + HTTP: httpContext, + ExecutionState: execState, + Integration: &contexts.IntegrationContext{ + Secrets: map[string]core.IntegrationSecret{ + "accessKeyId": {Name: "accessKeyId", Value: []byte("key")}, + "secretAccessKey": {Name: "secretAccessKey", Value: []byte("secret")}, + "sessionToken": {Name: "sessionToken", Value: []byte("token")}, + }, + }, + }) + + require.NoError(t, err) + require.Len(t, execState.Payloads, 1) + payload := execState.Payloads[0].(map[string]any)["data"] + output, ok := payload.(PutMetricDataOutput) + require.True(t, ok) + assert.Equal(t, "req-123", output.RequestID) + assert.Equal(t, "us-east-1", output.Region) + assert.Equal(t, "MyService/Production", output.Namespace) + assert.Equal(t, 2, output.MetricCount) + assert.Equal(t, []string{"RequestCount", "LatencyMs"}, output.MetricNames) + + require.Len(t, httpContext.Requests, 1) + request := httpContext.Requests[0] + assert.Equal(t, "https://monitoring.us-east-1.amazonaws.com/", request.URL.String()) + + body, err := io.ReadAll(request.Body) + require.NoError(t, err) + bodyText := string(body) + assert.Contains(t, bodyText, "Action=PutMetricData") + assert.Contains(t, bodyText, "Namespace=MyService%2FProduction") + assert.Contains(t, bodyText, "MetricData.member.1.MetricName=RequestCount") + assert.Contains(t, bodyText, "MetricData.member.1.Value=42") + assert.Contains(t, bodyText, "MetricData.member.1.StorageResolution=1") + assert.Contains(t, bodyText, "MetricData.member.1.Dimensions.member.1.Name=Service") + assert.Contains(t, bodyText, "MetricData.member.2.MetricName=LatencyMs") + }) +} diff --git a/web_src/src/pages/workflowv2/mappers/aws/cloudwatch/put_metric_data.ts b/web_src/src/pages/workflowv2/mappers/aws/cloudwatch/put_metric_data.ts new file mode 100644 index 0000000000..6c35c6681d --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/aws/cloudwatch/put_metric_data.ts @@ -0,0 +1,112 @@ +import { + ComponentBaseContext, + ComponentBaseMapper, + ExecutionDetailsContext, + ExecutionInfo, + NodeInfo, + OutputPayload, + SubtitleContext, +} from "../../types"; +import { ComponentBaseProps, EventSection } from "@/ui/componentBase"; +import { getBackgroundColorClass, getColorClass } from "@/utils/colors"; +import { getState, getStateMap, getTriggerRenderer } from "../.."; +import awsCloudwatchIcon from "@/assets/icons/integrations/aws.cloudwatch.svg"; +import { formatTimeAgo } from "@/utils/date"; +import { MetadataItem } from "@/ui/metadataList"; +import { stringOrDash } from "../../utils"; + +interface PutMetricDataConfiguration { + region?: string; + namespace?: string; + metricData?: Array<{ metricName?: string }>; +} + +interface PutMetricDataOutput { + requestId?: string; + region?: string; + namespace?: string; + metricCount?: number; + metricNames?: string[]; +} + +export const putMetricDataMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const lastExecution = context.lastExecutions.length > 0 ? context.lastExecutions[0] : null; + const componentName = context.componentDefinition.name || "unknown"; + + return { + title: context.node.name || context.componentDefinition.label || "Unnamed component", + iconSrc: awsCloudwatchIcon, + iconColor: getColorClass(context.componentDefinition.color), + collapsedBackground: getBackgroundColorClass(context.componentDefinition.color), + collapsed: context.node.isCollapsed, + eventSections: lastExecution ? getPutMetricDataEventSections(context.nodes, lastExecution, componentName) : undefined, + includeEmptyState: !lastExecution, + metadata: getPutMetricDataMetadata(context.node), + eventStateMap: getStateMap(componentName), + }; + }, + + getExecutionDetails(context: ExecutionDetailsContext): Record { + const outputs = context.execution.outputs as { default?: OutputPayload[] } | undefined; + const result = outputs?.default?.[0]?.data as PutMetricDataOutput | undefined; + + if (!result) { + return {}; + } + + return { + Namespace: stringOrDash(result.namespace), + Region: stringOrDash(result.region), + "Metric Count": stringOrDash(result.metricCount), + "Metric Names": (result.metricNames || []).length > 0 ? (result.metricNames || []).join(", ") : "-", + "Request ID": stringOrDash(result.requestId), + }; + }, + + subtitle(context: SubtitleContext): string { + if (!context.execution.createdAt) { + return ""; + } + + return formatTimeAgo(new Date(context.execution.createdAt)); + }, +}; + +function getPutMetricDataMetadata(node: NodeInfo): MetadataItem[] { + const config = node.configuration as PutMetricDataConfiguration | undefined; + const metadata: MetadataItem[] = []; + + if (config?.region) { + metadata.push({ icon: "globe", label: config.region }); + } + + if (config?.namespace) { + metadata.push({ icon: "database", label: config.namespace }); + } + + if ((config?.metricData || []).length > 0) { + metadata.push({ + icon: "list", + label: `${config?.metricData?.length} metric${config?.metricData?.length === 1 ? "" : "s"}`, + }); + } + + return metadata; +} + +function getPutMetricDataEventSections(nodes: NodeInfo[], execution: ExecutionInfo, componentName: string): EventSection[] { + const rootTriggerNode = nodes.find((n) => n.id === execution.rootEvent?.nodeId); + const rootTriggerRenderer = getTriggerRenderer(rootTriggerNode?.componentName!); + const { title } = rootTriggerRenderer.getTitleAndSubtitle({ event: execution.rootEvent }); + + return [ + { + receivedAt: new Date(execution.createdAt!), + eventTitle: title, + eventSubtitle: formatTimeAgo(new Date(execution.createdAt!)), + eventState: getState(componentName)(execution), + eventId: execution.rootEvent!.id!, + }, + ]; +} diff --git a/web_src/src/pages/workflowv2/mappers/aws/index.ts b/web_src/src/pages/workflowv2/mappers/aws/index.ts index e431554a64..58822234d4 100644 --- a/web_src/src/pages/workflowv2/mappers/aws/index.ts +++ b/web_src/src/pages/workflowv2/mappers/aws/index.ts @@ -15,6 +15,7 @@ import { deleteRepositoryMapper } from "./codeartifact/delete_repository"; import { disposePackageVersionsMapper } from "./codeartifact/dispose_package_versions"; import { updatePackageVersionsStatusMapper } from "./codeartifact/update_package_versions_status"; import { onAlarmTriggerRenderer } from "./cloudwatch/on_alarm"; +import { putMetricDataMapper } from "./cloudwatch/put_metric_data"; export const componentMappers: Record = { "lambda.runFunction": runFunctionMapper, @@ -28,6 +29,7 @@ export const componentMappers: Record = { "codeArtifact.disposePackageVersions": disposePackageVersionsMapper, "codeArtifact.getPackageVersion": getPackageVersionMapper, "codeArtifact.updatePackageVersionsStatus": updatePackageVersionsStatusMapper, + "cloudwatch.putMetricData": putMetricDataMapper, }; export const triggerRenderers: Record = { @@ -48,4 +50,5 @@ export const eventStateRegistry: Record = { "codeArtifact.disposePackageVersions": buildActionStateRegistry("disposed"), "codeArtifact.getPackageVersion": buildActionStateRegistry("retrieved"), "codeArtifact.updatePackageVersionsStatus": buildActionStateRegistry("updated"), + "cloudwatch.putMetricData": buildActionStateRegistry("pushed"), };