Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/components/MysqlSafeTransaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { RedisCache } from "coa-redis"
import { MysqlBin } from "../libs/MysqlBin"
import { CoaMysql } from "../typings"

export class MysqlSafeTransaction {
private readonly bin: MysqlBin
private readonly cache: RedisCache

constructor(bin: MysqlBin, cache: RedisCache) {
this.bin = bin
this.cache = cache
}

async safeTransaction<T>(handler: (trx: CoaMysql.Transaction) => Promise<T>): Promise<T> {
let clearCacheNsps: any[] = []
const result = await this.bin.io.transaction(async (trx: CoaMysql.Transaction) => {
trx.__isSafeTransaction = true
trx.clearCacheNsps = []

const result = await handler(trx)

clearCacheNsps = trx.clearCacheNsps || []
return result
})

if (clearCacheNsps.length > 0) { await this.cache.mDelete(clearCacheNsps) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

考虑下这句话能不能放到上面18行的位置,是否有必要在事务外面执行

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

放到事务内 应该还是有概率导致缓存内存在脏数据的 事务外比较保险一点

return result
}
}
10 changes: 6 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
export { CoaMysql } from './typings'
export { MysqlSafeTransaction } from './components/MysqlSafeTransaction'
export { MysqlStorage } from './components/MysqlStorage'
export { MysqlUuid } from './components/MysqlUuid'
export { MysqlBin } from './libs/MysqlBin'
export { MysqlNative } from './services/MysqlNative'
export { MysqlCache } from './services/MysqlCache'
export { MysqlUuid } from './components/MysqlUuid'
export { MysqlStorage } from './components/MysqlStorage'
export { MysqlNative } from './services/MysqlNative'
export { CoaMysql } from './typings'

3 changes: 1 addition & 2 deletions src/libs/MysqlBin.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { echo } from 'coa-echo'
import { CoaError } from 'coa-error'
import { Knex } from './Knex'
import { CoaMysql } from '../typings'

import { Knex } from './Knex'
export class MysqlBin {
public io: Knex
public config: CoaMysql.Config
Expand Down
76 changes: 40 additions & 36 deletions src/services/MysqlCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { secure } from 'coa-secure'
import { MysqlBin } from '../libs/MysqlBin'
import { CoaMysql } from '../typings'
import { MysqlNative } from './MysqlNative'

export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
redisCache: RedisCache

Expand All @@ -15,49 +14,49 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
}

async insert(data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
const id = await super.insert(data, trx)
await this.deleteCache([id], [data])
const id = await super.insert(data, trx);
await this.deleteCache([id], [data], trx)
return id
}

async mInsert(dataList: Array<CoaMysql.SafePartial<Scheme>>, trx?: CoaMysql.Transaction) {
const ids = await super.mInsert(dataList, trx)
await this.deleteCache(ids, dataList)
const ids = await super.mInsert(dataList, trx);
await this.deleteCache(ids, dataList, trx)
return ids
}

async updateById(id: string, data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
const dataList = await this.getCacheChangedDataList([id], data, trx)
const result = await super.updateById(id, data, trx)
if (result) await this.deleteCache([id], dataList)
if (result) await this.deleteCache([id], dataList, trx)
return result
}

async updateByIds(ids: string[], data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
const dataList = await this.getCacheChangedDataList(ids, data, trx)
const result = await super.updateByIds(ids, data, trx)
if (result) await this.deleteCache(ids, dataList)
const result = await super.updateByIds(ids, data, trx);
if (result) await this.deleteCache(ids, dataList, trx)
return result
}

async updateForQueryById(id: string, query: CoaMysql.Query, data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
const dataList = await this.getCacheChangedDataList([id], data, trx)
const result = await super.updateForQueryById(id, query, data, trx)
if (result) await this.deleteCache([id], dataList)
const result = await super.updateForQueryById(id, query, data, trx);
if (result) await this.deleteCache([id], dataList, trx)
return result
}

async upsertById(id: string, data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
const dataList = await this.getCacheChangedDataList([id], data, trx)
const result = await super.upsertById(id, data, trx)
await this.deleteCache([id], dataList)
const result = await super.upsertById(id, data, trx);
await this.deleteCache([id], dataList, trx)
return result
}

async deleteByIds(ids: string[], trx?: CoaMysql.Transaction) {
const dataList = await this.getCacheChangedDataList(ids, undefined, trx)
const result = await super.deleteByIds(ids, trx)
if (result) await this.deleteCache(ids, dataList)
const result = await super.deleteByIds(ids, trx);
await this.deleteCache(ids, dataList, trx)
return result
}

Expand All @@ -66,16 +65,16 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
}

async getById(id: string, pick = this.columns, trx?: CoaMysql.Transaction, ms = this.ms, force = false) {
const result = await this.redisCache.warp(this.getCacheNsp('id'), id, async () => await super.getById(id, this.columns, trx), ms, force)
const result = trx?.__isSafeTransaction ? await super.getById(id, this.columns, trx) : await this.redisCache.warp(this.getCacheNsp('id'), id, async () => await super.getById(id, this.columns, trx), ms, force)
return this.pickResult(result, pick)
}

async getIdBy(field: string, value: string | number, trx?: CoaMysql.Transaction) {
return await this.redisCache.warp(this.getCacheNsp('index', field), '' + value, async () => await super.getIdBy(field, value, trx))
return trx?.__isSafeTransaction ? await super.getIdBy(field, value, trx) : await this.redisCache.warp(this.getCacheNsp('index', field), '' + value, async () => await super.getIdBy(field, value, trx))
}

async mGetByIds(ids: string[], pick = this.pick, trx?: CoaMysql.Transaction, ms = this.ms, force = false) {
const result = await this.redisCache.mWarp(this.getCacheNsp('id'), ids, async ids => await super.mGetByIds(ids, this.columns, trx), ms, force)
const result = trx?.__isSafeTransaction ? await super.mGetByIds(ids, this.columns, trx) : await this.redisCache.mWarp(this.getCacheNsp('id'), ids, async ids => await super.mGetByIds(ids, this.columns, trx), ms, force)
_.forEach(result, (v, k) => {
result[k] = this.pickResult(v, pick)
})
Expand All @@ -88,48 +87,46 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
}

protected async findListCount(finger: Array<CoaMysql.Dic<any>>, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('data')
const cacheId = 'list-count:' + secure.sha1($.sortQueryString(...finger))
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectListCount(query, trx))
return trx?.__isSafeTransaction ? await super.selectListCount(query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectListCount(query, trx))
}

protected async findIdList(finger: Array<CoaMysql.Dic<any>>, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('data')
const cacheId = 'list:' + secure.sha1($.sortQueryString(...finger))
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdList(query, trx))
return trx?.__isSafeTransaction ? await super.selectIdList(query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdList(query, trx))
}

protected async findIdSortList(finger: Array<CoaMysql.Dic<any>>, pager: CoaMysql.Pager, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('data')
const cacheId = `sort-list:${pager.rows}:${pager.last}:` + secure.sha1($.sortQueryString(...finger))
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdSortList(pager, query, trx))
return trx?.__isSafeTransaction ? await super.selectIdSortList(pager, query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdSortList(pager, query, trx))
}

protected async findIdViewList(finger: Array<CoaMysql.Dic<any>>, pager: CoaMysql.Pager, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('data')
const cacheId = `view-list:${pager.rows}:${pager.page}:` + secure.sha1($.sortQueryString(...finger))
const count = await this.findListCount(finger, query, trx)
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdViewList(pager, query, trx, count))
return trx?.__isSafeTransaction ? await super.selectIdViewList(pager, query, trx, count) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdViewList(pager, query, trx, count))
}

protected async mGetCountBy(field: string, ids: string[], trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('count', field)
return await this.redisCache.mWarp(cacheNsp, ids, async ids => {
const queryFunction = async () => {
const rows = (await this.table(trx).select({ id: field }).count({ count: this.key }).whereIn(field, ids).groupBy(field)) as any[]
const result: CoaMysql.Dic<number> = {}
_.forEach(rows, ({ id, count }) => (result[id] = count))
return result
})
}
const result = trx?.__isSafeTransaction ? await queryFunction() : await this.redisCache.mWarp(this.getCacheNsp('count', field), ids, queryFunction)
return result
}

protected async getCountBy(field: string, value: string, query?: CoaMysql.Query, trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('count', field)
return await this.redisCache.warp(cacheNsp, value, async () => {
const queryFunction = async () => {
const qb = this.table(trx).count({ count: this.key })
query ? query(qb) : qb.where(field, value)
const rows = await qb
return (rows[0]?.count as number) || 0
})
}
const result = trx?.__isSafeTransaction ? await queryFunction() : await this.redisCache.warp(this.getCacheNsp('count', field), value, queryFunction)
return result
}

protected pickResult<T>(data: T, pick: string[]) {
Expand All @@ -155,10 +152,15 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
return resultList
}

protected async deleteCache(ids: string[], dataList: Array<CoaMysql.SafePartial<Scheme>>) {
async deleteCache(ids: string[], dataList: Array<CoaMysql.SafePartial<Scheme>>, trx?: CoaMysql.Transaction) {
const deleteIds = [] as CoaRedis.CacheDelete[]
deleteIds.push([this.getCacheNsp('id'), ids])
deleteIds.push([this.getCacheNsp('data'), []])
if (trx?.__isSafeTransaction) {
(trx as any)?.clearCacheNsps.push([this.getCacheNsp('id'), ids]);
(trx as any)?.clearCacheNsps.push([this.getCacheNsp('data'), []])
} else {
deleteIds.push([this.getCacheNsp('id'), ids])
deleteIds.push([this.getCacheNsp('data'), []])
}
_.forEach(this.caches, (items, name) => {
// name可能为index,count,或自定义
items.forEach(item => {
Expand All @@ -169,9 +171,11 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
data?.[key] && ids.push(data[key])
})
ids.push(...keys.slice(1))
ids.length && deleteIds.push([this.getCacheNsp(name, key), ids])
if (ids.length) {
(trx?.__isSafeTransaction) ? (trx as any)?.clearCacheNsps.push([this.getCacheNsp(name, key), ids]) : deleteIds.push([this.getCacheNsp(name, key), ids])
}
})
})
await this.redisCache.mDelete(deleteIds)
if (!trx?.__isSafeTransaction) await this.redisCache.mDelete(deleteIds)
}
}
2 changes: 1 addition & 1 deletion src/test/demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,4 @@ await User.mGetByIds(['id001', 'id002'], ['name']) // 返回 {id001:{userId:'id0
await User.truncate() // 无返回值,主要不报错即成功截断整个表

// 自定义方法
await User.customMethod() // 执行自定义方法
await User.customMethod() // 执行自定义方法
85 changes: 85 additions & 0 deletions src/test/demoSafeTransaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/* eslint-disable @typescript-eslint/no-redeclare */
// @ts-nocheck
import { $, _ } from 'coa-helper'
import { RedisBin, RedisCache } from 'coa-redis'
import { MysqlBin, MysqlCache, MysqlSafeTransaction } from '..'
// MySQL配置
const mysqlConfig = {
host: '127.0.0.1',
port: 3306,
user: 'root',
password: '',
charset: 'utf8mb4',
trace: true,
debug: false,
databases: {
main: { database: 'coa-mysql', ms: 7 * 24 * 3600 * 1000 },
},
}

const redisConfig = {
host: '127.0.0.1',
port: 6379,
password: '',
db: 1,
prefix: 'coa-mysql',
trace: false,

}
const redisBin = new RedisBin(redisConfig)
const redisCache = new RedisCache(redisBin)

// 初始化Mysql基本连接,后续所有模型均依赖此实例
const mysqlBin = new MysqlBin(mysqlConfig)
const safeTransaction = new MysqlSafeTransaction(mysqlBin, redisCache)

const userScheme = {
userId: '' as string,
name: '' as string,
mobile: '' as string,
avatar: '' as string,
gender: 1 as number,
language: '' as string,
status: 1 as number,
created: 0 as number,
updated: 0 as number,
}


// 定义User类型(通过默认结构自动生成)
type UserScheme = typeof userScheme


// 通过基类初始化
const User = new (class extends MysqlCache<UserScheme> {
constructor() {
super(
{
name: 'User', // 表名,默认会转化为下划线(snackCase)形式,如 User->user UserPhoto->user_photo
title: '用户表', // 表的备注名称
scheme: userScheme, // 表的默认结构
pick: ['userId', 'name'], // 查询列表时显示的字段信息
caches: { index: ['name'], count: ['userId', 'name'] },

},
mysqlBin,
redisCache,
)
}
})()

const a = async () => {
await User.checkById('411******728253X')
await $.timeout(3000)
await safeTransaction.safeTransaction(async (trx: CoaMysql.Transaction) => {
await User.updateById('411******728253X', { name: 'mmm' })
for (let index = 0; index < 10; index++) {
const userId = `${_.now()}-${index}-Y`
await User.insert({ userId, name: 'heyifan2' }, trx)
}
})
await $.timeout(3000)
const b = await User.checkById('411******728253X')
console.log(b);
}
a()
5 changes: 4 additions & 1 deletion src/typings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ export namespace CoaMysql {
export type SafePartial<T> = T extends {} ? Partial<T> : any
export type Query = (qb: Knex.QueryBuilder) => void
export type QueryBuilder = Knex.QueryBuilder
export type Transaction = Knex.Transaction
export interface Transaction extends Knex.Transaction {
__isSafeTransaction?: boolean
clearCacheNsps?: any[]
}
export interface Pager {
rows: number
last: number
Expand Down