-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Outbox prisma adapter #231
base: main
Are you sure you want to change the base?
Changes from all commits
0353847
14fbfba
d55405e
e93bdad
963014a
7444fab
9c971fc
5ad155f
d9e3d41
c61533d
552e0b6
f143384
432875b
ccf29a6
3f39f92
0b7d718
c2108e3
a7a9cb0
1b3c72c
77b1d26
856ee17
45d7310
00ca6d0
33b954b
829c9ac
1f39dbf
8b818c1
ab58167
ed4ce02
bc80d7d
55bdd41
01975c0
0fc3e7f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,5 +30,9 @@ jobs: | |
run: | | ||
npm install --ignore-scripts | ||
|
||
- name: Build | ||
run: | | ||
npm run build | ||
|
||
- name: Run lint | ||
run: npm run lint |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -130,3 +130,6 @@ dist | |
.pnp.* | ||
/.idea | ||
/package-lock.json | ||
|
||
# prisma | ||
db-client |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,15 @@ | ||
{ | ||
"$schema": "./node_modules/@biomejs/biome/configuration_schema.json", | ||
"extends": ["./node_modules/@kibertoad/biome-config/configs/biome-package.json"], | ||
"linter": { | ||
"rules": { | ||
"performance": { | ||
"noBarrelFile": "off", | ||
"noReExportAll": "off" | ||
} | ||
} | ||
} | ||
"$schema": "./node_modules/@biomejs/biome/configuration_schema.json", | ||
"extends": ["./node_modules/@kibertoad/biome-config/configs/biome-package.json"], | ||
"linter": { | ||
"rules": { | ||
"performance": { | ||
"noBarrelFile": "off", | ||
"noReExportAll": "off" | ||
} | ||
} | ||
}, | ||
"files": { | ||
"ignore": ["db-client"] | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# outbox-prisma-adapter | ||
|
||
This package provides a Prisma adapter for the Outbox pattern. | ||
|
||
### Development | ||
|
||
#### Tests | ||
|
||
To run the tests, you need to have a PostgreSQL database running. You can use the following command to start a PostgreSQL database using Docker: | ||
|
||
```sh | ||
docker-compose up -d | ||
``` | ||
|
||
Then update Prisma client: | ||
```sh | ||
npx prisma generate --schema=./test/schema.prisma | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
services: | ||
|
||
postgres: | ||
image: postgres:16.2 | ||
environment: | ||
POSTGRES_USER: prisma | ||
POSTGRES_PASSWORD: prisma | ||
POSTGRES_DB: prisma | ||
ports: | ||
- 5432:5432 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
export * from './lib/outbox-prisma-adapter' |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
import type { | ||
OutboxAccumulator, | ||
OutboxEntry, | ||
OutboxStorage, | ||
} from '@message-queue-toolkit/outbox-core' | ||
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' | ||
import type { Prisma, PrismaClient } from '@prisma/client' | ||
|
||
export type EnrichedOutboxEntry<Event extends CommonEventDefinition> = OutboxEntry<Event> & { | ||
type: string | ||
} | ||
|
||
type ModelDelegate<Event extends CommonEventDefinition> = { | ||
create: (args: { data: EnrichedOutboxEntry<Event> }) => Promise<EnrichedOutboxEntry<Event>> | ||
findMany: (args: { | ||
where: | ||
| Partial<EnrichedOutboxEntry<Event>> | ||
| { | ||
id?: Prisma.StringFilter | ||
retryCount?: Prisma.IntFilter | ||
} | ||
}) => Promise<EnrichedOutboxEntry<Event>[]> | ||
createMany: (args: { data: EnrichedOutboxEntry<Event>[] }) => Promise<void> | ||
updateMany: (args: { | ||
where: { | ||
id: { | ||
in: string[] | ||
} | ||
} | ||
data: | ||
| Partial<OutboxEntry<Event>> | ||
| { | ||
retryCount?: number | Prisma.IntFieldUpdateOperationsInput | ||
} | ||
}) => Promise<void> | ||
} | ||
|
||
export class OutboxPrismaAdapter< | ||
SupportedEvents extends CommonEventDefinition[], | ||
ModelName extends keyof PrismaClient & string, | ||
> implements OutboxStorage<SupportedEvents> | ||
{ | ||
constructor( | ||
private readonly prisma: PrismaClient, | ||
private readonly modelName: ModelName, | ||
) {} | ||
|
||
createEntry( | ||
outboxEntry: OutboxEntry<SupportedEvents[number]>, | ||
): Promise<OutboxEntry<SupportedEvents[number]>> { | ||
const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate< | ||
SupportedEvents[number] | ||
> | ||
|
||
return prismaModel.create({ | ||
data: { | ||
id: outboxEntry.id, | ||
type: outboxEntry.event.type, | ||
created: outboxEntry.created, | ||
updated: outboxEntry.updated, | ||
event: outboxEntry.event, | ||
status: outboxEntry.status, | ||
retryCount: outboxEntry.retryCount, | ||
}, | ||
}) | ||
} | ||
|
||
async flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void> { | ||
const entries = await outboxAccumulator.getEntries() | ||
const failedEntries = await outboxAccumulator.getFailedEntries() | ||
const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate< | ||
SupportedEvents[number] | ||
> | ||
|
||
const existingEntries = await prismaModel.findMany({ | ||
where: { | ||
id: { | ||
in: [...entries.map((entry) => entry.id), ...failedEntries.map((entry) => entry.id)], | ||
}, | ||
}, | ||
}) | ||
|
||
await this.prisma.$transaction(async (prisma) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 What do you think about using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I discussed it with Igor, it's kibertoad namespace, so the preference is to not depend on any lokalise pacakges. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't remember this conversation, but if it happened, then I disagree with Igor from the past. We use lokalise-namespaced packages liberally in node-service-template, there is nothing wrong with it, and I don't think that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right, it was a slightly different case, we only needed a subset of the library, of which our id-utils is basically an opinionated wrapper |
||
const prismaModel = prisma[this.modelName] as unknown as ModelDelegate< | ||
SupportedEvents[number] | ||
> | ||
await this.handleSuccesses(prismaModel, entries, existingEntries) | ||
await this.handleFailures(prismaModel, failedEntries, existingEntries) | ||
}) | ||
} | ||
|
||
private async handleSuccesses( | ||
prismaModel: ModelDelegate<SupportedEvents[number]>, | ||
entries: OutboxEntry<SupportedEvents[number]>[], | ||
existingEntries: OutboxEntry<SupportedEvents[number]>[], | ||
) { | ||
const toCreate = entries.filter( | ||
(entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id), | ||
) | ||
const toUpdate = entries.filter((entry) => | ||
existingEntries.some((existingEntry) => existingEntry.id === entry.id), | ||
) | ||
|
||
if (toCreate.length > 0) { | ||
await prismaModel.createMany({ | ||
data: toCreate.map((entry) => ({ | ||
id: entry.id, | ||
type: entry.event.type, | ||
created: entry.created, | ||
updated: new Date(), | ||
event: entry.event, | ||
status: 'SUCCESS', | ||
retryCount: entry.retryCount, | ||
})), | ||
}) | ||
} | ||
|
||
if (toUpdate.length > 0) { | ||
await prismaModel.updateMany({ | ||
where: { | ||
id: { | ||
in: toUpdate.map((entry) => entry.id), | ||
}, | ||
}, | ||
data: { | ||
status: 'SUCCESS', | ||
updated: new Date(), | ||
}, | ||
}) | ||
} | ||
} | ||
|
||
private async handleFailures( | ||
prismaModel: ModelDelegate<SupportedEvents[number]>, | ||
entries: OutboxEntry<SupportedEvents[number]>[], | ||
existingEntries: OutboxEntry<SupportedEvents[number]>[], | ||
) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟢 This method is almost the same as |
||
const toCreate = entries.filter( | ||
(entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id), | ||
) | ||
const toUpdate = entries.filter((entry) => | ||
existingEntries.some((existingEntry) => existingEntry.id === entry.id), | ||
) | ||
|
||
if (toCreate.length > 0) { | ||
await prismaModel.createMany({ | ||
data: toCreate.map((entry) => ({ | ||
id: entry.id, | ||
type: entry.event.type, | ||
created: entry.created, | ||
updated: new Date(), | ||
event: entry.event, | ||
status: 'FAILED', | ||
retryCount: 1, | ||
})), | ||
}) | ||
} | ||
|
||
if (toUpdate.length > 0) { | ||
await prismaModel.updateMany({ | ||
where: { | ||
id: { | ||
in: toUpdate.map((entry) => entry.id), | ||
}, | ||
}, | ||
data: { | ||
status: 'FAILED', | ||
updated: new Date(), | ||
retryCount: { | ||
increment: 1, | ||
}, | ||
}, | ||
}) | ||
} | ||
} | ||
|
||
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]> { | ||
const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate< | ||
SupportedEvents[number] | ||
> | ||
|
||
return prismaModel.findMany({ | ||
where: { | ||
retryCount: { | ||
lte: maxRetryCount, | ||
}, | ||
}, | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 I remember having issue with passing PrismaClient directly as a parameter on
prima-utils
package, I don't remember exactly the reason, but it was because the object we are using is the autogenerate one and not the default coming from the Prisma package. I fixed it by doing something like:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I am not sure if the definition of the Model name will work to have IDE autocompletion, wondering if we can make it a but more specific with something like: