diff --git a/alchemy/package.json b/alchemy/package.json index dde4e5974..d4438ae21 100644 --- a/alchemy/package.json +++ b/alchemy/package.json @@ -148,7 +148,9 @@ "ai": "^4.0.0", "arktype": "^2.0.0", "cloudflare": "^4.0.0", + "diff": "^8.0.2", "dofs": "^0.0.1", + "effect": "^3.0.0", "hono": "^4.0.0", "prettier": "^3.0.0", "stripe": "^17.0.0", diff --git a/alchemy/src/aws/account-id.ts b/alchemy/src/aws/account-id.ts index 90f33fc65..052f58ebd 100644 --- a/alchemy/src/aws/account-id.ts +++ b/alchemy/src/aws/account-id.ts @@ -1,15 +1,29 @@ -import { GetCallerIdentityCommand, STSClient } from "@aws-sdk/client-sts"; - -const sts = new STSClient({}); +import { Effect } from "effect"; +import { createAwsClient, type AwsError } from "./client.ts"; export type AccountId = string & { readonly __brand: "AccountId"; }; /** - * Helper to get the current AWS account ID + * Helper to get the current AWS account ID using Effect-based API + */ +export function AccountId(): Effect.Effect { + return Effect.gen(function* () { + const client = yield* createAwsClient({ service: "sts" }); + const identity = yield* client.postJson<{ + GetCallerIdentityResult: { Account: string }; + }>("/", { + Action: "GetCallerIdentity", + Version: "2011-06-15", + }); + return identity.GetCallerIdentityResult.Account as AccountId; + }); +} + +/** + * Helper to get the current AWS account ID as a Promise (for backwards compatibility) */ -export async function AccountId(): Promise { - const identity = await sts.send(new GetCallerIdentityCommand({})); - return identity.Account! as AccountId; +export async function getAccountId(): Promise { + return Effect.runPromise(AccountId()); } diff --git a/alchemy/src/aws/bucket.ts b/alchemy/src/aws/bucket.ts index cfb4a3853..80aaf0ec0 100644 --- a/alchemy/src/aws/bucket.ts +++ b/alchemy/src/aws/bucket.ts @@ -1,19 +1,35 @@ -import { - CreateBucketCommand, - DeleteBucketCommand, - GetBucketAclCommand, - GetBucketLocationCommand, - GetBucketTaggingCommand, - GetBucketVersioningCommand, - HeadBucketCommand, - NoSuchBucket, - PutBucketTaggingCommand, - S3Client, -} from "@aws-sdk/client-s3"; -import type { Context } from "../context.ts"; -import { Resource } from "../resource.ts"; -import { ignore } from "../util/ignore.ts"; -import { retry } from "./retry.ts"; +import { Effect } from "effect"; +import { createAwsClient } from "./client.ts"; +import { EffectResource } from "./effect-resource.ts"; + +/** + * AWS S3 API response interfaces for type safety + */ +interface S3LocationResponse { + LocationConstraint?: string; +} + +interface S3TaggingResponse { + Tagging?: { + TagSet?: Array<{ Key: string; Value: string }>; + }; +} + +interface S3VersioningResponse { + VersioningConfiguration?: { + Status?: "Enabled" | "Suspended"; + }; +} + +interface S3AclResponse { + AccessControlPolicy?: { + AccessControlList?: { + Grant?: Array<{ + Permission?: string; + }>; + }; + }; +} /** * Properties for creating or updating an S3 bucket @@ -127,108 +143,89 @@ export interface Bucket extends Resource<"s3::Bucket">, BucketProps { * } * }); */ -export const Bucket = Resource( +export const Bucket = EffectResource( "s3::Bucket", - async function (this: Context, _id: string, props: BucketProps) { - const client = new S3Client({}); + function* (_id, props) { + const client = yield* createAwsClient({ service: "s3" }); if (this.phase === "delete") { - await ignore(NoSuchBucket.name, () => - retry(() => - client.send( - new DeleteBucketCommand({ - Bucket: props.bucketName, - }), - ), - ), - ); - return this.destroy(); + yield* client + .delete(`/${props.bucketName}`) + .pipe(Effect.catchAll(() => Effect.unit)); + return yield* this.destroy(); } - try { - // Check if bucket exists - await retry(() => - client.send( - new HeadBucketCommand({ - Bucket: props.bucketName, - }), - ), + + // Helper function to create tagging XML + const createTaggingXml = (tags: Record) => { + const tagSet = Object.entries(tags).map(([Key, Value]) => ({ + Key, + Value, + })); + return `${tagSet + .map( + ({ Key, Value }) => + `${Key}${Value}`, + ) + .join("")}`; + }; + + // Try to check if bucket exists and update tags if needed + const bucketExists = yield* client + .request("HEAD", `/${props.bucketName}`) + .pipe( + Effect.map(() => true), + Effect.catchTag("AwsNotFoundError", () => Effect.succeed(false)), ); + if (bucketExists) { // Update tags if they changed and bucket exists if (this.phase === "update" && props.tags) { - await retry(() => - client.send( - new PutBucketTaggingCommand({ - Bucket: props.bucketName, - Tagging: { - TagSet: Object.entries(props.tags!).map(([Key, Value]) => ({ - Key, - Value, - })), - }, - }), - ), - ); + const taggingXml = createTaggingXml(props.tags); + yield* client.put(`/${props.bucketName}?tagging`, taggingXml, { + "Content-Type": "application/xml", + }); } - } catch (error: any) { - if (error.name === "NotFound") { - // Create bucket if it doesn't exist - await retry(() => - client.send( - new CreateBucketCommand({ - Bucket: props.bucketName, - // Add tags during creation if specified - ...(props.tags && { - Tagging: { - TagSet: Object.entries(props.tags).map(([Key, Value]) => ({ - Key, - Value, - })), - }, - }), - }), - ), - ); - } else { - throw error; + } else { + // Create bucket if it doesn't exist + yield* client.put(`/${props.bucketName}`); + + // Add tags after creation if specified + if (props.tags) { + const taggingXml = createTaggingXml(props.tags); + yield* client.put(`/${props.bucketName}?tagging`, taggingXml, { + "Content-Type": "application/xml", + }); } } - // Get bucket details + // Get bucket details in parallel const [locationResponse, versioningResponse, aclResponse] = - await Promise.all([ - retry(() => - client.send( - new GetBucketLocationCommand({ Bucket: props.bucketName }), - ), - ), - retry(() => - client.send( - new GetBucketVersioningCommand({ Bucket: props.bucketName }), - ), - ), - retry(() => - client.send(new GetBucketAclCommand({ Bucket: props.bucketName })), - ), + yield* Effect.all([ + client.get(`/${props.bucketName}?location`, { + Host: `${props.bucketName}.s3.amazonaws.com`, + }), + client.get(`/${props.bucketName}?versioning`), + client.get(`/${props.bucketName}?acl`), ]); - const region = locationResponse.LocationConstraint || "us-east-1"; + const region = locationResponse?.LocationConstraint || "us-east-1"; - // Get tags if they exist + // Get tags if they weren't provided let tags = props.tags; if (!tags) { - try { - const taggingResponse = await retry(() => - client.send( - new GetBucketTaggingCommand({ Bucket: props.bucketName }), - ), + const taggingResponse = yield* client + .get(`/${props.bucketName}?tagging`) + .pipe( + Effect.catchTag("AwsNotFoundError", () => Effect.succeed(null)), // Tags don't exist - OK ); - tags = Object.fromEntries( - taggingResponse.TagSet?.map(({ Key, Value }) => [Key, Value]) || [], - ); - } catch (error: any) { - if (error.name !== "NoSuchTagSet") { - throw error; + + if (taggingResponse) { + // Parse XML response to extract tags + const tagSet = taggingResponse.Tagging?.TagSet; + if (Array.isArray(tagSet)) { + tags = Object.fromEntries( + tagSet.map(({ Key, Value }) => [Key, Value]) || [], + ); } } } @@ -240,8 +237,9 @@ export const Bucket = Resource( bucketRegionalDomainName: `${props.bucketName}.s3.${region}.amazonaws.com`, region, hostedZoneId: getHostedZoneId(region), - versioningEnabled: versioningResponse.Status === "Enabled", - acl: aclResponse.Grants?.[0]?.Permission?.toLowerCase(), + versioningEnabled: + versioningResponse?.VersioningConfiguration?.Status === "Enabled", + acl: aclResponse?.AccessControlPolicy?.AccessControlList?.Grant?.[0]?.Permission?.toLowerCase(), ...(tags && { tags }), }); }, diff --git a/alchemy/src/aws/client.ts b/alchemy/src/aws/client.ts new file mode 100644 index 000000000..59db3d040 --- /dev/null +++ b/alchemy/src/aws/client.ts @@ -0,0 +1,351 @@ +import { fromNodeProviderChain } from "@aws-sdk/credential-providers"; +import { loadConfig } from "@smithy/node-config-provider"; +import { AwsClient } from "aws4fetch"; +import { Effect, Schedule, Data } from "effect"; +import { safeFetch } from "../util/safe-fetch.ts"; + +/** + * AWS service-specific tagged errors using Effect's Data.TaggedError + */ +export class AwsError extends Data.TaggedError("AwsError")<{ + readonly message: string; + readonly errorCode: string; + readonly response?: Response; + readonly data?: any; +}> {} + +export class AwsNetworkError extends Data.TaggedError("AwsNetworkError")<{ + readonly message: string; + readonly errorCode: string; + readonly response?: Response; + readonly data?: any; +}> {} + +export class AwsThrottleError extends Data.TaggedError("AwsThrottleError")<{ + readonly message: string; + readonly errorCode: string; + readonly response?: Response; + readonly data?: any; +}> {} + +export class AwsNotFoundError extends Data.TaggedError("AwsNotFoundError")<{ + readonly message: string; + readonly errorCode: string; + readonly response?: Response; + readonly data?: any; +}> {} + +export class AwsAccessDeniedError extends Data.TaggedError( + "AwsAccessDeniedError", +)<{ + readonly message: string; + readonly errorCode: string; + readonly response?: Response; + readonly data?: any; +}> {} + +export class AwsValidationError extends Data.TaggedError("AwsValidationError")<{ + readonly message: string; + readonly errorCode: string; + readonly response?: Response; + readonly data?: any; +}> {} + +export class AwsConflictError extends Data.TaggedError("AwsConflictError")<{ + readonly message: string; + readonly errorCode: string; + readonly response?: Response; + readonly data?: any; +}> {} + +export class AwsInternalServerError extends Data.TaggedError( + "AwsInternalServerError", +)<{ + readonly message: string; + readonly errorCode: string; + readonly response?: Response; + readonly data?: any; +}> {} + +/** + * Options for AWS client creation + */ +export interface AwsClientOptions { + /** + * AWS region to use + */ + region?: string; + + /** + * AWS service name (e.g., 's3', 'sqs', 'lambda') + */ + service: string; + + /** + * AWS access key ID (overrides environment variable) + */ + accessKeyId?: string; + + /** + * AWS secret access key (overrides environment variable) + */ + secretAccessKey?: string; + + /** + * AWS session token for temporary credentials + */ + sessionToken?: string; + + /** + * Maximum number of retries for retryable errors + */ + maxRetries?: number; +} + +const getRegion = loadConfig({ + environmentVariableSelector: (env) => + env.AWS_REGION || env.AWS_DEFAULT_REGION, + configFileSelector: (profile) => profile.region, + default: undefined, +}); + +/** + * Create an AWS client using aws4fetch with native Effect + */ +export function createAwsClient( + options: AwsClientOptions, +): Effect.Effect { + return Effect.gen(function* () { + const credentials = yield* Effect.tryPromise({ + try: () => fromNodeProviderChain()(), + catch: (error) => + new AwsError({ + message: + error instanceof Error + ? error.message + : "Failed to load AWS credentials", + errorCode: "CredentialsError", + }), + }); + + const region = yield* Effect.gen(function* () { + if (options.region) return options.region; + + const configRegion = yield* Effect.tryPromise({ + try: () => getRegion(), + catch: () => null, + }).pipe(Effect.catchAll(() => Effect.succeed(null))); + + return ( + configRegion ?? + process.env.AWS_REGION ?? + process.env.AWS_DEFAULT_REGION ?? + null + ); + }); + + if (!region) { + yield* Effect.fail( + new AwsError({ + message: + "No region found. Please set AWS_REGION or AWS_DEFAULT_REGION in the environment or in your AWS profile.", + errorCode: "RegionNotFound", + }), + ); + } + + const client = new AwsClient({ + ...credentials, + service: options.service, + region, + }); + + return new AwsClientWrapper(client, { + ...options, + region, + }); + }); +} + +export class AwsClientWrapper { + private region: string; + private service: string; + private maxRetries: number; + + constructor( + private readonly client: AwsClient, + options: AwsClientOptions & { region: string }, + ) { + this.region = options.region; + this.service = options.service; + this.maxRetries = options.maxRetries || 3; + } + + /** + * Make a request to AWS using aws4fetch with Effect-based error handling + */ + public request( + method: string, + path: string, + _params?: Record, + options?: { + headers?: Record; + body?: string; + maxRetries?: number; + }, + ): Effect.Effect { + const maxRetries = options?.maxRetries || this.maxRetries; + + const makeRequest = Effect.tryPromise({ + try: async () => { + // Special URL handling for S3 + const url = + this.service === "s3" + ? `https://s3.${this.region}.amazonaws.com${path}` + : `https://${this.service}.${this.region}.amazonaws.com${path}`; + + const requestOptions = { + method, + headers: { + // Don't set default Content-Type for all services + ...(this.service !== "s3" && { + "Content-Type": "application/x-amz-json-1.1", + }), + ...options?.headers, + }, + ...(options?.body && { body: options.body }), + }; + + const signedRequest = await this.client.sign(url, requestOptions); + const response = await safeFetch(signedRequest); + + if (!response.ok) { + // Try to parse as XML for S3, JSON for others + let data: any = {}; + try { + if (this.service === "s3") { + const text = await response.text(); + data = { message: text, statusText: response.statusText }; + } else { + data = await response.json(); + } + } catch { + data = { statusText: response.statusText }; + } + throw this.createError(response, data); + } + + // For S3 HEAD requests, return empty object + if (method === "HEAD") { + return {} as T; + } + + // For S3, try to parse as XML first, then JSON + if (this.service === "s3") { + const text = await response.text(); + // For now, return the raw text - in a real implementation you'd parse XML + return (text ? { data: text } : {}) as T; + } + + return (await response.json()) as T; + }, + catch: (error): AwsError => { + if (error instanceof AwsError) { + return error; + } + return new AwsNetworkError({ + message: + error instanceof Error + ? error.message + : "Network error during AWS request", + errorCode: "NetworkError", + }); + }, + }); + + // Use Effect's retry with exponential backoff for retryable errors + const schedule = Schedule.exponential(100); // 100ms + return makeRequest.pipe( + Effect.retry({ + while: (error) => + error._tag === "AwsThrottleError" || error._tag === "AwsNetworkError", + times: maxRetries, + schedule, + }), + ); + } + + /** + * Make a POST request with JSON body + */ + public postJson( + path: string, + body: Record, + headers?: Record, + ): Effect.Effect { + return this.request("POST", path, undefined, { + headers, + body: JSON.stringify(body), + }); + } + + /** + * Make a GET request + */ + public get( + path: string, + headers?: Record, + ): Effect.Effect { + return this.request("GET", path, undefined, { headers }); + } + + /** + * Make a DELETE request + */ + public delete( + path: string, + headers?: Record, + ): Effect.Effect { + return this.request("DELETE", path, undefined, { headers }); + } + + /** + * Make a PUT request + */ + public put( + path: string, + body?: string, + headers?: Record, + ): Effect.Effect { + return this.request("PUT", path, undefined, { + headers, + ...(body && { body }), + }); + } + + private createError(response: Response, data: any): AwsError { + const errorCode = data.Code || data.__type || response.status.toString(); + const message = data.Message || data.message || response.statusText; + + if (response.status === 404 || errorCode.includes("NotFound")) { + return new AwsNotFoundError({ message, errorCode, response, data }); + } + if (response.status === 403 || errorCode.includes("AccessDenied")) { + return new AwsAccessDeniedError({ message, errorCode, response, data }); + } + if (response.status === 429 || errorCode.includes("Throttling")) { + return new AwsThrottleError({ message, errorCode, response, data }); + } + if (response.status === 400 || errorCode.includes("ValidationException")) { + return new AwsValidationError({ message, errorCode, response, data }); + } + if (response.status === 409 || errorCode.includes("Conflict")) { + return new AwsConflictError({ message, errorCode, response, data }); + } + if (response.status >= 500) { + return new AwsInternalServerError({ message, errorCode, response, data }); + } + + return new AwsError({ message, errorCode, response, data }); + } +} diff --git a/alchemy/src/aws/effect-resource.ts b/alchemy/src/aws/effect-resource.ts new file mode 100644 index 000000000..75699c1e6 --- /dev/null +++ b/alchemy/src/aws/effect-resource.ts @@ -0,0 +1,44 @@ +import { Effect } from "effect"; +import type { Context } from "../context.ts"; +import { Resource } from "../resource.ts"; + +/** + * Creates a Resource that uses Effect throughout the entire lifecycle + * + * This wrapper allows resources to be implemented using Effect's declarative + * flow control features while maintaining compatibility with the existing + * Resource interface that expects Promise return types. + * + * For delete operations, the effectHandler should use `yield* this.destroy()` + * to explicitly handle destruction within the Effect chain. + */ +export function EffectResource, P>( + type: string, + effectHandler: ( + this: EffectContext, + id: string, + props: P, + ) => Generator, T, any>, +) { + return Resource( + type, + async function (this: Context, id: string, props: P): Promise { + // Create Effect-wrapped context with destroy() as Effect + const effectContext: EffectContext = { + ...this, + destroy: () => Effect.sync(() => this.destroy()), + }; + + return await Effect.runPromise( + Effect.gen(effectHandler.bind(effectContext, id, props)), + ); + }, + ); +} + +/** + * Effect-wrapped Context that provides destroy() as an Effect operation + */ +type EffectContext> = Context & { + destroy: () => Effect.Effect; +}; diff --git a/alchemy/src/aws/queue.ts b/alchemy/src/aws/queue.ts index 5500de8b8..7cdff7a7c 100644 --- a/alchemy/src/aws/queue.ts +++ b/alchemy/src/aws/queue.ts @@ -1,16 +1,7 @@ -import { - CreateQueueCommand, - DeleteQueueCommand, - GetQueueAttributesCommand, - GetQueueUrlCommand, - QueueDeletedRecently, - QueueDoesNotExist, - SQSClient, -} from "@aws-sdk/client-sqs"; -import type { Context } from "../context.ts"; -import { Resource } from "../resource.ts"; +import { Effect, Schedule } from "effect"; import { logger } from "../util/logger.ts"; -import { retry } from "./retry.ts"; +import { createAwsClient } from "./client.ts"; +import { EffectResource } from "./effect-resource.ts"; /** * Properties for creating or updating an SQS queue @@ -136,71 +127,74 @@ export interface Queue extends Resource<"sqs::Queue">, QueueProps { * receiveMessageWaitTimeSeconds: 20 * }); */ -export const Queue = Resource( +export const Queue = EffectResource( "sqs::Queue", - async function ( - this: Context, - _id: string, - props: QueueProps, - ): Promise { - const client = new SQSClient({}); - // Don't automatically add .fifo suffix - user must include it in queueName + function* (_id, props) { + const client = yield* createAwsClient({ service: "sqs" }); const queueName = props.queueName; // Validate that FIFO queues have .fifo suffix if (props.fifo && !queueName.endsWith(".fifo")) { - throw new Error("FIFO queue names must end with .fifo suffix"); + yield* Effect.fail( + new Error("FIFO queue names must end with .fifo suffix"), + ); } if (this.phase === "delete") { - try { - // Get queue URL first - const urlResponse = await retry(() => - client.send( - new GetQueueUrlCommand({ - QueueName: queueName, + // Get queue URL and delete it, ignoring not found errors + const deleteQueue = Effect.gen(function* () { + const urlResponse = yield* client.postJson<{ QueueUrl: string }>("/", { + Action: "GetQueueUrl", + QueueName: queueName, + Version: "2012-11-05", + }); + + yield* client.postJson("/", { + Action: "DeleteQueue", + QueueUrl: urlResponse.QueueUrl, + Version: "2012-11-05", + }); + + // Wait for queue to be deleted using Effect.repeat + yield* client + .postJson("/", { + Action: "GetQueueUrl", + QueueName: queueName, + Version: "2012-11-05", + }) + .pipe( + Effect.flatMap(() => Effect.sleep(1000)), // 1 second + Effect.repeat({ + until: () => false, // Keep trying until it fails }), - ), - ); - - // Delete the queue - await retry(() => - client.send( - new DeleteQueueCommand({ - QueueUrl: urlResponse.QueueUrl, + Effect.catchSome((error) => { + if ( + error._tag === "AwsNotFoundError" || + isQueueDoesNotExist(error) + ) { + return Effect.succeed(null); // Queue is deleted + } + return Effect.fail(error); }), - ), - ); - - // Wait for queue to be deleted - let queueDeleted = false; - while (!queueDeleted) { - try { - await retry(() => { - return client.send( - new GetQueueUrlCommand({ - QueueName: queueName, - }), - ); - }); - // If we get here, queue still exists - await new Promise((resolve) => setTimeout(resolve, 1000)); - } catch (error: any) { - if (isQueueDoesNotExist(error)) { - queueDeleted = true; - } else { - throw error; - } + ); + }); + + yield* deleteQueue.pipe( + Effect.catchAll((error) => { + if (error._tag === "AwsNotFoundError" || isQueueDoesNotExist(error)) { + return Effect.unit; } - } - } catch (error: any) { - logger.log(error.message); - if (!isQueueDoesNotExist(error)) { - throw error; - } - } - return this.destroy(); + const message = + error._tag === "AwsError" ? error.message : String(error); + return Effect.sync(() => logger.log(message)).pipe( + Effect.flatMap(() => Effect.unit), + ); + }), + ); + + return yield* this.destroy(); } + // Create queue with attributes const attributes: Record = {}; @@ -244,103 +238,89 @@ export const Queue = Resource( ) : undefined; - try { - // Create the queue - const createResponse = await retry( - () => - client.send( - new CreateQueueCommand({ - QueueName: queueName, - Attributes: attributes, - tags, - }), - ), - (err) => isQueueDeletedRecently(err), + // Create the queue parameters + const createParams: Record = { + Action: "CreateQueue", + QueueName: queueName, + Version: "2012-11-05", + }; + + // Add attributes + Object.entries(attributes).forEach(([key, value], index) => { + createParams[`Attribute.${index + 1}.Name`] = key; + createParams[`Attribute.${index + 1}.Value`] = value; + }); + + // Add tags + if (tags) { + Object.entries(tags).forEach(([key, value], index) => { + createParams[`Tag.${index + 1}.Key`] = key; + createParams[`Tag.${index + 1}.Value`] = value; + }); + } + + // Create queue with retry logic for recently deleted queues + const createQueue = Effect.gen(function* () { + const createResponse = yield* client.postJson<{ QueueUrl: string }>( + "/", + createParams, ); // Get queue attributes - const attributesResponse = await retry(() => - client.send( - new GetQueueAttributesCommand({ - QueueUrl: createResponse.QueueUrl, - AttributeNames: ["QueueArn"], - }), - ), - ); + const attributesResponse = yield* client.postJson<{ + Attributes: Record; + }>("/", { + Action: "GetQueueAttributes", + QueueUrl: createResponse.QueueUrl, + AttributeNames: ["QueueArn"], + Version: "2012-11-05", + }); return this({ ...props, - arn: attributesResponse.Attributes!.QueueArn!, - url: createResponse.QueueUrl!, + arn: attributesResponse.Attributes.QueueArn, + url: createResponse.QueueUrl, }); - } catch (error: any) { - if (isQueueDeletedRecently(error)) { - logger.log( - `Queue "${queueName}" was recently deleted and can't be re-created. Waiting and retrying...`, - ); - // Queue was recently deleted, wait and retry - const maxRetries = 61; - let retryCount = 0; - - while (retryCount < maxRetries) { - try { - // Wait for 1 second before retrying - await new Promise((resolve) => setTimeout(resolve, 1000)); - - // Retry creating the queue - const createResponse = await retry(() => - client.send( - new CreateQueueCommand({ - QueueName: queueName, - Attributes: attributes, - tags, - }), - ), - ); - - // Get queue attributes - const attributesResponse = await retry(() => - client.send( - new GetQueueAttributesCommand({ - QueueUrl: createResponse.QueueUrl, - AttributeNames: ["QueueArn"], - }), - ), - ); - - return this({ - ...props, - arn: attributesResponse.Attributes!.QueueArn!, - url: createResponse.QueueUrl!, - }); - } catch (retryError: any) { - if ( - !isQueueDeletedRecently(retryError) || - retryCount === maxRetries - 1 - ) { - throw retryError; - } - retryCount++; - } + }); + + // Handle queue creation with retry for recently deleted queues + const result = yield* createQueue.pipe( + Effect.catchSome((error) => { + if (isQueueDeletedRecently(error)) { + // Use Effect's built-in retry with exponential backoff + return Effect.sync(() => + logger.log( + `Queue "${queueName}" was recently deleted and can't be re-created. Waiting and retrying...`, + ), + ).pipe( + Effect.flatMap(() => createQueue), + Effect.retry({ + times: 60, + schedule: Schedule.spaced(1000), // 1 second + }), + ); } - } - throw error; - } + return Effect.fail(error); + }), + ); + + return result; }, ); -function isQueueDoesNotExist(error: any): error is QueueDoesNotExist { +function isQueueDoesNotExist(error: any): boolean { return ( error.name === "QueueDoesNotExist" || error.Code === "AWS.SimpleQueueService.NonExistentQueue" || - error instanceof QueueDoesNotExist + (error._tag === "AwsError" && error.message.includes("NonExistentQueue")) ); } -function isQueueDeletedRecently(error: any): error is QueueDeletedRecently { +function isQueueDeletedRecently(error: any): boolean { return ( - error instanceof QueueDeletedRecently || error.Code === "AWS.SimpleQueueService.QueueDeletedRecently" || - error.name === "QueueDeletedRecently" + error.name === "QueueDeletedRecently" || + (error._tag === "AwsError" && + error.message.includes("QueueDeletedRecently")) ); } diff --git a/alchemy/src/aws/ssm-parameter.ts b/alchemy/src/aws/ssm-parameter.ts index 7d66aa1c7..801b3d625 100644 --- a/alchemy/src/aws/ssm-parameter.ts +++ b/alchemy/src/aws/ssm-parameter.ts @@ -1,18 +1,8 @@ -import { - AddTagsToResourceCommand, - DeleteParameterCommand, - GetParameterCommand, - ParameterAlreadyExists, - ParameterNotFound, - PutParameterCommand, - SSMClient, - type Tag, -} from "@aws-sdk/client-ssm"; -import type { Context } from "../context.ts"; -import { Resource } from "../resource.ts"; +import { Effect } from "effect"; import { type Secret, isSecret } from "../secret.ts"; -import { ignore } from "../util/ignore.ts"; import { logger } from "../util/logger.ts"; +import { createAwsClient } from "./client.ts"; +import { EffectResource } from "./effect-resource.ts"; /** * Base properties shared by all SSM Parameter types @@ -172,145 +162,144 @@ export type SSMParameter = Resource<"ssm::Parameter"> & { * } * }); */ -export const SSMParameter = Resource( +export const SSMParameter = EffectResource( "ssm::Parameter", - async function ( - this: Context, - _id: string, - props: SSMParameterProps, - ): Promise { - const client = new SSMClient({}); + function* (_id, props) { + const client = yield* createAwsClient({ service: "ssm" }); if (this.phase === "delete") { - try { - await ignore(ParameterNotFound.name, () => - client.send( - new DeleteParameterCommand({ - Name: props.name, - }), - ), - ); - } catch (error: any) { - if (!(error instanceof ParameterNotFound)) { - throw error; - } - } + yield* client + .postJson("/", { + Action: "DeleteParameter", + Name: props.name, + Version: "2014-11-06", + }) + .pipe(Effect.catchAll(() => Effect.unit)); - return this.destroy(); + return yield* this.destroy(); } const parameterType = props.type || "String"; // Extract the actual value and handle type-specific conversions - let parameterValue: string; - if (isSecret(props.value)) { - parameterValue = props.value.unencrypted; - } else if (Array.isArray(props.value)) { - // Convert string array to comma-separated string for StringList - parameterValue = props.value.join(","); - } else { - parameterValue = props.value; - } + const parameterValue = isSecret(props.value) + ? props.value.unencrypted + : Array.isArray(props.value) + ? props.value.join(",") + : props.value; + + // Helper to create tags with alchemy defaults + const createTags = () => [ + ...Object.entries(props.tags || {}).map(([Key, Value]) => ({ + Key, + Value, + })), + { Key: "alchemy_stage", Value: this.stage }, + { Key: "alchemy_resource", Value: this.id }, + ]; + + // Helper to create base parameter params + const createBaseParams = (overwrite: boolean) => { + const params: Record = { + Action: "PutParameter", + Name: props.name, + Value: parameterValue, + Type: parameterType, + Overwrite: overwrite, + Version: "2014-11-06", + }; + + if (props.description) params.Description = props.description; + if (props.keyId) params.KeyId = props.keyId; + if (props.tier) params.Tier = props.tier; + if (props.policies) params.Policies = props.policies; + if (props.dataType) params.DataType = props.dataType; + + return params; + }; + + // Try to create parameter with tags first + const createWithTags = Effect.gen(function* () { + const tags = createTags(); + const putParams = createBaseParams(false); + + // Add tags to parameters + tags.forEach((tag, index) => { + putParams[`Tags.member.${index + 1}.Key`] = tag.Key; + putParams[`Tags.member.${index + 1}.Value`] = tag.Value; + }); - try { - // First, try to create the parameter without overwrite to include tags - try { - await client.send( - new PutParameterCommand({ - Name: props.name, - Value: parameterValue, - Type: parameterType, - Description: props.description, - KeyId: props.keyId, - Tier: props.tier, - Policies: props.policies, - DataType: props.dataType, - Overwrite: false, // Don't overwrite, include tags - Tags: [ - ...Object.entries(props.tags || {}).map(([Key, Value]) => ({ - Key, - Value, - })), - { - Key: "alchemy_stage", - Value: this.stage, - }, - { - Key: "alchemy_resource", - Value: this.id, - }, - ], - }), - ); - } catch (error: any) { - // If parameter already exists, update it with overwrite (no tags in this call) - if (error instanceof ParameterAlreadyExists) { - await client.send( - new PutParameterCommand({ - Name: props.name, - Value: parameterValue, - Type: parameterType, - Description: props.description, - KeyId: props.keyId, - Tier: props.tier, - Policies: props.policies, - DataType: props.dataType, - Overwrite: true, // Overwrite existing, no tags - }), - ); + yield* client.postJson("/", putParams); + }); + + // Update existing parameter and tags separately + const updateExisting = Effect.gen(function* () { + const updateParams = createBaseParams(true); + yield* client.postJson("/", updateParams); + + // Update tags separately for existing parameters + const tags = createTags(); + const tagParams: Record = { + Action: "AddTagsToResource", + ResourceType: "Parameter", + ResourceId: props.name, + Version: "2014-11-06", + }; - // Update tags separately for existing parameters - const tags: Tag[] = [ - ...Object.entries(props.tags || {}).map(([Key, Value]) => ({ - Key, - Value, - })), - { - Key: "alchemy_stage", - Value: this.stage, - }, - { - Key: "alchemy_resource", - Value: this.id, - }, - ]; + tags.forEach((tag, index) => { + tagParams[`Tags.member.${index + 1}.Key`] = tag.Key; + tagParams[`Tags.member.${index + 1}.Value`] = tag.Value; + }); + + yield* client.postJson("/", tagParams); + }); - await client.send( - new AddTagsToResourceCommand({ - ResourceType: "Parameter", - ResourceId: props.name, - Tags: tags, - }), - ); - } else { - throw error; + // Try create first, fallback to update if already exists + yield* createWithTags.pipe( + Effect.catchSome((error) => { + if ( + error._tag === "AwsError" && + error.message.includes("AlreadyExists") + ) { + return updateExisting; } - } + return Effect.fail(error); + }), + ); - // Get the updated parameter - const parameter = await client.send( - new GetParameterCommand({ - Name: props.name, - WithDecryption: true, + // Get the updated parameter + const parameter = yield* client + .postJson<{ Parameter: any }>("/", { + Action: "GetParameter", + Name: props.name, + WithDecryption: true, + Version: "2014-11-06", + }) + .pipe( + Effect.catchAll((error) => { + return Effect.sync(() => + logger.error( + `Error creating/updating parameter ${props.name}:`, + error, + ), + ).pipe(Effect.flatMap(() => Effect.fail(error))); }), ); - if (!parameter?.Parameter) { - throw new Error(`Failed to create or update parameter ${props.name}`); - } - - return this({ - ...props, - arn: parameter.Parameter.ARN!, - version: parameter.Parameter.Version!, - lastModifiedDate: parameter.Parameter.LastModifiedDate!, - name: parameter.Parameter.Name ?? props.name, - value: props.value, - type: (parameter.Parameter.Type as any) ?? parameterType, - } as SSMParameter); - } catch (error: any) { - logger.error(`Error creating/updating parameter ${props.name}:`, error); - throw error; + if (!parameter?.Parameter) { + yield* Effect.fail( + new Error(`Failed to create or update parameter ${props.name}`), + ); } + + return this({ + ...props, + arn: parameter.Parameter.ARN, + version: parameter.Parameter.Version, + lastModifiedDate: new Date(parameter.Parameter.LastModifiedDate), + name: parameter.Parameter.Name ?? props.name, + value: props.value, + type: parameter.Parameter.Type ?? parameterType, + } as SSMParameter); }, ); diff --git a/package.json b/package.json index 529c3dfcf..6858c2a95 100644 --- a/package.json +++ b/package.json @@ -53,5 +53,8 @@ "biome check --write --no-errors-on-unmatched", "biome check --no-errors-on-unmatched" ] + }, + "dependencies": { + "effect": "^3.16.8" } }