Merge pull request #11 from BrentJMadison/cluster_issue

Cluster issue fix
Bot, 2025-11-14 23:59:01 +01:00, committed by GitHub

@@ -206,8 +206,23 @@ export class MicrosoftRewardsBot {
         if (this.config.clusters > 1) {
             if (cluster.isPrimary) {
                 this.runMaster()
+            } else if (cluster.worker) {
+                await this.runWorker()
             } else {
-                this.runWorker()
+                // Neither primary nor worker - something's wrong with clustering
+                log('main', 'MAIN', `ERROR: Cluster mode failed - neither primary nor worker! Falling back to single-process mode.`, 'error')
+                const passes = this.config.passesPerRun ?? 1
+                for (let pass = 1; pass <= passes; pass++) {
+                    if (passes > 1) {
+                        log('main', 'MAIN', `Starting pass ${pass}/${passes}`)
+                    }
+                    await this.runTasks(this.accounts, pass, passes)
+                    if (pass < passes) {
+                        log('main', 'MAIN', `Completed pass ${pass}/${passes}. Waiting before next pass...`)
+                        await this.utils.wait(TIMEOUTS.ONE_MINUTE)
+                    }
+                }
+                return
             }
         } else {
             const passes = this.config.passesPerRun ?? 1
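
For reference, a minimal standalone sketch (not from this repository; it only assumes Node's built-in cluster module) of the role detection the branch above relies on: cluster.isPrimary is true in the forking process and cluster.worker is set in every forked child, so the new else-branch only fires when clustering is in an unexpected state, in which case the bot now falls back to a single-process run.

import cluster from 'node:cluster'
import process from 'node:process'

if (cluster.isPrimary) {
    // The primary only forks children; it does not run account tasks itself.
    console.log(`Primary ${process.pid} forking a worker`)
    cluster.fork()
} else if (cluster.worker) {
    // Every forked child has cluster.worker set and processes its own chunk.
    console.log(`Worker ${process.pid} (id ${cluster.worker.id}) ready`)
    process.exit(0)
} else {
    // Defensive fallback mirroring the diff: neither role could be determined.
    console.error('Cluster role unknown - continuing in single-process mode')
}
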
@@ -301,6 +316,8 @@ export class MicrosoftRewardsBot {
             // FIXED: Proper type checking before calling send
             if (worker.send && typeof worker.send === 'function') {
                 worker.send({ chunk })
+            } else {
+                log('main', 'MAIN-PRIMARY', `ERROR: Worker ${i} does not have a send function!`, 'error')
             }
             worker.on('message', (msg: unknown) => {
                 // IMPROVED: Using type-safe interface and type guard
@@ -360,22 +377,34 @@ export class MicrosoftRewardsBot {
         })
     }
-    private runWorker() {
+    private async runWorker() {
         log('main', 'MAIN-WORKER', `Worker ${process.pid} spawned`)
-        // Receive the chunk of accounts from the master
-        ; (process as unknown as { on: (ev: 'message', cb: (m: { chunk: Account[] }) => void) => void }).on('message', async ({ chunk }: { chunk: Account[] }) => {
-            const passes = this.config.passesPerRun ?? 1
-            for (let pass = 1; pass <= passes; pass++) {
-                if (passes > 1) {
-                    log('main', 'MAIN-WORKER', `Starting pass ${pass}/${passes}`)
-                }
-                await this.runTasks(chunk, pass, passes)
-                if (pass < passes) {
-                    log('main', 'MAIN-WORKER', `Completed pass ${pass}/${passes}. Waiting before next pass...`)
-                    await this.utils.wait(TIMEOUTS.ONE_MINUTE)
-                }
-            }
-        })
+        // Wait for chunk (either already received during init, or will arrive soon)
+        const chunk = await new Promise<Account[]>((resolve) => {
+            if ((global as any).__workerChunk) {
+                resolve((global as any).__workerChunk)
+            } else {
+                (process as unknown as { on: (ev: 'message', cb: (m: { chunk: Account[] }) => void) => void }).on('message', ({ chunk: c }: { chunk: Account[] }) => resolve(c))
+            }
+        })
+        if (!chunk || chunk.length === 0) {
+            log('main', 'MAIN-WORKER', `ERROR: Worker ${process.pid} received empty or undefined chunk!`, 'error')
+            return
+        }
+        const passes = this.config.passesPerRun ?? 1
+        for (let pass = 1; pass <= passes; pass++) {
+            if (passes > 1) {
+                log('main', 'MAIN-WORKER', `Starting pass ${pass}/${passes}`)
+            }
+            await this.runTasks(chunk, pass, passes)
+            if (pass < passes) {
+                log('main', 'MAIN-WORKER', `Completed pass ${pass}/${passes}. Waiting before next pass...`)
+                await this.utils.wait(TIMEOUTS.ONE_MINUTE)
+            }
+        }
     }
     private async runTasks(accounts: Account[], currentPass: number = 1, totalPasses: number = 1) {
@@ -872,6 +901,15 @@ const shortErr = shortErrorMessage
 const formatFullError = formatDetailedError
 async function main(): Promise<void> {
+    // FIX: Set up message listener early to prevent race condition
+    // Workers initialize for ~2 seconds before reaching runWorker(), so messages
+    // sent by primary during initialization would be lost without this early listener
+    if (!cluster.isPrimary && cluster.worker) {
+        (process as unknown as { on: (ev: 'message', cb: (m: { chunk: Account[] }) => void) => void }).on('message', ({ chunk }: { chunk: Account[] }) => {
+            (global as any).__workerChunk = chunk
+        })
+    }
     // Check for dashboard mode flag (standalone dashboard)
     if (process.argv.includes('-dashboard')) {
         const { startDashboardServer } = await import('./dashboard/server')
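
Taken together, the change works like this: the primary may call worker.send() almost immediately after fork(), while the worker spends roughly two seconds loading modules before runWorker() is reached; by registering a 'message' listener at startup and stashing the payload, the chunk is buffered instead of being dropped, and runWorker() later picks it up from the buffer or waits for a late arrival. Below is a minimal, self-contained sketch of that handoff pattern under simplified assumptions: the Account shape is reduced to one field, and names such as bufferedChunk and hasChunk are illustrative, not taken from the repository.

import cluster from 'node:cluster'
import process from 'node:process'

type Account = { email: string }

// Type guard for the message shape exchanged between primary and worker.
function hasChunk(msg: unknown): msg is { chunk: Account[] } {
    return typeof msg === 'object' && msg !== null && Array.isArray((msg as { chunk?: unknown }).chunk)
}

if (cluster.isPrimary) {
    const accounts: Account[] = [{ email: 'a@example.com' }, { email: 'b@example.com' }]
    const worker = cluster.fork()
    // The chunk may be sent before the worker has finished loading its modules.
    worker.send({ chunk: accounts })
} else {
    // Registered at startup, before any slow initialization, so an early
    // message is buffered instead of being lost.
    let bufferedChunk: Account[] | undefined
    process.on('message', (msg: unknown) => {
        if (hasChunk(msg)) bufferedChunk = msg.chunk
    })

    const runWorker = async (): Promise<void> => {
        // Simulate the ~2 seconds of initialization mentioned in the comment above.
        await new Promise((resolve) => setTimeout(resolve, 2000))

        // Use the buffered chunk if it already arrived, otherwise wait for it.
        const chunk = bufferedChunk ?? (await new Promise<Account[]>((resolve) => {
            process.on('message', (msg: unknown) => {
                if (hasChunk(msg)) resolve(msg.chunk)
            })
        }))

        console.log(`Worker ${process.pid} received ${chunk.length} account(s)`)
        process.exit(0)
    }

    void runWorker()
}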