fix(packages/api): improve packet awaiting and reconnection

This commit is contained in:
PalmDevs
2024-04-02 19:12:28 +07:00
parent b6cbe9d64c
commit 4b2557f1f1
5 changed files with 75 additions and 54 deletions

View File

@@ -7,11 +7,8 @@
"types": "dist/index.d.ts", "types": "dist/index.d.ts",
"scripts": { "scripts": {
"build": "bun bundle && bun types", "build": "bun bundle && bun types",
"watch": "bunx conc --raw \"bun bundle:watch\" \"bun types:watch\"",
"bundle": "bun build src/index.ts --outdir=dist --target=bun", "bundle": "bun build src/index.ts --outdir=dist --target=bun",
"bundle:watch": "bun run bundle --watch",
"types": "tsc --declaration --emitDeclarationOnly", "types": "tsc --declaration --emitDeclarationOnly",
"types:watch": "bun types --watch --preserveWatchOutput",
"types:clean": "bun types --build --clean" "types:clean": "bun types --build --clean"
}, },
"repository": { "repository": {
@@ -30,7 +27,8 @@
}, },
"homepage": "https://github.com/revanced/revanced-helper#readme", "homepage": "https://github.com/revanced/revanced-helper#readme",
"dependencies": { "dependencies": {
"@revanced/bot-shared": "workspace:*" "@revanced/bot-shared": "workspace:*",
"ws": "^8.16.0"
}, },
"devDependencies": { "devDependencies": {
"typed-emitter": "^2.1.0" "typed-emitter": "^2.1.0"

View File

@@ -1,5 +1,4 @@
import { ClientOperation, ServerOperation } from '@revanced/bot-shared' import { ClientOperation, type Packet, ServerOperation } from '@revanced/bot-shared'
import { awaitPacket } from 'src/utils/packets'
import { import {
type ClientWebSocketEvents, type ClientWebSocketEvents,
ClientWebSocketManager, ClientWebSocketManager,
@@ -12,6 +11,7 @@ import {
export default class Client { export default class Client {
ready = false ready = false
ws: ClientWebSocketManager ws: ClientWebSocketManager
#awaiter: ClientWebSocketPacketAwaiter
constructor(options: ClientOptions) { constructor(options: ClientOptions) {
this.ws = new ClientWebSocketManager(options.api.websocket) this.ws = new ClientWebSocketManager(options.api.websocket)
@@ -21,6 +21,8 @@ export default class Client {
this.ws.on('disconnect', () => { this.ws.on('disconnect', () => {
this.ready = false this.ready = false
}) })
this.#awaiter = new ClientWebSocketPacketAwaiter(this.ws)
} }
/** /**
@@ -50,13 +52,9 @@ export default class Client {
// But if we add anything similar, this will cause another race condition // But if we add anything similar, this will cause another race condition
// To fix this, we can try adding a instanced function that would return the currentSequence // To fix this, we can try adding a instanced function that would return the currentSequence
// and it would be updated every time a "heartbeat ack" packet is received // and it would be updated every time a "heartbeat ack" packet is received
const expectedNextSeq = this.ws.currentSequence + 1
const awaitPkt = (op: ServerOperation, timeout = this.ws.timeout) =>
awaitPacket(this.ws, op, expectedNextSeq, timeout)
return Promise.race([ return Promise.race([
awaitPkt(ServerOperation.ParsedText), this.#awaiter.await(ServerOperation.ParsedText, this.ws.currentSequence),
awaitPkt(ServerOperation.ParseTextFailed, this.ws.timeout + 5000), this.#awaiter.await(ServerOperation.ParseTextFailed, this.ws.timeout + 5000),
]) ])
.then(pkt => { .then(pkt => {
if (pkt.op === ServerOperation.ParsedText) return pkt.d if (pkt.op === ServerOperation.ParsedText) return pkt.d
@@ -82,14 +80,11 @@ export default class Client {
}, },
}) })
// See line 48 // See line 50
const expectedNextSeq = this.ws.currentSequence + 1
const awaitPkt = (op: ServerOperation, timeout = this.ws.timeout) =>
awaitPacket(this.ws, op, expectedNextSeq, timeout)
return Promise.race([ return Promise.race([
awaitPkt(ServerOperation.ParsedImage), this.#awaiter.await(ServerOperation.ParsedImage, this.ws.currentSequence),
awaitPkt(ServerOperation.ParseImageFailed, this.ws.timeout + 5000), this.#awaiter.await(ServerOperation.ParseImageFailed, this.ws.timeout + 5000),
]) ])
.then(pkt => { .then(pkt => {
if (pkt.op === ServerOperation.ParsedImage) return pkt.d if (pkt.op === ServerOperation.ParsedImage) return pkt.d
@@ -111,17 +106,13 @@ export default class Client {
}, },
}) })
// See line 48 // See line 50
const expectedNextSeq = this.ws.currentSequence + 1
const awaitPkt = (op: ServerOperation, timeout = this.ws.timeout) =>
awaitPacket(this.ws, op, expectedNextSeq, timeout)
return Promise.race([ return Promise.race([
awaitPkt(ServerOperation.TrainedMessage), this.#awaiter.await(ServerOperation.TrainedMessage, this.ws.currentSequence),
awaitPkt(ServerOperation.TrainMessageFailed, this.ws.timeout + 5000), this.#awaiter.await(ServerOperation.TrainMessageFailed, this.ws.timeout + 5000),
]) ])
.then(pkt => { .then(pkt => {
if (pkt.op === ServerOperation.TrainedMessage) return if (pkt.op === ServerOperation.TrainedMessage) return pkt.d
throw new Error('Failed to train message, the API encountered an error') throw new Error('Failed to train message, the API encountered an error')
}) })
.catch(() => { .catch(() => {
@@ -162,6 +153,13 @@ export default class Client {
return handler return handler
} }
/**
* Connects the client to the API
*/
connect() {
return this.ws.connect()
}
/** /**
* Disconnects the client from the API * Disconnects the client from the API
*/ */
@@ -174,6 +172,45 @@ export default class Client {
} }
} }
export class ClientWebSocketPacketAwaiter {
#ws: ClientWebSocketManager
#resolvers: Map<string, (packet: Packet<ServerOperation>) => void>
constructor(ws: ClientWebSocketManager) {
this.#ws = ws
this.#resolvers = new Map()
this.#ws.on('packet', packet => {
const key = this.keyFor(packet.op, packet.s)
const resolve = this.#resolvers.get(key)
if (resolve) {
resolve(packet)
this.#resolvers.delete(key)
}
})
}
keyFor(op: ServerOperation, seq: number) {
return `${op}-${seq}`
}
await<TOp extends ServerOperation>(
op: TOp,
expectedSeq: number,
timeout = 10000,
): Promise<Packet<ServerOperation>> {
return new Promise((resolve, reject) => {
const key = this.keyFor(op, expectedSeq)
this.#resolvers.set(key, resolve)
setTimeout(() => {
this.#resolvers.delete(key)
reject('Awaiting packet timed out')
}, timeout)
})
}
}
export type ReadiedClient = Client & { ready: true } export type ReadiedClient = Client & { ready: true }
export interface ClientOptions { export interface ClientOptions {

View File

@@ -20,6 +20,7 @@ export class ClientWebSocketManager {
readonly url: string readonly url: string
timeout: number timeout: number
connecting = false
ready = false ready = false
disconnected: false | DisconnectReason = false disconnected: false | DisconnectReason = false
currentSequence = 0 currentSequence = 0
@@ -36,8 +37,12 @@ export class ClientWebSocketManager {
* Connects to the WebSocket API * Connects to the WebSocket API
* @returns A promise that resolves when the client is ready * @returns A promise that resolves when the client is ready
*/ */
connect() { async connect() {
return new Promise<void>((rs, rj) => { if (this.connecting) throw new Error('Cannot connect when already connecting to the server')
this.connecting = true
await new Promise<void>((rs, rj) => {
try { try {
this.#socket = new WebSocket(this.url) this.#socket = new WebSocket(this.url)
@@ -63,10 +68,13 @@ export class ClientWebSocketManager {
this.#socket.on('close', (code, reason) => { this.#socket.on('close', (code, reason) => {
clearTimeout(timeout) clearTimeout(timeout)
this._handleDisconnect(code, reason.toString()) this._handleDisconnect(code, reason.toString())
throw new Error('WebSocket connection closed before ready')
}) })
} catch (e) { } catch (e) {
rj(e) rj(e)
} }
}).finally(() => {
this.connecting = false
}) })
} }
@@ -108,6 +116,8 @@ export class ClientWebSocketManager {
send<TOp extends ClientOperation>(packet: Packet<TOp>) { send<TOp extends ClientOperation>(packet: Packet<TOp>) {
this.#throwIfDisconnected('Cannot send a packet when already disconnected from the server') this.#throwIfDisconnected('Cannot send a packet when already disconnected from the server')
this.currentSequence++
this.#socket.send(serializePacket(packet), err => { this.#socket.send(serializePacket(packet), err => {
throw err throw err
}) })
@@ -164,6 +174,7 @@ export class ClientWebSocketManager {
protected _handleDisconnect(reason: DisconnectReason | number, message?: string) { protected _handleDisconnect(reason: DisconnectReason | number, message?: string) {
this.disconnected = reason in DisconnectReason ? reason : DisconnectReason.Generic this.disconnected = reason in DisconnectReason ? reason : DisconnectReason.Generic
this.connecting = false
this.#socket?.close(reason) this.#socket?.close(reason)
this.#socket = null! this.#socket = null!

View File

@@ -1 +1,2 @@
type RequiredProperty<T> = { [P in keyof T]: Required<NonNullable<T[P]>> } type RequiredProperty<T> = { [P in keyof T]: Required<NonNullable<T[P]>> }
type IfTrueElseNever<T extends boolean, U> = T extends true ? U : never

View File

@@ -1,26 +0,0 @@
import type { Packet, ServerOperation } from '@revanced/bot-shared'
import type { ClientWebSocketManager } from 'src/classes'
export function awaitPacket<TOp extends ServerOperation>(
ws: ClientWebSocketManager,
op: TOp,
expectedSeq: number,
timeout = 10000,
): Promise<Packet<TOp>> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
ws.off('packet', handler)
reject('Awaiting packet timed out')
}, timeout)
function handler(packet: Packet) {
if (packet.op === op && packet.s === expectedSeq) {
clearTimeout(timer)
ws.off('packet', handler)
resolve(packet as Packet<TOp>)
}
}
ws.on('packet', handler)
})
}