diff --git a/packages/api/package.json b/packages/api/package.json index 2eb4e35..4801219 100755 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -7,11 +7,8 @@ "types": "dist/index.d.ts", "scripts": { "build": "bun bundle && bun types", - "watch": "bunx conc --raw \"bun bundle:watch\" \"bun types:watch\"", "bundle": "bun build src/index.ts --outdir=dist --target=bun", - "bundle:watch": "bun run bundle --watch", "types": "tsc --declaration --emitDeclarationOnly", - "types:watch": "bun types --watch --preserveWatchOutput", "types:clean": "bun types --build --clean" }, "repository": { @@ -30,7 +27,8 @@ }, "homepage": "https://github.com/revanced/revanced-helper#readme", "dependencies": { - "@revanced/bot-shared": "workspace:*" + "@revanced/bot-shared": "workspace:*", + "ws": "^8.16.0" }, "devDependencies": { "typed-emitter": "^2.1.0" diff --git a/packages/api/src/classes/Client.ts b/packages/api/src/classes/Client.ts index 6fc2bb3..b0ccb43 100755 --- a/packages/api/src/classes/Client.ts +++ b/packages/api/src/classes/Client.ts @@ -1,5 +1,4 @@ -import { ClientOperation, ServerOperation } from '@revanced/bot-shared' -import { awaitPacket } from 'src/utils/packets' +import { ClientOperation, type Packet, ServerOperation } from '@revanced/bot-shared' import { type ClientWebSocketEvents, ClientWebSocketManager, @@ -12,6 +11,7 @@ import { export default class Client { ready = false ws: ClientWebSocketManager + #awaiter: ClientWebSocketPacketAwaiter constructor(options: ClientOptions) { this.ws = new ClientWebSocketManager(options.api.websocket) @@ -21,6 +21,8 @@ export default class Client { this.ws.on('disconnect', () => { this.ready = false }) + + this.#awaiter = new ClientWebSocketPacketAwaiter(this.ws) } /** @@ -50,13 +52,9 @@ export default class Client { // But if we add anything similar, this will cause another race condition // To fix this, we can try adding a instanced function that would return the currentSequence // and it would be updated every time a "heartbeat ack" packet is received - const expectedNextSeq = this.ws.currentSequence + 1 - const awaitPkt = (op: ServerOperation, timeout = this.ws.timeout) => - awaitPacket(this.ws, op, expectedNextSeq, timeout) - return Promise.race([ - awaitPkt(ServerOperation.ParsedText), - awaitPkt(ServerOperation.ParseTextFailed, this.ws.timeout + 5000), + this.#awaiter.await(ServerOperation.ParsedText, this.ws.currentSequence), + this.#awaiter.await(ServerOperation.ParseTextFailed, this.ws.timeout + 5000), ]) .then(pkt => { if (pkt.op === ServerOperation.ParsedText) return pkt.d @@ -82,14 +80,11 @@ export default class Client { }, }) - // See line 48 - const expectedNextSeq = this.ws.currentSequence + 1 - const awaitPkt = (op: ServerOperation, timeout = this.ws.timeout) => - awaitPacket(this.ws, op, expectedNextSeq, timeout) + // See line 50 return Promise.race([ - awaitPkt(ServerOperation.ParsedImage), - awaitPkt(ServerOperation.ParseImageFailed, this.ws.timeout + 5000), + this.#awaiter.await(ServerOperation.ParsedImage, this.ws.currentSequence), + this.#awaiter.await(ServerOperation.ParseImageFailed, this.ws.timeout + 5000), ]) .then(pkt => { if (pkt.op === ServerOperation.ParsedImage) return pkt.d @@ -111,17 +106,13 @@ export default class Client { }, }) - // See line 48 - const expectedNextSeq = this.ws.currentSequence + 1 - const awaitPkt = (op: ServerOperation, timeout = this.ws.timeout) => - awaitPacket(this.ws, op, expectedNextSeq, timeout) - + // See line 50 return Promise.race([ - awaitPkt(ServerOperation.TrainedMessage), - awaitPkt(ServerOperation.TrainMessageFailed, this.ws.timeout + 5000), + this.#awaiter.await(ServerOperation.TrainedMessage, this.ws.currentSequence), + this.#awaiter.await(ServerOperation.TrainMessageFailed, this.ws.timeout + 5000), ]) .then(pkt => { - if (pkt.op === ServerOperation.TrainedMessage) return + if (pkt.op === ServerOperation.TrainedMessage) return pkt.d throw new Error('Failed to train message, the API encountered an error') }) .catch(() => { @@ -162,6 +153,13 @@ export default class Client { return handler } + /** + * Connects the client to the API + */ + connect() { + return this.ws.connect() + } + /** * Disconnects the client from the API */ @@ -174,6 +172,45 @@ export default class Client { } } +export class ClientWebSocketPacketAwaiter { + #ws: ClientWebSocketManager + #resolvers: Map) => void> + + constructor(ws: ClientWebSocketManager) { + this.#ws = ws + this.#resolvers = new Map() + + this.#ws.on('packet', packet => { + const key = this.keyFor(packet.op, packet.s) + const resolve = this.#resolvers.get(key) + if (resolve) { + resolve(packet) + this.#resolvers.delete(key) + } + }) + } + + keyFor(op: ServerOperation, seq: number) { + return `${op}-${seq}` + } + + await( + op: TOp, + expectedSeq: number, + timeout = 10000, + ): Promise> { + return new Promise((resolve, reject) => { + const key = this.keyFor(op, expectedSeq) + this.#resolvers.set(key, resolve) + + setTimeout(() => { + this.#resolvers.delete(key) + reject('Awaiting packet timed out') + }, timeout) + }) + } +} + export type ReadiedClient = Client & { ready: true } export interface ClientOptions { diff --git a/packages/api/src/classes/ClientWebSocket.ts b/packages/api/src/classes/ClientWebSocket.ts index f5b960e..2383868 100755 --- a/packages/api/src/classes/ClientWebSocket.ts +++ b/packages/api/src/classes/ClientWebSocket.ts @@ -20,6 +20,7 @@ export class ClientWebSocketManager { readonly url: string timeout: number + connecting = false ready = false disconnected: false | DisconnectReason = false currentSequence = 0 @@ -36,8 +37,12 @@ export class ClientWebSocketManager { * Connects to the WebSocket API * @returns A promise that resolves when the client is ready */ - connect() { - return new Promise((rs, rj) => { + async connect() { + if (this.connecting) throw new Error('Cannot connect when already connecting to the server') + + this.connecting = true + + await new Promise((rs, rj) => { try { this.#socket = new WebSocket(this.url) @@ -63,10 +68,13 @@ export class ClientWebSocketManager { this.#socket.on('close', (code, reason) => { clearTimeout(timeout) this._handleDisconnect(code, reason.toString()) + throw new Error('WebSocket connection closed before ready') }) } catch (e) { rj(e) } + }).finally(() => { + this.connecting = false }) } @@ -108,6 +116,8 @@ export class ClientWebSocketManager { send(packet: Packet) { this.#throwIfDisconnected('Cannot send a packet when already disconnected from the server') + this.currentSequence++ + this.#socket.send(serializePacket(packet), err => { throw err }) @@ -164,6 +174,7 @@ export class ClientWebSocketManager { protected _handleDisconnect(reason: DisconnectReason | number, message?: string) { this.disconnected = reason in DisconnectReason ? reason : DisconnectReason.Generic + this.connecting = false this.#socket?.close(reason) this.#socket = null! diff --git a/packages/api/src/types.d.ts b/packages/api/src/types.d.ts index 778651e..f0639d5 100755 --- a/packages/api/src/types.d.ts +++ b/packages/api/src/types.d.ts @@ -1 +1,2 @@ type RequiredProperty = { [P in keyof T]: Required> } +type IfTrueElseNever = T extends true ? U : never diff --git a/packages/api/src/utils/packets.ts b/packages/api/src/utils/packets.ts deleted file mode 100644 index 70e14de..0000000 --- a/packages/api/src/utils/packets.ts +++ /dev/null @@ -1,26 +0,0 @@ -import type { Packet, ServerOperation } from '@revanced/bot-shared' -import type { ClientWebSocketManager } from 'src/classes' - -export function awaitPacket( - ws: ClientWebSocketManager, - op: TOp, - expectedSeq: number, - timeout = 10000, -): Promise> { - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - ws.off('packet', handler) - reject('Awaiting packet timed out') - }, timeout) - - function handler(packet: Packet) { - if (packet.op === op && packet.s === expectedSeq) { - clearTimeout(timer) - ws.off('packet', handler) - resolve(packet as Packet) - } - } - - ws.on('packet', handler) - }) -}