feat!: big feature changes

BREAKING CHANGES:
- Heartbeating removed
- `config.consoleLogLevel` -> `config.logLevel`

NEW FEATURES:
- Training messages
- Sequence number system
- WebSocket close codes used instead of disconnect packets

FIXES:
- Improved error handling
- Some performance improvements
- Made code more clean
- Updated dependencies
This commit is contained in:
PalmDevs
2024-03-28 21:41:59 +07:00
parent 77f1a9cb3e
commit b3b7723b4f
33 changed files with 562 additions and 506 deletions

View File

@@ -2,7 +2,7 @@ import { EventEmitter } from 'events'
import {
ClientOperation,
DisconnectReason,
Packet,
type Packet,
ServerOperation,
deserializePacket,
isClientPacket,
@@ -17,38 +17,30 @@ export default class Client {
id: string
disconnected: DisconnectReason | false = false
ready = false
currentSequence = 0
lastHeartbeat: number = null!
heartbeatInterval: number
#hbTimeout: NodeJS.Timeout = null!
#emitter = new EventEmitter() as TypedEmitter<ClientEventHandlers>
#socket: WebSocket
constructor(options: ClientOptions) {
this.#socket = options.socket
this.heartbeatInterval = options.heartbeatInterval ?? 60000
this.id = options.id
this.#socket.on('error', () => this.forceDisconnect())
this.#socket.on('close', () => this.forceDisconnect())
this.#socket.on('unexpected-response', () => this.forceDisconnect())
this.#socket.on('error', () => this.disconnect(DisconnectReason.ServerError))
this.#socket.on('close', code => this._handleDisconnect(code))
this.#socket.on('unexpected-response', () => this.disconnect(DisconnectReason.InvalidPacket))
this.send({
op: ServerOperation.Hello,
d: {
heartbeatInterval: this.heartbeatInterval,
},
d: null,
})
.then(() => {
this.#listen()
this.#listenHeartbeat()
this._listen()
this.ready = true
this.#emitter.emit('ready')
})
.catch(() => {
if (this.disconnected === false) this.disconnect(DisconnectReason.ServerError)
else this.forceDisconnect(DisconnectReason.ServerError)
this.disconnect(DisconnectReason.ServerError)
})
}
@@ -64,54 +56,40 @@ export default class Client {
this.#emitter.off(name, handler)
}
send<TOp extends ServerOperation>(packet: Packet<TOp>) {
send<TOp extends ServerOperation>(packet: Omit<Packet<TOp>, 's'>, sequence?: number) {
return new Promise<void>((resolve, reject) => {
try {
this.#throwIfDisconnected('Cannot send packet to client that has already disconnected')
this.#socket.send(serializePacket(packet))
resolve()
} catch (e) {
reject(e)
}
this.#throwIfDisconnected('Cannot send packet to client that has already disconnected')
this.#socket.send(
serializePacket({ ...packet, s: sequence ?? this.currentSequence++ } as Packet<TOp>),
err => (err ? reject(err) : resolve()),
)
})
}
async disconnect(reason: DisconnectReason = DisconnectReason.Generic) {
async disconnect(reason: DisconnectReason | number = DisconnectReason.Generic) {
this.#throwIfDisconnected('Cannot disconnect client that has already disconnected')
try {
await this.send({ op: ServerOperation.Disconnect, d: { reason } })
} catch (err) {
throw new Error(`Cannot send disconnect reason to client ${this.id}: ${err}`)
} finally {
this.forceDisconnect(reason)
}
}
forceDisconnect(reason: DisconnectReason = DisconnectReason.Generic) {
if (this.disconnected !== false) return
// It's so weird because if I moved this down a few lines
// it would just fire the disconnect event twice because of a race condition
this.disconnected = reason
this.ready = false
if (this.#hbTimeout) clearTimeout(this.#hbTimeout)
this.#socket.close()
this.#emitter.emit('disconnect', reason)
this.#socket.close(reason)
this._handleDisconnect(reason)
}
#throwIfDisconnected(errorMessage: string) {
if (this.disconnected !== false) throw new Error(errorMessage)
if (this.#socket.readyState !== this.#socket.OPEN) {
this.forceDisconnect(DisconnectReason.Generic)
this.#socket.close(DisconnectReason.NoOpenSocket)
throw new Error(errorMessage)
}
}
#listen() {
protected _handleDisconnect(code: number) {
this.disconnected = code
this.ready = false
this.#emitter.emit('disconnect', code)
}
protected _listen() {
this.#socket.on('message', data => {
this.#emitter.emit('message', data)
try {
@@ -136,38 +114,6 @@ export default class Client {
})
}
#listenHeartbeat() {
this.lastHeartbeat = Date.now()
this.#startHeartbeatTimeout()
this.on('heartbeat', () => {
this.lastHeartbeat = Date.now()
this.#hbTimeout.refresh()
this.send({
op: ServerOperation.HeartbeatAck,
d: {
nextHeartbeat: this.lastHeartbeat + this.heartbeatInterval,
},
}).catch(() => {})
})
}
#startHeartbeatTimeout() {
this.#hbTimeout = setTimeout(() => {
if (Date.now() - this.lastHeartbeat > 0) {
// TODO: put into config
// 5000 is extra time to account for latency
const interval = setTimeout(() => this.disconnect(DisconnectReason.TimedOut), 5000)
this.once('heartbeat', () => clearTimeout(interval))
// This should never happen but it did in my testing so I'm adding this just in case
this.once('disconnect', () => clearTimeout(interval))
// Technically we don't have to do this, but JUST IN CASE!
} else this.#hbTimeout.refresh()
}, this.heartbeatInterval)
}
protected _toBuffer(data: RawData) {
if (data instanceof Buffer) return data
if (data instanceof ArrayBuffer) return Buffer.from(data)
@@ -178,7 +124,6 @@ export default class Client {
export interface ClientOptions {
id: string
socket: WebSocket
heartbeatInterval?: number
}
export type ClientPacketObject<TOp extends ClientOperation> = Packet<TOp> & {

View File

@@ -1,20 +1,33 @@
import type { ClientOperation } from '@revanced/bot-shared'
import type { Logger } from '@revanced/bot-shared'
import type { Wit } from 'node-wit'
import type { Worker as TesseractWorker } from 'tesseract.js'
import { ClientPacketObject } from '../classes/Client'
import type { Config } from '../utils/getConfig'
import type { ClientPacketObject } from '../classes/Client'
import type { Config } from '../utils/config'
export { default as parseTextEventHandler } from './parseText'
export { default as parseImageEventHandler } from './parseImage'
export { default as trainMessageEventHandler } from './trainMessage'
export type EventHandler<POp extends ClientOperation> = (
packet: ClientPacketObject<POp>,
context: EventContext,
) => void | Promise<void>
export type EventContext = {
witClient: Wit
tesseractWorker: TesseractWorker
wit: {
train(text: string, label: string): Promise<void>
message(text: string): Promise<WitMessageResponse>
}
tesseract: TesseractWorker
logger: Logger
config: Config
}
export interface WitMessageResponse {
text: string
intents: Array<{
id: string
name: string
confidence: number
}>
}

View File

@@ -1,19 +1,21 @@
import { ClientOperation, ServerOperation } from '@revanced/bot-shared'
import { type ClientOperation, ServerOperation } from '@revanced/bot-shared'
import { AsyncQueue } from '@sapphire/async-queue'
import type { EventHandler } from './index'
import type { EventHandler } from '.'
const queue = new AsyncQueue()
const parseImageEventHandler: EventHandler<ClientOperation.ParseImage> = async (
packet,
{ tesseractWorker, logger, config },
{ tesseract, logger, config },
) => {
const {
client,
d: { image_url: imageUrl, id },
d: { image_url: imageUrl },
} = packet
const nextSeq = client.currentSequence++
logger.debug(`Client ${client.id} requested to parse image from URL:`, imageUrl)
logger.debug(`Queue currently has ${queue.remaining}/${config.ocrConcurrentQueues} items in it`)
@@ -23,24 +25,27 @@ const parseImageEventHandler: EventHandler<ClientOperation.ParseImage> = async (
try {
logger.debug(`Recognizing image from URL for client ${client.id}`)
const { data, jobId } = await tesseractWorker.recognize(imageUrl)
const { data, jobId } = await tesseract.recognize(imageUrl)
logger.debug(`Recognized image from URL for client ${client.id} (job ${jobId}):`, data.text)
await client.send({
op: ServerOperation.ParsedImage,
d: {
id,
text: data.text,
await client.send(
{
op: ServerOperation.ParsedImage,
d: {
text: data.text,
},
},
})
nextSeq,
)
} catch {
logger.error(`Failed to parse image from URL for client ${client.id}:`, imageUrl)
await client.send({
op: ServerOperation.ParseImageFailed,
d: {
id,
await client.send(
{
op: ServerOperation.ParseImageFailed,
d: null,
},
})
nextSeq,
)
} finally {
queue.shift()
logger.debug(

View File

@@ -1,35 +1,41 @@
import { ClientOperation, ServerOperation } from '@revanced/bot-shared'
import { type ClientOperation, ServerOperation } from '@revanced/bot-shared'
import { inspect as inspectObject } from 'util'
import type { EventHandler } from './index'
import type { EventHandler } from '.'
const parseTextEventHandler: EventHandler<ClientOperation.ParseText> = async (packet, { witClient, logger }) => {
const parseTextEventHandler: EventHandler<ClientOperation.ParseText> = async (packet, { wit, logger }) => {
const {
client,
d: { text, id },
d: { text },
} = packet
logger.debug(`Client ${client.id} requested to parse text:`, text)
const nextSeq = client.currentSequence++
const actualText = text.slice(0, 279)
logger.debug(`Client ${client.id} requested to parse text:`, actualText)
try {
const { intents } = await witClient.message(text, {})
const { intents } = await wit.message(actualText)
const intentsWithoutIds = intents.map(({ id, ...rest }) => rest)
await client.send({
op: ServerOperation.ParsedText,
d: {
id,
labels: intentsWithoutIds,
await client.send(
{
op: ServerOperation.ParsedText,
d: {
labels: intentsWithoutIds,
},
},
})
nextSeq,
)
} catch (e) {
await client.send({
op: ServerOperation.ParseTextFailed,
d: {
id,
await client.send(
{
op: ServerOperation.ParseTextFailed,
d: null,
},
})
nextSeq,
)
if (e instanceof Error) logger.error(e.stack ?? e.message)
else logger.error(inspectObject(e))

View File

@@ -0,0 +1,43 @@
import { type ClientOperation, ServerOperation } from '@revanced/bot-shared'
import { inspect as inspectObject } from 'util'
import type { EventHandler } from '.'
const trainMessageEventHandler: EventHandler<ClientOperation.TrainMessage> = async (packet, { wit, logger }) => {
const {
client,
d: { text, label },
} = packet
const nextSeq = client.currentSequence++
const actualText = text.slice(0, 279)
logger.debug(`Client ${client.id} requested to train label ${label} with:`, actualText)
try {
await wit.train(actualText, label)
await client.send(
{
op: ServerOperation.TrainedMessage,
d: null,
},
nextSeq,
)
logger.debug(`Trained label ${label} with:`, actualText)
} catch (e) {
await client.send(
{
op: ServerOperation.TrainMessageFailed,
d: null,
},
nextSeq,
)
if (e instanceof Error) logger.error(e.stack ?? e.message)
else logger.error(inspectObject(e))
}
}
export default trainMessageEventHandler

View File

@@ -1,43 +1,93 @@
import witPkg from 'node-wit'
import { createWorker as createTesseractWorker } from 'tesseract.js'
const { Wit } = witPkg
import { inspect as inspectObject } from 'util'
import Client from './classes/Client'
import { EventContext, parseImageEventHandler, parseTextEventHandler } from './events/index'
import {
type EventContext,
type WitMessageResponse,
parseImageEventHandler,
parseTextEventHandler,
trainMessageEventHandler,
} from './events'
import { DisconnectReason, HumanizedDisconnectReason, createLogger } from '@revanced/bot-shared'
import { checkEnvironment, getConfig } from './utils/index'
import { getConfig } from './utils/config'
import { createServer } from 'http'
import { WebSocket, WebSocketServer } from 'ws'
import { type WebSocket, WebSocketServer } from 'ws'
// Load config, init logger, check environment
const config = getConfig()
const logger = createLogger({
level: config['consoleLogLevel'] === 'none' ? Infinity : config['consoleLogLevel'],
level: config.logLevel === 'none' ? Number.MAX_SAFE_INTEGER : config.logLevel,
})
checkEnvironment(logger)
if (!process.env['NODE_ENV']) logger.warn('NODE_ENV not set, defaulting to `development`')
const environment = (process.env['NODE_ENV'] ?? 'development') as NodeEnvironment
if (!['development', 'production'].includes(environment)) {
logger.error('NODE_ENV is neither `development` nor `production`, unable to determine environment')
logger.info('Set NODE_ENV to blank to use `development` mode')
process.exit(1)
}
logger.info(`Running in ${environment} mode...`)
if (environment === 'production' && process.env['IS_USING_DOT_ENV']) {
logger.warn('You seem to be using .env files, this is generally not a good idea in production...')
}
if (!process.env['WIT_AI_TOKEN']) {
logger.error('WIT_AI_TOKEN is not defined in the environment variables')
process.exit(1)
}
// Workers and API clients
const tesseractWorker = await createTesseractWorker('eng')
const witClient = new Wit({
accessToken: process.env['WIT_AI_TOKEN']!,
})
const tesseract = await createTesseractWorker('eng')
const wit = {
token: process.env['WIT_AI_TOKEN']!,
async fetch(route: string, options?: RequestInit) {
const res = await fetch(`https://api.wit.ai${route}`, {
headers: {
Authorization: `Bearer ${this.token}`,
'Content-Type': 'application/json',
},
...options,
})
if (!res.ok) throw new Error(`Failed to fetch from Wit.ai: ${res.statusText} (${res.status})`)
return await res.json()
},
message(text: string) {
return this.fetch(`/message?q=${encodeURIComponent(text)}&n=8`) as Promise<WitMessageResponse>
},
async train(text: string, label: string) {
await this.fetch('/utterances', {
body: JSON.stringify([
{
text,
intent: label,
entities: [],
traits: [],
},
]),
method: 'POST',
})
},
} as const
// Server logic
const clients = new Set<Client>()
const clientSocketMap = new WeakMap<WebSocket, Client>()
const clientMap = new WeakMap<WebSocket, Client>()
const eventContext: EventContext = {
tesseractWorker,
tesseract,
logger,
witClient,
wit,
config,
}
@@ -61,25 +111,24 @@ wss.on('connection', async (socket, request) => {
const client = new Client({
socket,
id: `${request.socket.remoteAddress}:${request.socket.remotePort}`,
heartbeatInterval: config['clientHeartbeatInterval'],
})
clientSocketMap.set(socket, client)
clients.add(client)
clientMap.set(socket, client)
logger.debug(`Client ${client.id}'s instance has been added`)
logger.info(`New client connected (now ${clients.size} clients) with ID:`, client.id)
logger.info(`New client connected with ID: ${client.id}`)
client.on('disconnect', reason => {
clients.delete(client)
logger.info(`Client ${client.id} disconnected because client ${HumanizedDisconnectReason[reason]}`)
logger.info(
`Client ${client.id} disconnected because client ${HumanizedDisconnectReason[reason]} (${reason})`,
)
})
client.on('parseText', async packet => parseTextEventHandler(packet, eventContext))
client.on('parseText', packet => parseTextEventHandler(packet, eventContext))
client.on('parseImage', packet => parseImageEventHandler(packet, eventContext))
client.on('trainMessage', packet => trainMessageEventHandler(packet, eventContext))
client.on('parseImage', async packet => parseImageEventHandler(packet, eventContext))
if (['debug', 'trace'].includes(config['consoleLogLevel'])) {
if (['debug', 'trace'].includes(config.logLevel)) {
logger.debug('Debug logs enabled, attaching debug events...')
client.on('packet', ({ client, ...rawPacket }) =>
@@ -87,14 +136,12 @@ wss.on('connection', async (socket, request) => {
)
client.on('message', d => logger.debug(`Message from client ${client.id}:`, d))
client.on('heartbeat', () => logger.debug('Heartbeat received from client', client.id))
}
} catch (e) {
if (e instanceof Error) logger.error(e.stack ?? e.message)
else logger.error(inspectObject(e))
const client = clientSocketMap.get(socket)
const client = clientMap.get(socket)
if (!client) {
logger.error(
@@ -104,9 +151,6 @@ wss.on('connection', async (socket, request) => {
}
if (client.disconnected === false) client.disconnect(DisconnectReason.ServerError)
else client.forceDisconnect()
clients.delete(client)
logger.debug(`Client ${client.id} disconnected because of an internal error`)
}
@@ -114,7 +158,7 @@ wss.on('connection', async (socket, request) => {
// Start the server
server.listen(config['port'], config['address'])
server.listen(config.port, config.address)
logger.debug(`Starting with these configurations: ${inspectObject(config)}`)

View File

@@ -1,23 +0,0 @@
import type { Logger } from '@revanced/bot-shared'
export default function checkEnvironment(logger: Logger) {
if (!process.env['NODE_ENV']) logger.warn('NODE_ENV not set, defaulting to `development`')
const environment = (process.env['NODE_ENV'] ?? 'development') as NodeEnvironment
if (!['development', 'production'].includes(environment)) {
logger.error('NODE_ENV is neither `development` nor `production`, unable to determine environment')
logger.info('Set NODE_ENV to blank to use `development` mode')
process.exit(1)
}
logger.info(`Running in ${environment} mode...`)
if (environment === 'production' && process.env['IS_USING_DOT_ENV']) {
logger.warn('You seem to be using .env files, this is generally not a good idea in production...')
}
if (!process.env['WIT_AI_TOKEN']) {
logger.error('WIT_AI_TOKEN is not defined in the environment variables')
process.exit(1)
}
}

View File

@@ -7,7 +7,7 @@ const configPath = resolvePath(process.cwd(), 'config.json')
const userConfig: Partial<Config> = existsSync(configPath)
? (
await import(pathToFileURL(configPath).href, {
assert: {
with: {
type: 'json',
},
})
@@ -28,10 +28,9 @@ export const defaultConfig: Config = {
address: '127.0.0.1',
port: 8080,
ocrConcurrentQueues: 1,
clientHeartbeatInterval: 60000,
consoleLogLevel: 'info',
logLevel: 'info',
}
export default function getConfig() {
export function getConfig() {
return Object.assign(defaultConfig, userConfig) satisfies Config
}

View File

@@ -1,2 +0,0 @@
export { default as getConfig } from './getConfig'
export { default as checkEnvironment } from './checkEnvironment'