feat(apis/websocket): clear old client sessions and instances

This commit is contained in:
PalmDevs
2024-07-14 00:24:32 +07:00
parent 2f03800c61
commit 43bd0a021c
5 changed files with 97 additions and 79 deletions

View File

@@ -0,0 +1,63 @@
import { OEM, createWorker as createTesseractWorker } from 'tesseract.js'
import { join as joinPath } from 'path'
import { createLogger } from '@revanced/bot-shared'
import { exists as pathExists } from 'fs/promises'
import { getConfig } from './utils/config'
export const config = getConfig()
export const logger = createLogger({
level: config.logLevel === 'none' ? Number.MAX_SAFE_INTEGER : config.logLevel,
})
export const wit = {
token: process.env['WIT_AI_TOKEN']!,
async fetch(route: string, options?: RequestInit) {
const res = await fetch(`https://api.wit.ai${route}`, {
headers: {
Authorization: `Bearer ${this.token}`,
'Content-Type': 'application/json',
},
...options,
})
if (!res.ok) throw new Error(`Failed to fetch from Wit.ai: ${res.statusText} (${res.status})`)
return await res.json()
},
message(text: string) {
return this.fetch(`/message?q=${encodeURIComponent(text)}&n=8`) as Promise<WitMessageResponse>
},
async train(text: string, label: string) {
await this.fetch('/utterances', {
body: JSON.stringify([
{
text,
intent: label,
entities: [],
traits: [],
},
]),
method: 'POST',
})
},
} as const
export interface WitMessageResponse {
text: string
intents: Array<{
id: string
name: string
confidence: number
}>
}
const TesseractWorkerPath = joinPath(import.meta.dir, 'worker', 'index.js')
const TesseractCompiledWorkerExists = await pathExists(TesseractWorkerPath)
export const tesseract = await createTesseractWorker(
'eng',
OEM.DEFAULT,
TesseractCompiledWorkerExists ? { workerPath: TesseractWorkerPath } : undefined,
)

View File

@@ -2,6 +2,7 @@ import type { ClientOperation } from '@revanced/bot-shared'
import type { Logger } from '@revanced/bot-shared'
import type { Worker as TesseractWorker } from 'tesseract.js'
import type { ClientPacketObject } from '../classes/Client'
import type { WitMessageResponse } from '../context'
import type { Config } from '../utils/config'
export { default as parseTextEventHandler } from './parseText'
@@ -22,12 +23,3 @@ export type EventContext = {
logger: Logger
config: Config
}
export interface WitMessageResponse {
text: string
intents: Array<{
id: string
name: string
confidence: number
}>
}

View File

@@ -1,32 +1,17 @@
import { OEM, createWorker as createTesseractWorker } from 'tesseract.js'
import { join as joinPath } from 'path'
import { inspect as inspectObject } from 'util'
import { exists as pathExists } from 'fs/promises'
import Client from './classes/Client'
import {
type EventContext,
type WitMessageResponse,
parseImageEventHandler,
parseTextEventHandler,
trainMessageEventHandler,
} from './events'
import { type EventContext, parseImageEventHandler, parseTextEventHandler, trainMessageEventHandler } from './events'
import { DisconnectReason, HumanizedDisconnectReason, createLogger } from '@revanced/bot-shared'
import { getConfig } from './utils/config'
import { DisconnectReason, HumanizedDisconnectReason } from '@revanced/bot-shared'
import { createServer } from 'http'
import { type WebSocket, WebSocketServer } from 'ws'
import { config, logger, tesseract, wit } from './context'
// Load config, init logger, check environment
const config = getConfig()
const logger = createLogger({
level: config.logLevel === 'none' ? Number.MAX_SAFE_INTEGER : config.logLevel,
})
if (!process.env['NODE_ENV']) logger.warn('NODE_ENV not set, defaulting to `development`')
const environment = (process.env['NODE_ENV'] ?? 'development') as NodeEnvironment
@@ -43,52 +28,16 @@ if (!process.env['WIT_AI_TOKEN']) {
process.exit(1)
}
// Workers and API clients
// Handle uncaught exceptions
const TesseractWorkerPath = joinPath(import.meta.dir, 'worker', 'index.js')
const TesseractCompiledWorkerExists = await pathExists(TesseractWorkerPath)
const tesseract = await createTesseractWorker(
'eng',
OEM.DEFAULT,
TesseractCompiledWorkerExists ? { workerPath: TesseractWorkerPath } : undefined,
)
const wit = {
token: process.env['WIT_AI_TOKEN']!,
async fetch(route: string, options?: RequestInit) {
const res = await fetch(`https://api.wit.ai${route}`, {
headers: {
Authorization: `Bearer ${this.token}`,
'Content-Type': 'application/json',
},
...options,
})
if (!res.ok) throw new Error(`Failed to fetch from Wit.ai: ${res.statusText} (${res.status})`)
return await res.json()
},
message(text: string) {
return this.fetch(`/message?q=${encodeURIComponent(text)}&n=8`) as Promise<WitMessageResponse>
},
async train(text: string, label: string) {
await this.fetch('/utterances', {
body: JSON.stringify([
{
text,
intent: label,
entities: [],
traits: [],
},
]),
method: 'POST',
})
},
} as const
process.on('uncaughtException', e => logger.error('Uncaught exception:', e))
process.on('unhandledRejection', e => logger.error('Unhandled rejection:', e))
// Server logic
const clientMap = new WeakMap<WebSocket, Client>()
const clientIds = new Set<string>()
const clientToSocket = new WeakMap<Client, WebSocket>()
const socketToClient = new WeakMap<WebSocket, Client>()
const eventContext: EventContext = {
tesseract,
logger,
@@ -97,7 +46,6 @@ const eventContext: EventContext = {
}
const server = createServer()
const wss = new WebSocketServer({
// 16 KiB max payload
// A Discord message can not be longer than 4000 characters
@@ -113,17 +61,29 @@ wss.on('connection', async (socket, request) => {
return logger.warn('Connection failed because client is missing remote address')
}
const id = `${request.socket.remoteAddress}:${request.socket.remotePort}`
if (clientIds.has(id)) {
logger.warn(`Client ${id} already connected, disconnecting old session`)
const oldClient = socketToClient.get(socket)
await oldClient?.disconnect(DisconnectReason.NewConnection)
}
const client = new Client({
socket,
id: `${request.socket.remoteAddress}:${request.socket.remotePort}`,
id,
})
clientMap.set(socket, client)
socketToClient.set(socket, client)
clientToSocket.set(client, socket)
logger.debug(`Client ${client.id}'s instance has been added`)
logger.info(`New client connected with ID: ${client.id}`)
logger.info(`New client connected with ID: ${id}`)
client.on('disconnect', reason => {
clientIds.delete(client.id)
clientToSocket.delete(client)
socketToClient.delete(socket)
logger.info(
`Client ${client.id} disconnected because client ${HumanizedDisconnectReason[reason]} (${reason})`,
)
@@ -136,17 +96,16 @@ wss.on('connection', async (socket, request) => {
if (['debug', 'trace'].includes(config.logLevel)) {
logger.debug('Debug logs enabled, attaching debug events...')
client.on('message', d => logger.debug(`Message from client ${client.id}:`, d))
client.on('packet', ({ client, ...rawPacket }) =>
logger.debug(`Packet received from client ${client.id}: ${inspectObject(rawPacket)}`),
)
client.on('message', d => logger.debug(`Message from client ${client.id}:`, d))
}
} catch (e) {
if (e instanceof Error) logger.error(e.stack ?? e.message)
else logger.error(inspectObject(e))
const client = clientMap.get(socket)
const client = socketToClient.get(socket)
if (!client) {
logger.error(
@@ -164,7 +123,6 @@ wss.on('connection', async (socket, request) => {
// Start the server
server.listen(config.port, config.address)
logger.debug(`Starting with these configurations: ${inspectObject(config)}`)
const addressInfo = wss.address()

View File

@@ -26,10 +26,14 @@ enum DisconnectReason {
* The receiving end didn't have an open socket
*/
NoOpenSocket = 4003,
/**
* The client connected from another location
*/
NewConnection = 4004,
/**
* The client was not ready in time (**CLIENT-ONLY**)
*/
TooSlow = 4002,
TooSlow = 4012,
}
export default DisconnectReason

View File

@@ -12,6 +12,7 @@ const HumanizedDisconnectReason = {
[DisconnectReason.TooSlow]: 'the client was not ready in time',
[DisconnectReason.PlannedDisconnect]: 'the client has disconnected on its own',
[DisconnectReason.NoOpenSocket]: 'the receiving end did not have an open socket',
[DisconnectReason.NewConnection]: 'the client connected from another location',
} as const satisfies Record<DisconnectReason | number, string>
export default HumanizedDisconnectReason