Skip to content

Commit

Permalink
fix: admin gateway singleton
Browse files Browse the repository at this point in the history
  • Loading branch information
Innei committed Feb 8, 2022
1 parent a001921 commit 8f73e61
Show file tree
Hide file tree
Showing 15 changed files with 180 additions and 70 deletions.
4 changes: 4 additions & 0 deletions debug/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
<li>
<a href="./socket-admin.html"> socket-admin </a>
</li>

<li>
<a href="./socket-test.html"> socket-test </a>
</li>
</ul>
</body>
</html>
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
"@nestjs/testing": "8.2.6",
"@types/bcrypt": "5.0.0",
"@types/cache-manager": "3.4.2",
"@types/cron": "1.7.3",
"@types/ejs": "3.1.0",
"@types/html-minifier": "4.0.2",
"@types/ioredis": "4.28.7",
Expand All @@ -145,6 +146,7 @@
"@types/passport-jwt": "3.0.6",
"@types/ua-parser-js": "0.7.36",
"@vercel/ncc": "0.33.1",
"cron": "*",
"cross-env": "7.0.3",
"eslint": "*",
"husky": "7.0.4",
Expand Down
25 changes: 21 additions & 4 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/app.config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const isTEST = !!process.env.TEST
const isDev = process.env.NODE_ENV === 'development'
import cluster from 'cluster'
import { argv } from 'zx'

Expand Down
6 changes: 3 additions & 3 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { ResponseInterceptor } from './common/interceptors/response.interceptor'
import { AttachHeaderTokenMiddleware } from './common/middlewares/attach-auth.middleware'
import {
DATA_DIR,
LOGGER_DIR,
LOG_DIR,
TEMP_DIR,
USER_ASSET_DIR,
} from './constants/path.constant'
Expand Down Expand Up @@ -58,8 +58,8 @@ function mkdirs() {
Logger.log(chalk.blue('数据目录已经建好: ' + DATA_DIR))
mkdirSync(TEMP_DIR, { recursive: true })
Logger.log(chalk.blue('临时目录已经建好: ' + TEMP_DIR))
mkdirSync(LOGGER_DIR, { recursive: true })
Logger.log(chalk.blue('日志目录已经建好: ' + LOGGER_DIR))
mkdirSync(LOG_DIR, { recursive: true })
Logger.log(chalk.blue('日志目录已经建好: ' + LOG_DIR))
mkdirSync(USER_ASSET_DIR, { recursive: true })
Logger.log(chalk.blue('资源目录已经建好: ' + USER_ASSET_DIR))
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/filters/any-exception.filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { FastifyReply, FastifyRequest } from 'fastify'
import { WriteStream } from 'fs'
import { resolve } from 'path'
import { HTTP_REQUEST_TIME } from '~/constants/meta.constant'
import { LOGGER_DIR } from '~/constants/path.constant'
import { LOG_DIR } from '~/constants/path.constant'
import { REFLECTOR } from '~/constants/system.constant'
import { isDev } from '~/utils'
import { getIp } from '../../utils/ip.util'
Expand Down Expand Up @@ -53,7 +53,7 @@ export class AllExceptionsFilter implements ExceptionFilter {
if (!isDev) {
this.errorLogPipe =
this.errorLogPipe ??
fs.createWriteStream(resolve(LOGGER_DIR, 'error.log'), {
fs.createWriteStream(resolve(LOG_DIR, 'error.log'), {
flags: 'a+',
encoding: 'utf-8',
})
Expand Down
2 changes: 1 addition & 1 deletion src/constants/path.constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export const DATA_DIR = isDev
: join(HOME, '.mx-space')

export const USER_ASSET_DIR = join(DATA_DIR, 'assets')
export const LOGGER_DIR = join(DATA_DIR, 'log')
export const LOG_DIR = join(DATA_DIR, 'log')

export const LOCAL_BOT_LIST_DATA_FILE_PATH = join(DATA_DIR, 'bot_list.json')

Expand Down
3 changes: 1 addition & 2 deletions src/modules/auth/auth.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { PassportModule } from '@nestjs/passport'
import cluster from 'cluster'
import { machineIdSync } from 'node-machine-id'
import { CLUSTER, SECURITY } from '~/app.config'
import { AdminEventsGateway } from '../../processors/gateway/admin/events.gateway'
import { AuthController } from './auth.controller'
import { AuthService } from './auth.service'
import { JwtStrategy } from './jwt.strategy'
Expand Down Expand Up @@ -38,7 +37,7 @@ const jwtModule = JwtModule.registerAsync({
})
@Module({
imports: [PassportModule, jwtModule],
providers: [AuthService, JwtStrategy, AdminEventsGateway],
providers: [AuthService, JwtStrategy],
controllers: [AuthController],
exports: [JwtStrategy, AuthService, jwtModule],
})
Expand Down
2 changes: 0 additions & 2 deletions src/modules/backup/backup.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import { Readable } from 'stream'
import { Auth } from '~/common/decorator/auth.decorator'
import { HTTPDecorators } from '~/common/decorator/http.decorator'
import { ApiName } from '~/common/decorator/openapi.decorator'
import { CronService } from '~/processors/helper/helper.cron.service'
import { UploadService } from '~/processors/helper/helper.upload.service'
import { BackupService } from './backup.service'

Expand All @@ -30,7 +29,6 @@ export class BackupController {
constructor(
private readonly backupService: BackupService,
private readonly uploadService: UploadService,
private readonly cronService: CronService,
) {}

@Get('/new')
Expand Down
12 changes: 10 additions & 2 deletions src/processors/cache/cache.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { CACHE_MANAGER, Inject, Injectable, Logger } from '@nestjs/common'
import { Cache } from 'cache-manager'
import { Redis } from 'ioredis'
import { redisSubPub } from '~/utils/redis-subpub.util'

// Cache 客户端管理器

Expand Down Expand Up @@ -44,14 +43,23 @@ export class CacheService {
return this.cache.set(key, value, options)
}

public publish(event: string, data: any) {
public async publish(event: string, data: any) {
const { redisSubPub } = await import('../../utils/redis-subpub.util')
return redisSubPub.publish(event, data)
}

public async subscribe(event: string, callback: (data: any) => void) {
const { redisSubPub } = await import('../../utils/redis-subpub.util')

return redisSubPub.subscribe(event, callback)
}

public async unsubscribe(event: string, callback: (data: any) => void) {
const { redisSubPub } = await import('../../utils/redis-subpub.util')

return redisSubPub.unsubscribe(event, callback)
}

public getClient() {
return this.redisClient
}
Expand Down
82 changes: 34 additions & 48 deletions src/processors/gateway/admin/events.gateway.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { Logger } from '@nestjs/common'
import { OnEvent } from '@nestjs/event-emitter'
import { JwtService } from '@nestjs/jwt'
import {
Expand All @@ -10,9 +9,12 @@ import {
WebSocketServer,
} from '@nestjs/websockets'
import { Emitter } from '@socket.io/redis-emitter'
import { resolve } from 'path'
import SocketIO, { Socket } from 'socket.io'
import { EventBusEvents } from '~/constants/event.constant'
import { LOG_DIR } from '~/constants/path.constant'
import { CacheService } from '~/processors/cache/cache.service'
import { getTodayLogFilePath } from '~/utils/consola.util'
import { AuthService } from '../../../modules/auth/auth.service'
import { BaseGateway } from '../base.gateway'
import { EventTypes } from '../events.types'
Expand All @@ -27,7 +29,6 @@ export class AdminEventsGateway
private readonly cacheService: CacheService,
) {
super()
this.bindStdOut()
}

tokenSocketIdMap = new Map<string, string>()
Expand Down Expand Up @@ -77,16 +78,41 @@ export class AdminEventsGateway
this.tokenSocketIdMap.set(token.toString(), sid)
}

subscribeSocketToHandlerMap = new Map<string, Function>()

@SubscribeMessage('log')
async subscribeStdOut(client: Socket) {
if (this.subscribeSocketToHandlerMap.has(client.id)) {
return
}

const queue = [] as Function[]
const handler = (data) => {
queue.push(() =>
client.send(this.gatewayMessageFormat(EventTypes.STDOUT, data)),
)

queue.shift()()
}

this.subscribeSocketToHandlerMap.set(client.id, handler)
this.cacheService.subscribe('log', handler)

fs.createReadStream(resolve(LOG_DIR, getTodayLogFilePath()), {
encoding: 'utf-8',
highWaterMark: 20,
}).on('data', handler)
}

@SubscribeMessage('unlog')
unsubscribeStdOut(client: Socket) {
const idx = this.subscribeStdOutClient.findIndex(
(client_) => client_ === client,
)
Logger.debug(chalk.yellow(client.id, idx))
if (~idx) {
this.subscribeStdOutClient.splice(idx, 1)
const cb = this.subscribeSocketToHandlerMap.get(client.id)
if (cb) {
this.cacheService.unsubscribe('log', cb as any)
}
this.subscribeSocketToHandlerMap.delete(client.id)
}

handleDisconnect(client: SocketIO.Socket) {
super.handleDisconnect(client)
this.unsubscribeStdOut(client)
Expand All @@ -106,46 +132,6 @@ export class AdminEventsGateway
return false
}

subscribeStdOutClient: Socket[] = []

@SubscribeMessage('log')
async subscribeStdOut(client: Socket) {
if (
this.subscribeStdOutClient.includes(client) ||
this.subscribeStdOutClient.some((client_) => client_.id === client.id)
) {
return
}
this.subscribeStdOutClient.push(client)
Logger.debug(
chalk.yellow(client.id, this.subscribeStdOutClient.length),
'SubscribeStdOut',
)
}

bindStdOut() {
const handler = (data: any) => {
this.subscribeStdOutClient.forEach((client) => {
client.send(this.gatewayMessageFormat(EventTypes.STDOUT, data))
})
}
const stream = {
stdout: process.stdout.write,
stderr: process.stderr.write,
}

process.stdout.write = function (...rest: any[]) {
handler(rest[0])

return stream.stdout.apply(this, rest)
}
process.stderr.write = function (...rest: any[]) {
handler(rest[0])

return stream.stderr.apply(this, rest)
}
}

broadcast(event: EventTypes, data: any) {
const client = new Emitter(this.cacheService.getClient())
client.of('/admin').emit('message', this.gatewayMessageFormat(event, data))
Expand Down
Loading

0 comments on commit 8f73e61

Please sign in to comment.