From e7f8f7110cb5bf5a1ebc9f9b4c679e4de4e49c21 Mon Sep 17 00:00:00 2001 From: LightZirconite Date: Sat, 15 Nov 2025 12:47:09 +0100 Subject: [PATCH] feat: Improve workflow management by making runMaster asynchronous and adding error handling --- src/index.ts | 181 ++++++++++++++++++++++++++------------------------- 1 file changed, 94 insertions(+), 87 deletions(-) diff --git a/src/index.ts b/src/index.ts index 2fd46e0..e981ddd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -205,7 +205,7 @@ export class MicrosoftRewardsBot { // Only cluster when there's more than 1 cluster demanded if (this.config.clusters > 1) { if (cluster.isPrimary) { - this.runMaster() + await this.runMaster() } else if (cluster.worker) { await this.runWorker() } else { @@ -279,101 +279,108 @@ export class MicrosoftRewardsBot { return this.accountSummaries } - private runMaster() { - log('main', 'MAIN-PRIMARY', 'Primary process started') + private runMaster(): Promise { + return new Promise((resolve) => { + log('main', 'MAIN-PRIMARY', 'Primary process started') - const totalAccounts = this.accounts.length + const totalAccounts = this.accounts.length - // Validate accounts exist - if (totalAccounts === 0) { - log('main', 'MAIN-PRIMARY', 'No accounts found to process. Exiting.', 'warn') - process.exit(0) - } - - // If user over-specified clusters (e.g. 10 clusters but only 2 accounts), don't spawn useless idle workers. - const workerCount = Math.min(this.config.clusters, totalAccounts) - const accountChunks = this.utils.chunkArray(this.accounts, workerCount) - // Reset activeWorkers to actual spawn count (constructor used raw clusters) - this.activeWorkers = workerCount - - // Store worker-to-chunk mapping for crash recovery - const workerChunkMap = new Map() - - for (let i = 0; i < workerCount; i++) { - const worker = cluster.fork() - const chunk = accountChunks[i] || [] - - // Validate chunk has accounts - if (chunk.length === 0) { - log('main', 'MAIN-PRIMARY', `Warning: Worker ${i} received empty account chunk`, 'warn') + // Validate accounts exist + if (totalAccounts === 0) { + log('main', 'MAIN-PRIMARY', 'No accounts found to process. Nothing to do.', 'warn') + resolve() + return } - // Store chunk mapping for crash recovery - if (worker.id) { - workerChunkMap.set(worker.id, chunk) + // If user over-specified clusters (e.g. 10 clusters but only 2 accounts), don't spawn useless idle workers. + const workerCount = Math.min(this.config.clusters, totalAccounts) + const accountChunks = this.utils.chunkArray(this.accounts, workerCount) + // Reset activeWorkers to actual spawn count (constructor used raw clusters) + this.activeWorkers = workerCount + + // Store worker-to-chunk mapping for crash recovery + const workerChunkMap = new Map() + + let resolved = false + const finishRun = async () => { + if (resolved) return + resolved = true + try { + await this.sendConclusion(this.accountSummaries) + } catch (e) { + log('main', 'CONCLUSION', `Failed to send conclusion: ${e instanceof Error ? e.message : String(e)}`, 'warn') + } + log('main', 'MAIN-WORKER', 'All workers destroyed. Run complete.', 'warn') + resolve() } - // FIXED: Proper type checking before calling send - if (worker.send && typeof worker.send === 'function') { - worker.send({ chunk }) - } else { - log('main', 'MAIN-PRIMARY', `ERROR: Worker ${i} does not have a send function!`, 'error') + for (let i = 0; i < workerCount; i++) { + const worker = cluster.fork() + const chunk = accountChunks[i] || [] + + // Validate chunk has accounts + if (chunk.length === 0) { + log('main', 'MAIN-PRIMARY', `Warning: Worker ${i} received empty account chunk`, 'warn') + } + + // Store chunk mapping for crash recovery + if (worker.id) { + workerChunkMap.set(worker.id, chunk) + } + + // FIXED: Proper type checking before calling send + if (worker.send && typeof worker.send === 'function') { + worker.send({ chunk }) + } else { + log('main', 'MAIN-PRIMARY', `ERROR: Worker ${i} does not have a send function!`, 'error') + } + worker.on('message', (msg: unknown) => { + // IMPROVED: Using type-safe interface and type guard + if (isWorkerMessage(msg)) { + this.accountSummaries.push(...msg.data) + } + }) } - worker.on('message', (msg: unknown) => { - // IMPROVED: Using type-safe interface and type guard - if (isWorkerMessage(msg)) { - this.accountSummaries.push(...msg.data) + + cluster.on('exit', (worker: Worker, code: number) => { + this.activeWorkers -= 1 + + log('main', 'MAIN-WORKER', `Worker ${worker.process.pid} destroyed | Code: ${code} | Active workers: ${this.activeWorkers}`, 'warn') + + // Optional: restart crashed worker (basic heuristic) if crashRecovery allows + const cr = this.config.crashRecovery + if (cr?.restartFailedWorker && code !== 0 && worker.id) { + const attempts = (worker as { _restartAttempts?: number })._restartAttempts || 0 + if (attempts < (cr.restartFailedWorkerAttempts ?? 1)) { + (worker as { _restartAttempts?: number })._restartAttempts = attempts + 1 + log('main', 'CRASH-RECOVERY', `Respawning worker (attempt ${attempts + 1})`, 'warn') + + const originalChunk = workerChunkMap.get(worker.id) + const newW = cluster.fork() + + if (originalChunk && originalChunk.length > 0 && newW.id) { + (newW as { send?: (m: { chunk: Account[] }) => void }).send?.({ chunk: originalChunk }) + workerChunkMap.set(newW.id, originalChunk) + workerChunkMap.delete(worker.id) + log('main', 'CRASH-RECOVERY', `Assigned ${originalChunk.length} account(s) to respawned worker`) + } else { + log('main', 'CRASH-RECOVERY', 'Warning: Could not reassign accounts to respawned worker', 'warn') + } + + newW.on('message', (msg: unknown) => { + // IMPROVED: Using type-safe interface and type guard + if (isWorkerMessage(msg)) { + this.accountSummaries.push(...msg.data) + } + }) + } + } + + // Check if all workers have exited + if (this.activeWorkers === 0) { + void finishRun() } }) - } - - cluster.on('exit', (worker: Worker, code: number) => { - this.activeWorkers -= 1 - - log('main', 'MAIN-WORKER', `Worker ${worker.process.pid} destroyed | Code: ${code} | Active workers: ${this.activeWorkers}`, 'warn') - - // Optional: restart crashed worker (basic heuristic) if crashRecovery allows - const cr = this.config.crashRecovery - if (cr?.restartFailedWorker && code !== 0 && worker.id) { - const attempts = (worker as { _restartAttempts?: number })._restartAttempts || 0 - if (attempts < (cr.restartFailedWorkerAttempts ?? 1)) { - (worker as { _restartAttempts?: number })._restartAttempts = attempts + 1 - log('main', 'CRASH-RECOVERY', `Respawning worker (attempt ${attempts + 1})`, 'warn') - - const originalChunk = workerChunkMap.get(worker.id) - const newW = cluster.fork() - - if (originalChunk && originalChunk.length > 0 && newW.id) { - (newW as { send?: (m: { chunk: Account[] }) => void }).send?.({ chunk: originalChunk }) - workerChunkMap.set(newW.id, originalChunk) - workerChunkMap.delete(worker.id) - log('main', 'CRASH-RECOVERY', `Assigned ${originalChunk.length} account(s) to respawned worker`) - } else { - log('main', 'CRASH-RECOVERY', 'Warning: Could not reassign accounts to respawned worker', 'warn') - } - - newW.on('message', (msg: unknown) => { - // IMPROVED: Using type-safe interface and type guard - if (isWorkerMessage(msg)) { - this.accountSummaries.push(...msg.data) - } - }) - } - } - - // Check if all workers have exited - if (this.activeWorkers === 0) { - // All workers done -> send conclusion and exit (update check moved to startup) - (async () => { - try { - await this.sendConclusion(this.accountSummaries) - } catch (e) { - log('main', 'CONCLUSION', `Failed to send conclusion: ${e instanceof Error ? e.message : String(e)}`, 'warn') - } - log('main', 'MAIN-WORKER', 'All workers destroyed. Exiting main process!', 'warn') - process.exit(0) - })() - } }) }