Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/silly-nails-tickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@effect/platform-node": minor
"@effect/rpc": minor
---

Align RPC middleware api; only the `wrap` variant remains, and `provides` and `requires` are supported as second generic argument <Self, Config>
8 changes: 3 additions & 5 deletions packages/ai/ai/src/McpSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1784,11 +1784,9 @@ export class McpServerClient extends Context.Tag("@effect/ai/McpSchema/McpServer
* @since 1.0.0
* @category McpServerClient
*/
export class McpServerClientMiddleware
extends RpcMiddleware.Tag<McpServerClientMiddleware>()("@effect/ai/McpSchema/McpServerClientMiddleware", {
provides: McpServerClient
})
{}
export class McpServerClientMiddleware extends RpcMiddleware.Tag<McpServerClientMiddleware, {
provides: McpServerClient
}>()("@effect/ai/McpSchema/McpServerClientMiddleware") {}

// =============================================================================
// Protocol
Expand Down
21 changes: 12 additions & 9 deletions packages/ai/ai/src/McpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,15 +338,18 @@ export const run: (
idleTimeToLive: 10000
})

const clientMiddleware = McpServerClientMiddleware.of(({ clientId }) =>
Effect.sync(() =>
McpServerClient.of({
clientId,
getClient: RcMap.get(clients, clientId).pipe(
Effect.map(({ client }) => client)
)
})
)
const clientMiddleware = McpServerClientMiddleware.of(
(next, { clientId }) =>
Effect.provideService(
next,
McpServerClient,
McpServerClient.of({
clientId,
getClient: RcMap.get(clients, clientId).pipe(
Effect.map(({ client }) => client)
)
})
)
)

const patchedProtocol = RpcServer.Protocol.of({
Expand Down
84 changes: 72 additions & 12 deletions packages/platform-node/test/fixtures/rpc-schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,31 @@ class CurrentUser extends Context.Tag("CurrentUser")<CurrentUser, User>() {}

class Unauthorized extends Schema.TaggedError<Unauthorized>("Unauthorized")("Unauthorized", {}) {}

class AuthMiddleware extends RpcMiddleware.Tag<AuthMiddleware>()("AuthMiddleware", {
provides: CurrentUser,
class AuthMiddleware extends RpcMiddleware.Tag<AuthMiddleware, { provides: CurrentUser }>()("AuthMiddleware", {
failure: Unauthorized,
requiredForClient: true
}) {}

class TimingMiddleware extends RpcMiddleware.Tag<TimingMiddleware>()("TimingMiddleware", {
wrap: true
}) {}
class TimingMiddleware extends RpcMiddleware.Tag<TimingMiddleware>()("TimingMiddleware") {}

export class Something extends Context.Tag("Something")<Something, "something">() {}
export class SomethingElse extends Context.Tag("SomethingElse")<SomethingElse, "something-else">() {}
export class SomethingElseElse extends Context.Tag("SomethingElseElse")<SomethingElseElse, "something-else-else">() {}

class SomethingMiddleware extends RpcMiddleware.Tag<
SomethingMiddleware,
{ requires: SomethingElseElse; provides: Something | SomethingElse }
>()("SomethingMiddleware") {}

class SomethingWrapMiddleware
extends RpcMiddleware.Tag<SomethingWrapMiddleware, { provides: Something | SomethingElse }>()(
"SomethingWrapMiddleware"
)
{}

class SomethingElseElseMiddleware extends RpcMiddleware.Tag<SomethingElseElseMiddleware, {
provides: SomethingElseElse
}>()("SomethingElseElseMiddleware") {}

class GetUser extends Rpc.make("GetUser", {
success: User,
Expand Down Expand Up @@ -66,19 +82,29 @@ export const UserRpcs = RpcGroup.make(
},
success: Schema.Number
}).middleware(TimingMiddleware),
Rpc.make("GetContext", {
success: Schema.Struct({
Something: Schema.String,
SomethingElse: Schema.String
})
}).middleware(SomethingMiddleware),
Rpc.make("GetTimingMiddlewareMetrics", {
success: Schema.Struct({
success: Schema.Number,
defect: Schema.Number,
count: Schema.Number
})
})
).middleware(AuthMiddleware)
)
.middleware(AuthMiddleware)
.middleware(SomethingElseElseMiddleware)

const AuthLive = Layer.succeed(
AuthMiddleware,
AuthMiddleware.of((options) =>
Effect.succeed(
AuthMiddleware.of((next, options) =>
Effect.provideService(
next,
CurrentUser,
new User({ id: options.headers.userid ?? "1", name: options.headers.name ?? "Fallback name" })
)
)
Expand All @@ -89,15 +115,47 @@ const rpcDefects = Metric.counter("rpc_middleware_defects")
const rpcCount = Metric.counter("rpc_middleware_count")
const TimingLive = Layer.succeed(
TimingMiddleware,
TimingMiddleware.of((options) =>
options.next.pipe(
TimingMiddleware.of((next) =>
next.pipe(
Effect.tap(Metric.increment(rpcSuccesses)),
Effect.tapDefect(() => Metric.increment(rpcDefects)),
Effect.ensuring(Metric.increment(rpcCount))
)
)
)

const SomethingLive = Layer.succeed(
SomethingMiddleware,
SomethingMiddleware.of((next) =>
SomethingElseElse.pipe(Effect.flatMap(() =>
Effect.provide(
next,
Context.empty().pipe(
Context.add(Something, "something"),
Context.add(SomethingElse, "something-else")
)
)
))
)
).pipe(
Layer.merge(Layer.succeed(
SomethingElseElseMiddleware,
SomethingElseElseMiddleware.of(Effect.provideService(SomethingElseElse, "something-else-else"))
))
)

Layer.succeed(
SomethingWrapMiddleware,
SomethingWrapMiddleware.of((next) =>
next.pipe(Effect.provide(
Context.empty().pipe(
Context.add(Something, "something"),
Context.add(SomethingElse, "something-else")
)
))
)
)

const UsersLive = UserRpcs.toLayer(Effect.gen(function*() {
let interrupts = 0
let emits = 0
Expand Down Expand Up @@ -135,6 +193,7 @@ const UsersLive = UserRpcs.toLayer(Effect.gen(function*() {
Never: () => Effect.never.pipe(Effect.onInterrupt(() => Effect.sync(() => interrupts++))),
"nested.test": () => Effect.void,
TimedMethod: (_) => _.shouldFail ? Effect.die("boom") : Effect.succeed(1),
GetContext: () => Effect.all({ Something, SomethingElse }),
GetTimingMiddlewareMetrics: () =>
Effect.all({
defect: Metric.value(rpcDefects).pipe(Effect.map((_) => _.count)),
Expand All @@ -148,7 +207,8 @@ export const RpcLive = RpcServer.layer(UserRpcs).pipe(
Layer.provide([
UsersLive,
AuthLive,
TimingLive
TimingLive,
SomethingLive
])
)

Expand All @@ -166,6 +226,6 @@ export class UsersClient extends Context.Tag("UsersClient")<
Layer.provide(AuthClient)
)
static layerTest = Layer.scoped(UsersClient, RpcTest.makeClient(UserRpcs)).pipe(
Layer.provide([UsersLive, AuthLive, TimingLive, AuthClient])
Layer.provide([UsersLive, AuthLive, TimingLive, SomethingLive, AuthClient])
)
}
7 changes: 7 additions & 0 deletions packages/platform-node/test/rpc-e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,12 @@ export const e2eSuite = <E>(
assert.notEqual(defect, 0)
assert.notEqual(success, 0)
}).pipe(Effect.provide(layer)))

it.effect("supports Context provide", () =>
Effect.gen(function*() {
const client = yield* UsersClient
const context = yield* client.GetContext()
assert.deepStrictEqual(context, { Something: "something", SomethingElse: "something-else" })
}).pipe(Effect.provide(layer)))
})
}
Loading