Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbox prisma adapter #231

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open

Conversation

kamilwylegala
Copy link
Collaborator

Transactional outbox pattern adapter for Prisma.

@kamilwylegala kamilwylegala force-pushed the AP-5046-outbox-prisma-adapter branch from 9ffbf8c to 963014a Compare November 26, 2024 14:33
const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName]

for (const entry of entries) {
await prismaModel.upsert({
Copy link
Owner

@kibertoad kibertoad Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upserts often have suboptimal performance with plenty of locking, can we somehow simplify this to be bulk inserts and bulk updates?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely. I'm working on making tests green. Once I cover all cases, Bulk update/insert would do the job.

Actually upsert has some weird behavior, maybe let's switch to bulk inserts/updates right away.

@kamilwylegala kamilwylegala marked this pull request as ready for review November 28, 2024 12:20
Comment on lines 9 to 18
type ModelDelegate = {
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
create: (args: any) => Promise<any>
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
findMany: (args: any) => Promise<any>
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
createMany: (args: any) => Promise<any>
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
updateMany: (args: any) => Promise<any>
}
Copy link
Collaborator

@CarlosGamero CarlosGamero Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 I am not really happy with using any here, as we know the method signature could we define input and output types to have type check and IDE help?

type OutboxEntryDelegate<Event extends CommonEventDefinition> = {
  create: (args: { data: OutboxEntry<Event> }) => Promise<OutboxEntry<Event>>
  findMany: (args: { where?: Partial<OutboxEntry<Event>> }) => Promise<OutboxEntry<Event>[]>
....
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. It's still a work in progress. I did any here to move quicker. I fully agree, it needs to be addressed. I will work on it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, please take a look 😁

Comment on lines +20 to +28
export class OutboxPrismaAdapter<
SupportedEvents extends CommonEventDefinition[],
ModelName extends keyof PrismaClient & string,
> implements OutboxStorage<SupportedEvents>
{
constructor(
private readonly prisma: PrismaClient,
private readonly modelName: ModelName,
) {}
Copy link
Collaborator

@CarlosGamero CarlosGamero Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 I remember having issue with passing PrismaClient directly as a parameter on prima-utils package, I don't remember exactly the reason, but it was because the object we are using is the autogenerate one and not the default coming from the Prisma package. I fixed it by doing something like:

export class OutboxPrismaAdapter<
  SupportedEvents extends CommonEventDefinition[],
  Prisma extends PrismaClient,
  ...
> implements OutboxStorage<SupportedEvents>
{
  constructor(
    private readonly prisma: Prisma,
    private readonly modelName: ModelName,
  ) {}
...
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I am not sure if the definition of the Model name will work to have IDE autocompletion, wondering if we can make it a but more specific with something like:

type PrismaModelName<T extends PrismaClient> = {
  [K in keyof T]: T[K] extends { create: Function; findMany: Function } ? K : never
}[keyof T]

export class OutboxPrismaAdapter<
  SupportedEvents extends CommonEventDefinition[],
  Prisma extends PrismaClient,
  ModelName extends PrismaModelName<Prisma>,
> implements OutboxStorage<SupportedEvents>
{
  constructor(
    private readonly prisma: Prisma,
    private readonly modelName: ModelName,
  ) {}

createEntry(
outboxEntry: OutboxEntry<SupportedEvents[number]>,
): Promise<OutboxEntry<SupportedEvents[number]>> {
const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 I see we are doing this on all methods, could we maybe create a private method to retrieve the model delegate so we can have the casting in a single place

Comment on lines 50 to 67
async flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void> {
const entries = await outboxAccumulator.getEntries()
const failedEntries = await outboxAccumulator.getFailedEntries()
const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate

const existingEntries = await prismaModel.findMany({
where: {
id: {
in: [...entries.map((entry) => entry.id), ...failedEntries.map((entry) => entry.id)],
},
},
})

await this.prisma.$transaction(async (prisma) => {
const prismaModel = prisma[this.modelName] as ModelDelegate
await this.handleSuccesses(prismaModel, entries, existingEntries)
await this.handleFailures(prismaModel, failedEntries, existingEntries)
})
Copy link
Collaborator

@CarlosGamero CarlosGamero Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 How many entries do we expect here? wondering if chunking could be beneficial

prismaModel: ModelDelegate,
entries: OutboxEntry<SupportedEvents[number]>[],
existingEntries: OutboxEntry<SupportedEvents[number]>[],
) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 This method is almost the same as handleSuccesses wondering if we can combine them by adding another param like isSuccess

const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate

// @ts-ignore
const messageType = getMessageType(outboxEntry.event)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CarlosGamero While you're looking at the PR, could you check this line? For some reason, TSLint complains about type mismatch here. I believe, the code is exactly the same as in the other packages, thus there are still compilation errors.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course! let me have a look :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed in Slack, For visibility:

Most likely we don't know to save the type in a separate filed and we can save it with the event itself, but just for visibility the issue was that getMessageType is expecting a message type and we are passing an event, if I am reading it fine on code, a message is an extension of event and that's why it was failing

},
})

await this.prisma.$transaction(async (prisma) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 What do you think about using prismaTransaction (from @lokalise/prisma-utils) here so we can benefit from teh retry mechanism implemented there

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I discussed it with Igor, it's kibertoad namespace, so the preference is to not depend on any lokalise pacakges.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember this conversation, but if it happened, then I disagree with Igor from the past. We use lokalise-namespaced packages liberally in node-service-template, there is nothing wrong with it, and I don't think that @lokalise/prisma-utils is coupled to our internal specifics - as long as it supports key DBs (PostgreSQL and MySQL), it should be fine to use it.
Since this is a dedicated package for prisma adapter, having prisma-related dependencies in it should be fine

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let's use prisma utils.

Back then, it was about using @lokalise/id-utils:

#204 (comment)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, it was a slightly different case, we only needed a subset of the library, of which our id-utils is basically an opinionated wrapper

@kamilwylegala kamilwylegala force-pushed the AP-5046-outbox-prisma-adapter branch from 965cc31 to bc80d7d Compare January 24, 2025 09:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants