Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/set interval #97

Merged
merged 3 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ jobs:
with:
deno-version: v1.x

- name: Verify formatting
- name: Check Types
run: deno check --unstable mod.ts

- name: Check formatting
run: deno fmt --check

- name: Run linter
Expand Down
38 changes: 19 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
`kvdex` is a high-level abstraction layer for Deno KV with zero third-party
dependencies. It's purpose is to enhance the experience of using Deno's KV store
through additional features such as indexing, strongly typed collections, and
cron jobs, while maintaining as much of the native functionality as possible,
like atomic operations and queue listeners.
queue-based intervals, while maintaining as much of the native functionality as
possible, like atomic operations and queue listeners.

## Highlights

Expand All @@ -13,7 +13,7 @@ like atomic operations and queue listeners.
- Extensible model strategy (Zod supported)
- Segmented storage for large objects that exceed the native size limit.
- Support for pagination and filtering.
- Create repeating cron jobs.
- Set intervals built on queues.
- Message queues at database and collection level with topics.
- Support for atomic operations.

Expand Down Expand Up @@ -58,7 +58,7 @@ like atomic operations and queue listeners.
- [findUndelivered()](#findundelivered-1)
- [enqueue()](#enqueue-1)
- [listenQueue()](#listenqueue-1)
- [cron()](#cron)
- [setInterval()](#setinterval)
- [atomic()](#atomic)
- [Atomic Operations](#atomic-operations)
- [Without checking](#without-checking)
Expand All @@ -73,9 +73,9 @@ like atomic operations and queue listeners.
Collections are typed using models. Standard models can be defined using the
`model()` function. Alternatively, any object that extends or implements the
Model type can be used as a model. Zod is therefore fully compatible, without
being a dependency. The standard model uses TypeScript inference only and does
not validate any data when parsing. It is up to the devloper to choose the
strategy that fits their use case the best.
being a dependency. The standard model uses type casting only, and does not
validate any data when parsing. It is up to the developer to choose the strategy
that fits their use case the best.

**_NOTE_:** When using interfaces instead of types, they must extend the KvValue
type.
Expand Down Expand Up @@ -701,28 +701,28 @@ db.listenQueue(async (data) => {
}, { topic: "posts" })
```

### cron()
### setInterval()

Create a cron job that will run on interval, either indefinitely or until an
exit condition is met. Interval defaults to 1 hour if not set. Like with queue
listeners, multiple cron jobs can be created.
Create an interval with a callback function that can run indefinitely or until
an exit condition is met. Interval defaults to 1 hour if not set. Like with
queue listeners, multiple intervals can be created.

```ts
// Will repeat indefinitely with 1 hour interval
db.cron(() => console.log("Hello World!"))
db.setInterval(() => console.log("Hello World!"))

// First job starts with a 10 second delay, after that there is a 5 second delay between jobs
db.cron(() => console.log("I terminate after running 10 times"), {
// Delay before the first job is invoked
// First callback is invoked after a 10 second delay, after that there is a 5 second delay between callbacks
db.setInterval(() => console.log("I terminate after running 10 times"), {
// Delay before the first callback is invoked
startDelay: 10_000,

// Fixed interval
// Fixed interval of 5 seconds
interval: 5_000,

// If this is set it will override the fixed interval
setInterval: ({ count }) => count * 500
// ...or set a dynamic interval
interval: ({ count }) => count * 500

// Count starts at 0, exitOn is run before the current job
// Count starts at 0, exitOn is run before the current callback
exitOn: ({ count }) => count === 10,
})
```
Expand Down
2 changes: 1 addition & 1 deletion deno.jsonc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"lock": false,
"tasks": {
"test": "deno lint && deno fmt && deno test -A --unstable"
"test": "deno check --unstable mod.ts && deno fmt --check && deno lint && deno test -A --unstable"
},
"fmt": {
"semiColons": false
Expand Down
114 changes: 114 additions & 0 deletions src/atomic_wrapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { ATOMIC_OPERATION_MUTATION_LIMIT } from "./constants.ts"
import { SetOptions } from "./types.ts"

export class AtomicWrapper implements Deno.AtomicOperation {
private kv: Deno.Kv
private current: Deno.AtomicOperation
private atomics: Deno.AtomicOperation[]
private count: number
private atomicBatchSize: number

constructor(
kv: Deno.Kv,
atomicBatchSize = ATOMIC_OPERATION_MUTATION_LIMIT / 4,
) {
this.kv = kv
this.current = kv.atomic()
this.atomics = []
this.count = 0
this.atomicBatchSize = atomicBatchSize
}

set(key: Deno.KvKey, value: unknown, options?: SetOptions) {
this.addMutation((op) => op.set(key, value, options))
return this
}

delete(key: Deno.KvKey) {
this.addMutation((op) => op.delete(key))
return this
}

mutate(...mutations: Deno.KvMutation[]) {
this.addMutation((op) => op.mutate(...mutations))
return this
}

check(...checks: Deno.AtomicCheck[]) {
this.addMutation((op) => op.check(...checks))
return this
}

sum(key: Deno.KvKey, n: bigint) {
this.addMutation((op) => op.sum(key, n))
return this
}

max(key: Deno.KvKey, n: bigint) {
this.addMutation((op) => op.max(key, n))
return this
}

min(key: Deno.KvKey, n: bigint): this {
this.addMutation((op) => op.min(key, n))
return this
}

enqueue(
value: unknown,
options?: {
delay?: number | undefined
keysIfUndelivered?: Deno.KvKey[] | undefined
} | undefined,
) {
this.addMutation((op) => op.enqueue(value, options))
return this
}

async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
// Add curent operation to atomics list
this.atomics.push(this.current)

// Commit all operations
const settled = await Promise.allSettled(
this.atomics.map((op) => op.commit()),
)

// Check status of all commits
const success = settled.every((v) => v.status === "fulfilled")

// If successful, return commit result
if (success) {
return {
ok: true,
versionstamp: "0",
}
}

// Return commit error
return {
ok: false,
}
}

/** PRIVATE METHODS */

/**
* Add an atomic mutation within a batched operation.
*
* @param mutation - Atomic mutation.
*/
private addMutation(
mutation: (op: Deno.AtomicOperation) => Deno.AtomicOperation,
) {
// Add atomic mutation and increment count
this.current = mutation(this.current)
this.count++

// Add current operation to atomics list if batch size is reached, reset current and count
if (this.count % this.atomicBatchSize === this.atomicBatchSize - 1) {
this.atomics.push(this.current)
this.current = this.kv.atomic()
}
}
}
62 changes: 39 additions & 23 deletions src/collection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
//ATOMIC_OPERATION_CONSERVATIVE_MUTATION_LIMIT,
ID_KEY_PREFIX,
KVDEX_KEY_PREFIX,
UNDELIVERED_KEY_PREFIX,
Expand Down Expand Up @@ -27,7 +28,6 @@ import type {
} from "./types.ts"
import {
allFulfilled,
atomicDelete,
createHandlerId,
createListSelector,
extendKey,
Expand All @@ -36,9 +36,11 @@ import {
isKvObject,
kvGetMany,
prepareEnqueue,
selectsAll,
} from "./utils.ts"
import { Document } from "./document.ts"
import { model } from "./model.ts"
import { AtomicWrapper } from "./atomic_wrapper.ts"

/**
* Create a collection builder function.
Expand All @@ -62,7 +64,7 @@ export function collection<const T1 extends KvValue>(
kv: Deno.Kv,
key: KvKey,
queueHandlers: Map<string, QueueMessageHandler<QueueValue>[]>,
idempotentListener: () => void,
idempotentListener: () => Promise<void>,
) =>
new Collection<T1, CollectionOptions<T1>>(
kv,
Expand All @@ -84,7 +86,7 @@ export class Collection<
const T2 extends CollectionOptions<T1>,
> {
private queueHandlers: Map<string, QueueMessageHandler<QueueValue>[]>
private idempotentListener: () => void
private idempotentListener: () => Promise<void>

protected kv: Deno.Kv

Expand All @@ -97,7 +99,7 @@ export class Collection<
key: KvKey,
model: Model<T1>,
queueHandlers: Map<string, QueueMessageHandler<QueueValue>[]>,
idempotentListener: () => void,
idempotentListener: () => Promise<void>,
options?: T2,
) {
// Set reference to queue handlers and idempotent listener
Expand Down Expand Up @@ -286,10 +288,9 @@ export class Collection<
*/
async delete(...ids: KvId[]) {
// Perform delete operation for each id
await allFulfilled(ids.map(async (id) => {
const key = extendKey(this._keys.idKey, id)
await this.kv.delete(key)
}))
const atomic = new AtomicWrapper(this.kv)
ids.forEach((id) => atomic.delete(extendKey(this._keys.idKey, id)))
await atomic.commit()
}

/**
Expand Down Expand Up @@ -417,8 +418,16 @@ export class Collection<
throw errors
}

// Return commit results
return results
// If a commit has failed, return commit error
if (!results.every((cr) => cr.ok)) {
return { ok: false }
}

// Return commit result
return {
ok: true,
versionstamp: "0",
}
}

/**
Expand All @@ -442,14 +451,7 @@ export class Collection<
*/
async deleteMany(options?: ListOptions<T1>) {
// Perform quick delete if all documents are to be deleted
if (
!options?.consistency &&
!options?.cursor &&
!options?.endId &&
!options?.startId &&
!options?.filter &&
!options?.limit
) {
if (selectsAll(options)) {
// Create list iterator and empty keys list
const iter = this.kv.list({ prefix: this._keys.baseKey }, options)
const keys: Deno.KvKey[] = []
Expand All @@ -460,7 +462,9 @@ export class Collection<
}

// Delete all keys and return
return await atomicDelete(this.kv, keys, options?.batchSize)
const atomic = new AtomicWrapper(this.kv, options?.batchSize)
keys.forEach((key) => atomic.delete(key))
await atomic.commit()
}

// Execute delete operation for each document entry
Expand Down Expand Up @@ -584,8 +588,19 @@ export class Collection<
* @returns A promise that resolves to a number representing the performed count.
*/
async count(options?: CountOptions<T1>) {
// Initiate count variable, increment for each document entry, return result
// Initiate count result
let result = 0

// Perform efficient count if counting all document entries
if (selectsAll(options)) {
const iter = this.kv.list({ prefix: this._keys.idKey }, options)
for await (const _ of iter) {
result++
}
return result
}

// Perform count using many documents handler
await this.handleMany(this._keys.idKey, () => result++, options)
return result
}
Expand Down Expand Up @@ -651,7 +666,7 @@ export class Collection<
* @param options - Queue listener options.
* @returns void.
*/
listenQueue<T extends QueueValue = QueueValue>(
async listenQueue<T extends QueueValue = QueueValue>(
handler: QueueMessageHandler<T>,
options?: QueueListenerOptions,
) {
Expand All @@ -664,7 +679,7 @@ export class Collection<
this.queueHandlers.set(handlerId, handlers)

// Activate idempotent listener
this.idempotentListener()
return await this.idempotentListener()
}

/**
Expand Down Expand Up @@ -759,7 +774,8 @@ export class Collection<
})

// Filter document and add to documents list
if (!options?.filter || options.filter(doc)) {
const filter = options?.filter
if (!filter || filter(doc)) {
docs.push(doc)
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export const UNDELIVERED_KEY_PREFIX = "__undelivered__"
// Fixed limits
export const ATOMIC_OPERATION_MUTATION_LIMIT = 1_000

export const ATOMIC_OPERATION_SAFE_MUTATION_LIMIT = 20
export const ATOMIC_OPERATION_CONSERVATIVE_MUTATION_LIMIT = 20

export const GET_MANY_KEY_LIMIT = 10

Expand All @@ -24,3 +24,8 @@ export const LARGE_COLLECTION_STRING_LIMIT = 25_000
export const DEFAULT_CRON_INTERVAL = 60 * 60 * 1_000 // 1 hour

export const DEFAULT_CRON_RETRY = 10

// Interval constants
export const DEFAULT_INTERVAL = 60 * 60 * 1_000 // 1 hour

export const DEFAULT_INTERVAL_RETRY = 10
Loading