From 767bc3e7af316c8bd7fa4f904d900e9fe4ecd2ba Mon Sep 17 00:00:00 2001 From: brentjmadison Date: Fri, 14 Nov 2025 15:19:08 -0600 Subject: [PATCH] Cluster issue fix --- src/index.ts | 70 ++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/src/index.ts b/src/index.ts index 4ddef02..ac8d16d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -206,8 +206,23 @@ export class MicrosoftRewardsBot { if (this.config.clusters > 1) { if (cluster.isPrimary) { this.runMaster() + } else if (cluster.worker) { + await this.runWorker() } else { - this.runWorker() + // Neither primary nor worker - something's wrong with clustering + log('main', 'MAIN', `ERROR: Cluster mode failed - neither primary nor worker! Falling back to single-process mode.`, 'error') + const passes = this.config.passesPerRun ?? 1 + for (let pass = 1; pass <= passes; pass++) { + if (passes > 1) { + log('main', 'MAIN', `Starting pass ${pass}/${passes}`) + } + await this.runTasks(this.accounts, pass, passes) + if (pass < passes) { + log('main', 'MAIN', `Completed pass ${pass}/${passes}. Waiting before next pass...`) + await this.utils.wait(TIMEOUTS.ONE_MINUTE) + } + } + return } } else { const passes = this.config.passesPerRun ?? 1 @@ -301,6 +316,8 @@ export class MicrosoftRewardsBot { // FIXED: Proper type checking before calling send if (worker.send && typeof worker.send === 'function') { worker.send({ chunk }) + } else { + log('main', 'MAIN-PRIMARY', `ERROR: Worker ${i} does not have a send function!`, 'error') } worker.on('message', (msg: unknown) => { // IMPROVED: Using type-safe interface and type guard @@ -360,22 +377,34 @@ export class MicrosoftRewardsBot { }) } - private runWorker() { + private async runWorker() { log('main', 'MAIN-WORKER', `Worker ${process.pid} spawned`) - // Receive the chunk of accounts from the master - ; (process as unknown as { on: (ev: 'message', cb: (m: { chunk: Account[] }) => void) => void }).on('message', async ({ chunk }: { chunk: Account[] }) => { - const passes = this.config.passesPerRun ?? 1 - for (let pass = 1; pass <= passes; pass++) { - if (passes > 1) { - log('main', 'MAIN-WORKER', `Starting pass ${pass}/${passes}`) - } - await this.runTasks(chunk, pass, passes) - if (pass < passes) { - log('main', 'MAIN-WORKER', `Completed pass ${pass}/${passes}. Waiting before next pass...`) - await this.utils.wait(TIMEOUTS.ONE_MINUTE) - } - } - }) + + // Wait for chunk (either already received during init, or will arrive soon) + const chunk = await new Promise((resolve) => { + if ((global as any).__workerChunk) { + resolve((global as any).__workerChunk) + } else { + (process as unknown as { on: (ev: 'message', cb: (m: { chunk: Account[] }) => void) => void }).on('message', ({ chunk: c }: { chunk: Account[] }) => resolve(c)) + } + }) + + if (!chunk || chunk.length === 0) { + log('main', 'MAIN-WORKER', `ERROR: Worker ${process.pid} received empty or undefined chunk!`, 'error') + return + } + + const passes = this.config.passesPerRun ?? 1 + for (let pass = 1; pass <= passes; pass++) { + if (passes > 1) { + log('main', 'MAIN-WORKER', `Starting pass ${pass}/${passes}`) + } + await this.runTasks(chunk, pass, passes) + if (pass < passes) { + log('main', 'MAIN-WORKER', `Completed pass ${pass}/${passes}. Waiting before next pass...`) + await this.utils.wait(TIMEOUTS.ONE_MINUTE) + } + } } private async runTasks(accounts: Account[], currentPass: number = 1, totalPasses: number = 1) { @@ -872,6 +901,15 @@ const shortErr = shortErrorMessage const formatFullError = formatDetailedError async function main(): Promise { + // FIX: Set up message listener early to prevent race condition + // Workers initialize for ~2 seconds before reaching runWorker(), so messages + // sent by primary during initialization would be lost without this early listener + if (!cluster.isPrimary && cluster.worker) { + (process as unknown as { on: (ev: 'message', cb: (m: { chunk: Account[] }) => void) => void }).on('message', ({ chunk }: { chunk: Account[] }) => { + (global as any).__workerChunk = chunk + }) + } + // Check for dashboard mode flag (standalone dashboard) if (process.argv.includes('-dashboard')) { const { startDashboardServer } = await import('./dashboard/server')