diff --git a/.changeset/silly-nails-tickle.md b/.changeset/silly-nails-tickle.md new file mode 100644 index 00000000000..691a0cc4f88 --- /dev/null +++ b/.changeset/silly-nails-tickle.md @@ -0,0 +1,6 @@ +--- +"@effect/platform-node": minor +"@effect/rpc": minor +--- + +Align RPC middleware api; only the `wrap` variant remains, and `provides` and `requires` are supported as second generic argument diff --git a/packages/ai/ai/src/McpSchema.ts b/packages/ai/ai/src/McpSchema.ts index f0eec33d5c5..1d379babcc0 100644 --- a/packages/ai/ai/src/McpSchema.ts +++ b/packages/ai/ai/src/McpSchema.ts @@ -1784,11 +1784,9 @@ export class McpServerClient extends Context.Tag("@effect/ai/McpSchema/McpServer * @since 1.0.0 * @category McpServerClient */ -export class McpServerClientMiddleware - extends RpcMiddleware.Tag()("@effect/ai/McpSchema/McpServerClientMiddleware", { - provides: McpServerClient - }) -{} +export class McpServerClientMiddleware extends RpcMiddleware.Tag()("@effect/ai/McpSchema/McpServerClientMiddleware") {} // ============================================================================= // Protocol diff --git a/packages/ai/ai/src/McpServer.ts b/packages/ai/ai/src/McpServer.ts index 88535626d6a..a68e695b82e 100644 --- a/packages/ai/ai/src/McpServer.ts +++ b/packages/ai/ai/src/McpServer.ts @@ -338,15 +338,18 @@ export const run: ( idleTimeToLive: 10000 }) - const clientMiddleware = McpServerClientMiddleware.of(({ clientId }) => - Effect.sync(() => - McpServerClient.of({ - clientId, - getClient: RcMap.get(clients, clientId).pipe( - Effect.map(({ client }) => client) - ) - }) - ) + const clientMiddleware = McpServerClientMiddleware.of( + (next, { clientId }) => + Effect.provideService( + next, + McpServerClient, + McpServerClient.of({ + clientId, + getClient: RcMap.get(clients, clientId).pipe( + Effect.map(({ client }) => client) + ) + }) + ) ) const patchedProtocol = RpcServer.Protocol.of({ diff --git a/packages/platform-node/test/fixtures/rpc-schemas.ts b/packages/platform-node/test/fixtures/rpc-schemas.ts index 14e8964110a..e5f4a7bfe7a 100644 --- a/packages/platform-node/test/fixtures/rpc-schemas.ts +++ b/packages/platform-node/test/fixtures/rpc-schemas.ts @@ -29,15 +29,31 @@ class CurrentUser extends Context.Tag("CurrentUser")() {} class Unauthorized extends Schema.TaggedError("Unauthorized")("Unauthorized", {}) {} -class AuthMiddleware extends RpcMiddleware.Tag()("AuthMiddleware", { - provides: CurrentUser, +class AuthMiddleware extends RpcMiddleware.Tag()("AuthMiddleware", { failure: Unauthorized, requiredForClient: true }) {} -class TimingMiddleware extends RpcMiddleware.Tag()("TimingMiddleware", { - wrap: true -}) {} +class TimingMiddleware extends RpcMiddleware.Tag()("TimingMiddleware") {} + +export class Something extends Context.Tag("Something")() {} +export class SomethingElse extends Context.Tag("SomethingElse")() {} +export class SomethingElseElse extends Context.Tag("SomethingElseElse")() {} + +class SomethingMiddleware extends RpcMiddleware.Tag< + SomethingMiddleware, + { requires: SomethingElseElse; provides: Something | SomethingElse } +>()("SomethingMiddleware") {} + +class SomethingWrapMiddleware + extends RpcMiddleware.Tag()( + "SomethingWrapMiddleware" + ) +{} + +class SomethingElseElseMiddleware extends RpcMiddleware.Tag()("SomethingElseElseMiddleware") {} class GetUser extends Rpc.make("GetUser", { success: User, @@ -66,6 +82,12 @@ export const UserRpcs = RpcGroup.make( }, success: Schema.Number }).middleware(TimingMiddleware), + Rpc.make("GetContext", { + success: Schema.Struct({ + Something: Schema.String, + SomethingElse: Schema.String + }) + }).middleware(SomethingMiddleware), Rpc.make("GetTimingMiddlewareMetrics", { success: Schema.Struct({ success: Schema.Number, @@ -73,12 +95,16 @@ export const UserRpcs = RpcGroup.make( count: Schema.Number }) }) -).middleware(AuthMiddleware) +) + .middleware(AuthMiddleware) + .middleware(SomethingElseElseMiddleware) const AuthLive = Layer.succeed( AuthMiddleware, - AuthMiddleware.of((options) => - Effect.succeed( + AuthMiddleware.of((next, options) => + Effect.provideService( + next, + CurrentUser, new User({ id: options.headers.userid ?? "1", name: options.headers.name ?? "Fallback name" }) ) ) @@ -89,8 +115,8 @@ const rpcDefects = Metric.counter("rpc_middleware_defects") const rpcCount = Metric.counter("rpc_middleware_count") const TimingLive = Layer.succeed( TimingMiddleware, - TimingMiddleware.of((options) => - options.next.pipe( + TimingMiddleware.of((next) => + next.pipe( Effect.tap(Metric.increment(rpcSuccesses)), Effect.tapDefect(() => Metric.increment(rpcDefects)), Effect.ensuring(Metric.increment(rpcCount)) @@ -98,6 +124,38 @@ const TimingLive = Layer.succeed( ) ) +const SomethingLive = Layer.succeed( + SomethingMiddleware, + SomethingMiddleware.of((next) => + SomethingElseElse.pipe(Effect.flatMap(() => + Effect.provide( + next, + Context.empty().pipe( + Context.add(Something, "something"), + Context.add(SomethingElse, "something-else") + ) + ) + )) + ) +).pipe( + Layer.merge(Layer.succeed( + SomethingElseElseMiddleware, + SomethingElseElseMiddleware.of(Effect.provideService(SomethingElseElse, "something-else-else")) + )) +) + +Layer.succeed( + SomethingWrapMiddleware, + SomethingWrapMiddleware.of((next) => + next.pipe(Effect.provide( + Context.empty().pipe( + Context.add(Something, "something"), + Context.add(SomethingElse, "something-else") + ) + )) + ) +) + const UsersLive = UserRpcs.toLayer(Effect.gen(function*() { let interrupts = 0 let emits = 0 @@ -135,6 +193,7 @@ const UsersLive = UserRpcs.toLayer(Effect.gen(function*() { Never: () => Effect.never.pipe(Effect.onInterrupt(() => Effect.sync(() => interrupts++))), "nested.test": () => Effect.void, TimedMethod: (_) => _.shouldFail ? Effect.die("boom") : Effect.succeed(1), + GetContext: () => Effect.all({ Something, SomethingElse }), GetTimingMiddlewareMetrics: () => Effect.all({ defect: Metric.value(rpcDefects).pipe(Effect.map((_) => _.count)), @@ -148,7 +207,8 @@ export const RpcLive = RpcServer.layer(UserRpcs).pipe( Layer.provide([ UsersLive, AuthLive, - TimingLive + TimingLive, + SomethingLive ]) ) @@ -166,6 +226,6 @@ export class UsersClient extends Context.Tag("UsersClient")< Layer.provide(AuthClient) ) static layerTest = Layer.scoped(UsersClient, RpcTest.makeClient(UserRpcs)).pipe( - Layer.provide([UsersLive, AuthLive, TimingLive, AuthClient]) + Layer.provide([UsersLive, AuthLive, TimingLive, SomethingLive, AuthClient]) ) } diff --git a/packages/platform-node/test/rpc-e2e.ts b/packages/platform-node/test/rpc-e2e.ts index df25cc805d6..6f83335c1b7 100644 --- a/packages/platform-node/test/rpc-e2e.ts +++ b/packages/platform-node/test/rpc-e2e.ts @@ -118,5 +118,12 @@ export const e2eSuite = ( assert.notEqual(defect, 0) assert.notEqual(success, 0) }).pipe(Effect.provide(layer))) + + it.effect("supports Context provide", () => + Effect.gen(function*() { + const client = yield* UsersClient + const context = yield* client.GetContext() + assert.deepStrictEqual(context, { Something: "something", SomethingElse: "something-else" }) + }).pipe(Effect.provide(layer))) }) } diff --git a/packages/rpc/src/Rpc.ts b/packages/rpc/src/Rpc.ts index 6365a313954..d62f46264b0 100644 --- a/packages/rpc/src/Rpc.ts +++ b/packages/rpc/src/Rpc.ts @@ -48,7 +48,8 @@ export interface Rpc< out Payload extends AnySchema = typeof Schema.Void, out Success extends Schema.Schema.Any = typeof Schema.Void, out Error extends Schema.Schema.All = typeof Schema.Never, - out Middleware extends RpcMiddleware.TagClassAny = never + out Middleware extends RpcMiddleware.TagClassAny = never, + out Requirements = never > extends Pipeable { new(_: never): {} @@ -69,7 +70,8 @@ export interface Rpc< Payload, S, Error, - Middleware + Middleware, + Requirements > /** @@ -80,7 +82,8 @@ export interface Rpc< Payload, Success, E, - Middleware + Middleware, + Requirements > /** @@ -93,7 +96,8 @@ export interface Rpc< P extends Schema.Struct ? P : P extends Schema.Struct.Fields ? Schema.Struct

: never, Success, Error, - Middleware + Middleware, + Requirements > /** @@ -104,7 +108,8 @@ export interface Rpc< Payload, Success, Error, - Middleware | M + Middleware | M, + Exclude> | RpcMiddleware.TagClass.ExtractRequires > /** @@ -115,7 +120,8 @@ export interface Rpc< Payload, Success, Error, - Middleware + Middleware, + Requirements > /** @@ -124,14 +130,14 @@ export interface Rpc< annotate( tag: Context_.Tag, value: S - ): Rpc + ): Rpc /** * Merge the annotations of the rpc with the provided context. */ annotateContext( context: Context_.Context - ): Rpc + ): Rpc } /** @@ -184,7 +190,8 @@ export type Tag = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? _Tag : never @@ -197,7 +204,8 @@ export type Success = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? _Success["Type"] : never @@ -210,7 +218,8 @@ export type SuccessEncoded = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? _Success["Encoded"] : never @@ -250,7 +259,8 @@ export type ErrorSchema = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? _Error | _Middleware : never @@ -300,7 +310,8 @@ export type PayloadConstructor = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? _Payload extends { readonly fields: Schema.Struct.Fields } ? Schema.Simplify> @@ -316,7 +327,8 @@ export type Payload = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? _Payload["Type"] : never @@ -329,7 +341,8 @@ export type Context = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? _Payload["Context"] | _Success["Context"] | _Error["Context"] : never @@ -342,7 +355,8 @@ export type Middleware = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? Context_.Tag.Identifier<_Middleware> : never @@ -355,7 +369,8 @@ export type MiddlewareClient = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? _Middleware extends { readonly requiredForClient: true } ? RpcMiddleware.ForClient> @@ -371,13 +386,15 @@ export type AddError = R extends infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? Rpc< _Tag, _Payload, _Success, _Error | Error, - _Middleware + _Middleware, + _Requirements > : never @@ -390,13 +407,16 @@ export type AddMiddleware ? Rpc< _Tag, _Payload, _Success, _Error, - _Middleware | Middleware + _Middleware | Middleware, + | Exclude<_Requirements, RpcMiddleware.TagClass.ExtractProvides> + | RpcMiddleware.TagClass.ExtractRequires > : never @@ -409,7 +429,8 @@ export type ToHandler = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? Handler<_Tag> : never @@ -430,24 +451,33 @@ export type ToHandlerFn = ( * @category models */ export type IsStream = R extends - Rpc, infer _Error, infer _Middleware> ? true : never + Rpc, infer _Error, infer _Middleware, infer _Requirements> ? + true : + never /** * @since 1.0.0 * @category models */ export type ExtractTag = R extends - Rpc ? R : never + Rpc ? R : never /** * @since 1.0.0 * @category models */ export type ExtractProvides = R extends - Rpc ? _Middleware extends { - readonly provides: Context_.Tag - } ? _I : - never : + Rpc ? + RpcMiddleware.TagClass.ExtractProvides<_Middleware> : + never + +/** + * @since 1.0.0 + * @category models + */ +export type ExtractRequires = R extends + Rpc ? + RpcMiddleware.TagClass.ExtractRequires<_Middleware> : never /** @@ -474,7 +504,8 @@ export type ResultFrom = R extends Rpc< infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? [_Success] extends [RpcSchema.Stream] ? | Stream< _SA["Type"], @@ -502,13 +533,15 @@ export type Prefixed = Rpcs extends Rpc infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? Rpc< `${Prefix}${_Tag}`, _Payload, _Success, _Error, - _Middleware + _Middleware, + _Requirements > : never @@ -597,7 +630,8 @@ const makeProto = < Payload extends Schema.Schema.Any, Success extends Schema.Schema.Any, Error extends Schema.Schema.All, - Middleware extends RpcMiddleware.TagClassAny + Middleware extends RpcMiddleware.TagClassAny, + Requirements >(options: { readonly _tag: Tag readonly payloadSchema: Payload @@ -605,7 +639,7 @@ const makeProto = < readonly errorSchema: Error readonly annotations: Context_.Context readonly middlewares: ReadonlySet -}): Rpc => { +}): Rpc => { function Rpc() {} Object.setPrototypeOf(Rpc, Proto) Object.assign(Rpc, options) diff --git a/packages/rpc/src/RpcClient.ts b/packages/rpc/src/RpcClient.ts index ea62186c1c8..416ee4137cb 100644 --- a/packages/rpc/src/RpcClient.ts +++ b/packages/rpc/src/RpcClient.ts @@ -116,7 +116,8 @@ export declare namespace RpcClient { infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? [_Success] extends [RpcSchema.Stream] ? AsMailbox extends true ? Effect.Effect< Mailbox.ReadonlyMailbox<_A["Type"], _E["Type"] | _Error["Type"] | E | _Middleware["failure"]["Type"]>, never, @@ -166,7 +167,8 @@ export declare namespace RpcClient { infer _Payload, infer _Success, infer _Error, - infer _Middleware + infer _Middleware, + infer _Requirements > ? [_Success] extends [RpcSchema.Stream] ? AsMailbox extends true ? Effect.Effect< Mailbox.ReadonlyMailbox<_A["Type"], _E["Type"] | _Error["Type"] | E | _Middleware["failure"]["Type"]>, never, diff --git a/packages/rpc/src/RpcGroup.ts b/packages/rpc/src/RpcGroup.ts index fed9eaf0ba4..f0242bab7cc 100644 --- a/packages/rpc/src/RpcGroup.ts +++ b/packages/rpc/src/RpcGroup.ts @@ -95,6 +95,8 @@ export interface RpcGroup extends Pipeable { ): Layer.Layer< Rpc.ToHandler, EX, + | (R extends Rpc.Rpc ? _F : + never) | Exclude | HandlersContext > @@ -117,6 +119,8 @@ export interface RpcGroup extends Pipeable { ): Layer.Layer< Rpc.Handler, EX, + | (R extends Rpc.Rpc ? _F : + never) | Exclude | HandlerContext > diff --git a/packages/rpc/src/RpcMiddleware.ts b/packages/rpc/src/RpcMiddleware.ts index 45cb9b45a36..d96b16245e0 100644 --- a/packages/rpc/src/RpcMiddleware.ts +++ b/packages/rpc/src/RpcMiddleware.ts @@ -2,6 +2,7 @@ * @since 1.0.0 */ import type { Headers } from "@effect/platform/Headers" +import type { NonEmptyReadonlyArray } from "effect/Array" import * as Context from "effect/Context" import * as Effect from "effect/Effect" import * as Layer from "effect/Layer" @@ -27,27 +28,16 @@ export type TypeId = typeof TypeId * @since 1.0.0 * @category models */ -export interface RpcMiddleware { - (options: { - readonly clientId: number - readonly rpc: Rpc.AnyWithProps - readonly payload: unknown - readonly headers: Headers - }): Effect.Effect -} - -/** - * @since 1.0.0 - * @category models - */ -export interface RpcMiddlewareWrap { - (options: { - readonly clientId: number - readonly rpc: Rpc.AnyWithProps - readonly payload: unknown - readonly headers: Headers - readonly next: Effect.Effect - }): Effect.Effect +export interface RpcMiddleware { + ( + next: Effect.Effect, + options: { + readonly clientId: number + readonly rpc: Rpc.AnyWithProps + readonly payload: unknown + readonly headers: Headers + } + ): Effect.Effect } /** @@ -98,20 +88,22 @@ export interface Any { export interface TagClass< Self, Name extends string, - Options + Options, + Config extends { + requires?: any + provides?: any + } > extends TagClass.Base< Self, Name, Options, - TagClass.Wrap extends true ? RpcMiddlewareWrap< - TagClass.Provides, - TagClass.Failure - > : - RpcMiddleware< - TagClass.Service, - TagClass.FailureService - > + RpcMiddleware< + "provides" extends keyof Config ? Config["provides"] : never, + TagClass.Failure, + "requires" extends keyof Config ? Config["requires"] : never + >, + Config > {} @@ -128,6 +120,23 @@ export declare namespace TagClass { readonly provides: Context.Tag readonly optional?: false } ? Context.Tag.Identifier + : Options extends { + readonly provides: NonEmptyReadonlyArray> + readonly optional?: false + } ? Context.Tag.Identifier + : never + + /** + * @since 1.0.0 + * @category models + */ + export type Requires = Options extends { + readonly requires: Context.Tag + } ? Context.Tag.Identifier + : Options extends { + readonly requires: NonEmptyReadonlyArray> + readonly optional?: false + } ? Context.Tag.Identifier : never /** @@ -136,6 +145,8 @@ export declare namespace TagClass { */ export type Service = Options extends { readonly provides: Context.Tag } ? Context.Tag.Service + : Options extends { readonly provides: NonEmptyReadonlyArray> } ? + Context.Context> : void /** @@ -182,21 +193,41 @@ export declare namespace TagClass { * @since 1.0.0 * @category models */ - export type Wrap = Options extends { readonly wrap: true } ? true : false + export type ExtractProvides = M extends { + readonly provides: infer _I + } ? _I : + never + + /** + * @since 1.0.0 + * @category models + */ + export type ExtractRequires = M extends { + readonly requires: infer _I + } ? _I : + never /** * @since 1.0.0 * @category models */ - export interface Base extends Context.Tag { + export interface Base< + Self, + Name extends string, + Options, + Service, + Config extends { + requires?: any + provides?: any + } + > extends Context.Tag { new(_: never): Context.TagClassShape readonly [TypeId]: TypeId readonly optional: Optional readonly failure: FailureSchema - readonly provides: Options extends { readonly provides: Context.Tag } ? Options["provides"] - : undefined + readonly provides: "provides" extends keyof Config ? Config["provides"] : never + readonly requires: "requires" extends keyof Config ? Config["requires"] : never readonly requiredForClient: RequiredForClient - readonly wrap: Wrap } } @@ -207,50 +238,52 @@ export declare namespace TagClass { export interface TagClassAny extends Context.Tag { readonly [TypeId]: TypeId readonly optional: boolean - readonly provides?: Context.Tag | undefined + readonly provides?: any | undefined + readonly requires?: any | undefined readonly failure: Schema.Schema.All readonly requiredForClient: boolean - readonly wrap: boolean } /** * @since 1.0.0 * @category models */ -export interface TagClassAnyWithProps extends Context.Tag | RpcMiddlewareWrap> { +export interface TagClassAnyWithProps extends Context.Tag> { readonly [TypeId]: TypeId readonly optional: boolean - readonly provides?: Context.Tag + readonly provides?: any | undefined + readonly requires?: any | undefined readonly failure: Schema.Schema.All readonly requiredForClient: boolean - readonly wrap: boolean } /** * @since 1.0.0 * @category tags */ -export const Tag = (): < +export const Tag = < + Self, + Config extends { + requires?: any + provides?: any + } = { requires: never; provides: never } +>(): < const Name extends string, const Options extends { - readonly wrap?: boolean readonly optional?: boolean readonly failure?: Schema.Schema.All - readonly provides?: Context.Tag readonly requiredForClient?: boolean } >( id: Name, options?: Options | undefined -) => TagClass => +) => TagClass => ( id: string, options?: { readonly optional?: boolean readonly failure?: Schema.Schema.All - readonly provides?: Context.Tag readonly requiredForClient?: boolean - readonly wrap?: boolean } ) => { const Err = globalThis.Error as any @@ -270,12 +303,8 @@ export const Tag = (): < }) TagClass_[TypeId] = TypeId TagClass_.failure = options?.optional === true || options?.failure === undefined ? Schema.Never : options.failure - if (options?.provides) { - TagClass_.provides = options.provides - } TagClass_.optional = options?.optional ?? false TagClass_.requiredForClient = options?.requiredForClient ?? false - TagClass_.wrap = options?.wrap ?? false return TagClass as any } diff --git a/packages/rpc/src/RpcServer.ts b/packages/rpc/src/RpcServer.ts index 3d9276a9b2a..c7b61c30473 100644 --- a/packages/rpc/src/RpcServer.ts +++ b/packages/rpc/src/RpcServer.ts @@ -49,7 +49,6 @@ import type { RequestEncoded } from "./RpcMessage.js" import { constEof, constPong, RequestId, ResponseDefectEncoded } from "./RpcMessage.js" -import type { RpcMiddleware } from "./RpcMiddleware.js" import * as RpcSchema from "./RpcSchema.js" import * as RpcSerialization from "./RpcSerialization.js" import type { InitialMessage } from "./RpcWorker.js" @@ -431,24 +430,8 @@ const applyMiddleware = ( } for (const tag of rpc.middlewares) { - if (tag.wrap) { - const middleware = Context.unsafeGet(context, tag) - handler = middleware({ ...options, next: handler as any }) - } else if (tag.optional) { - const middleware = Context.unsafeGet(context, tag) as RpcMiddleware - const previous = handler - handler = Effect.matchEffect(middleware(options), { - onFailure: () => previous, - onSuccess: tag.provides !== undefined - ? (value) => Effect.provideService(previous, tag.provides as any, value) - : (_) => previous - }) - } else { - const middleware = Context.unsafeGet(context, tag) as RpcMiddleware - handler = tag.provides !== undefined - ? Effect.provideServiceEffect(handler, tag.provides as any, middleware(options)) - : Effect.zipRight(middleware(options), handler) - } + const middleware = Context.unsafeGet(context, tag) + handler = middleware(handler as any, { ...options }) as any } return handler