feat: Improve workflow management by making runMaster asynchronous and adding error handling

This commit is contained in:
2025-11-15 12:47:09 +01:00
parent cf0611d841
commit e7f8f7110c

View File

@@ -205,7 +205,7 @@ export class MicrosoftRewardsBot {
// Only cluster when there's more than 1 cluster demanded
if (this.config.clusters > 1) {
if (cluster.isPrimary) {
this.runMaster()
await this.runMaster()
} else if (cluster.worker) {
await this.runWorker()
} else {
@@ -279,101 +279,108 @@ export class MicrosoftRewardsBot {
return this.accountSummaries
}
private runMaster() {
log('main', 'MAIN-PRIMARY', 'Primary process started')
private runMaster(): Promise<void> {
return new Promise((resolve) => {
log('main', 'MAIN-PRIMARY', 'Primary process started')
const totalAccounts = this.accounts.length
const totalAccounts = this.accounts.length
// Validate accounts exist
if (totalAccounts === 0) {
log('main', 'MAIN-PRIMARY', 'No accounts found to process. Exiting.', 'warn')
process.exit(0)
}
// If user over-specified clusters (e.g. 10 clusters but only 2 accounts), don't spawn useless idle workers.
const workerCount = Math.min(this.config.clusters, totalAccounts)
const accountChunks = this.utils.chunkArray(this.accounts, workerCount)
// Reset activeWorkers to actual spawn count (constructor used raw clusters)
this.activeWorkers = workerCount
// Store worker-to-chunk mapping for crash recovery
const workerChunkMap = new Map<number, Account[]>()
for (let i = 0; i < workerCount; i++) {
const worker = cluster.fork()
const chunk = accountChunks[i] || []
// Validate chunk has accounts
if (chunk.length === 0) {
log('main', 'MAIN-PRIMARY', `Warning: Worker ${i} received empty account chunk`, 'warn')
// Validate accounts exist
if (totalAccounts === 0) {
log('main', 'MAIN-PRIMARY', 'No accounts found to process. Nothing to do.', 'warn')
resolve()
return
}
// Store chunk mapping for crash recovery
if (worker.id) {
workerChunkMap.set(worker.id, chunk)
// If user over-specified clusters (e.g. 10 clusters but only 2 accounts), don't spawn useless idle workers.
const workerCount = Math.min(this.config.clusters, totalAccounts)
const accountChunks = this.utils.chunkArray(this.accounts, workerCount)
// Reset activeWorkers to actual spawn count (constructor used raw clusters)
this.activeWorkers = workerCount
// Store worker-to-chunk mapping for crash recovery
const workerChunkMap = new Map<number, Account[]>()
let resolved = false
const finishRun = async () => {
if (resolved) return
resolved = true
try {
await this.sendConclusion(this.accountSummaries)
} catch (e) {
log('main', 'CONCLUSION', `Failed to send conclusion: ${e instanceof Error ? e.message : String(e)}`, 'warn')
}
log('main', 'MAIN-WORKER', 'All workers destroyed. Run complete.', 'warn')
resolve()
}
// FIXED: Proper type checking before calling send
if (worker.send && typeof worker.send === 'function') {
worker.send({ chunk })
} else {
log('main', 'MAIN-PRIMARY', `ERROR: Worker ${i} does not have a send function!`, 'error')
for (let i = 0; i < workerCount; i++) {
const worker = cluster.fork()
const chunk = accountChunks[i] || []
// Validate chunk has accounts
if (chunk.length === 0) {
log('main', 'MAIN-PRIMARY', `Warning: Worker ${i} received empty account chunk`, 'warn')
}
// Store chunk mapping for crash recovery
if (worker.id) {
workerChunkMap.set(worker.id, chunk)
}
// FIXED: Proper type checking before calling send
if (worker.send && typeof worker.send === 'function') {
worker.send({ chunk })
} else {
log('main', 'MAIN-PRIMARY', `ERROR: Worker ${i} does not have a send function!`, 'error')
}
worker.on('message', (msg: unknown) => {
// IMPROVED: Using type-safe interface and type guard
if (isWorkerMessage(msg)) {
this.accountSummaries.push(...msg.data)
}
})
}
worker.on('message', (msg: unknown) => {
// IMPROVED: Using type-safe interface and type guard
if (isWorkerMessage(msg)) {
this.accountSummaries.push(...msg.data)
cluster.on('exit', (worker: Worker, code: number) => {
this.activeWorkers -= 1
log('main', 'MAIN-WORKER', `Worker ${worker.process.pid} destroyed | Code: ${code} | Active workers: ${this.activeWorkers}`, 'warn')
// Optional: restart crashed worker (basic heuristic) if crashRecovery allows
const cr = this.config.crashRecovery
if (cr?.restartFailedWorker && code !== 0 && worker.id) {
const attempts = (worker as { _restartAttempts?: number })._restartAttempts || 0
if (attempts < (cr.restartFailedWorkerAttempts ?? 1)) {
(worker as { _restartAttempts?: number })._restartAttempts = attempts + 1
log('main', 'CRASH-RECOVERY', `Respawning worker (attempt ${attempts + 1})`, 'warn')
const originalChunk = workerChunkMap.get(worker.id)
const newW = cluster.fork()
if (originalChunk && originalChunk.length > 0 && newW.id) {
(newW as { send?: (m: { chunk: Account[] }) => void }).send?.({ chunk: originalChunk })
workerChunkMap.set(newW.id, originalChunk)
workerChunkMap.delete(worker.id)
log('main', 'CRASH-RECOVERY', `Assigned ${originalChunk.length} account(s) to respawned worker`)
} else {
log('main', 'CRASH-RECOVERY', 'Warning: Could not reassign accounts to respawned worker', 'warn')
}
newW.on('message', (msg: unknown) => {
// IMPROVED: Using type-safe interface and type guard
if (isWorkerMessage(msg)) {
this.accountSummaries.push(...msg.data)
}
})
}
}
// Check if all workers have exited
if (this.activeWorkers === 0) {
void finishRun()
}
})
}
cluster.on('exit', (worker: Worker, code: number) => {
this.activeWorkers -= 1
log('main', 'MAIN-WORKER', `Worker ${worker.process.pid} destroyed | Code: ${code} | Active workers: ${this.activeWorkers}`, 'warn')
// Optional: restart crashed worker (basic heuristic) if crashRecovery allows
const cr = this.config.crashRecovery
if (cr?.restartFailedWorker && code !== 0 && worker.id) {
const attempts = (worker as { _restartAttempts?: number })._restartAttempts || 0
if (attempts < (cr.restartFailedWorkerAttempts ?? 1)) {
(worker as { _restartAttempts?: number })._restartAttempts = attempts + 1
log('main', 'CRASH-RECOVERY', `Respawning worker (attempt ${attempts + 1})`, 'warn')
const originalChunk = workerChunkMap.get(worker.id)
const newW = cluster.fork()
if (originalChunk && originalChunk.length > 0 && newW.id) {
(newW as { send?: (m: { chunk: Account[] }) => void }).send?.({ chunk: originalChunk })
workerChunkMap.set(newW.id, originalChunk)
workerChunkMap.delete(worker.id)
log('main', 'CRASH-RECOVERY', `Assigned ${originalChunk.length} account(s) to respawned worker`)
} else {
log('main', 'CRASH-RECOVERY', 'Warning: Could not reassign accounts to respawned worker', 'warn')
}
newW.on('message', (msg: unknown) => {
// IMPROVED: Using type-safe interface and type guard
if (isWorkerMessage(msg)) {
this.accountSummaries.push(...msg.data)
}
})
}
}
// Check if all workers have exited
if (this.activeWorkers === 0) {
// All workers done -> send conclusion and exit (update check moved to startup)
(async () => {
try {
await this.sendConclusion(this.accountSummaries)
} catch (e) {
log('main', 'CONCLUSION', `Failed to send conclusion: ${e instanceof Error ? e.message : String(e)}`, 'warn')
}
log('main', 'MAIN-WORKER', 'All workers destroyed. Exiting main process!', 'warn')
process.exit(0)
})()
}
})
}