feat: moving sources to worker

This commit is contained in:
Chubby Granny Chaser
2025-10-18 14:07:44 +01:00
parent 393c55738c
commit c2273dbf71
6 changed files with 329 additions and 84 deletions

View File

@@ -192,83 +192,30 @@ export const addNewDownloads = async (
const batch = repacksSublevel.batch();
// Get title hash mapping and perform matching in worker thread
const titleHashMapping = await getTitleHashMapping();
let hashMatchCount = 0;
let fuzzyMatchCount = 0;
let noMatchCount = 0;
for (const download of downloads) {
let objectIds: string[] = [];
let usedHashMatch = false;
const { GameMatcherWorkerManager } = await import("@main/services");
const matchResult = await GameMatcherWorkerManager.matchDownloads(
downloads,
steamGames,
titleHashMapping
);
const titleHash = hashTitle(download.title);
const steamIdsFromHash = titleHashMapping[titleHash];
if (steamIdsFromHash && steamIdsFromHash.length > 0) {
hashMatchCount++;
usedHashMatch = true;
objectIds = steamIdsFromHash.map(String);
}
if (!usedHashMatch) {
let gamesInSteam: FormattedSteamGame[] = [];
const formattedTitle = formatRepackName(download.title);
if (formattedTitle && formattedTitle.length > 0) {
const [firstLetter] = formattedTitle;
const games = steamGames[firstLetter] || [];
gamesInSteam = games.filter((game) =>
formattedTitle.startsWith(game.formattedName)
);
if (gamesInSteam.length === 0) {
gamesInSteam = games.filter(
(game) =>
formattedTitle.includes(game.formattedName) ||
game.formattedName.includes(formattedTitle)
);
}
if (gamesInSteam.length === 0) {
for (const letter of Object.keys(steamGames)) {
const letterGames = steamGames[letter] || [];
const matches = letterGames.filter(
(game) =>
formattedTitle.includes(game.formattedName) ||
game.formattedName.includes(formattedTitle)
);
if (matches.length > 0) {
gamesInSteam = matches;
break;
}
}
}
if (gamesInSteam.length > 0) {
fuzzyMatchCount++;
objectIds = gamesInSteam.map((game) => String(game.id));
} else {
noMatchCount++;
}
} else {
noMatchCount++;
}
}
for (const id of objectIds) {
// Process matched results and write to database
for (const matchedDownload of matchResult.matchedDownloads) {
for (const id of matchedDownload.objectIds) {
objectIdsOnSource.add(id);
}
const repack = {
id: nextRepackId++,
objectIds: objectIds,
title: download.title,
uris: download.uris,
fileSize: download.fileSize,
objectIds: matchedDownload.objectIds,
title: matchedDownload.title,
uris: matchedDownload.uris,
fileSize: matchedDownload.fileSize,
repacker: downloadSource.name,
uploadDate: download.uploadDate,
uploadDate: matchedDownload.uploadDate,
downloadSourceId: downloadSource.id,
createdAt: now,
updatedAt: now,
@@ -280,7 +227,7 @@ export const addNewDownloads = async (
await batch.write();
logger.info(
`Matching stats for ${downloadSource.name}: Hash=${hashMatchCount}, Fuzzy=${fuzzyMatchCount}, None=${noMatchCount}`
`Matching stats for ${downloadSource.name}: Hash=${matchResult.stats.hashMatchCount}, Fuzzy=${matchResult.stats.fuzzyMatchCount}, None=${matchResult.stats.noMatchCount}`
);
const existingSource = await downloadSourcesSublevel.get(

View File

@@ -31,20 +31,10 @@ const syncDownloadSources = async (
downloadSources.push(source);
}
const existingRepacks: Array<{
id: number;
title: string;
uris: string[];
repacker: string;
fileSize: string | null;
objectIds: string[];
uploadDate: Date | string | null;
downloadSourceId: number;
createdAt: Date;
updatedAt: Date;
}> = [];
// Use a Set for O(1) lookups instead of O(n) with array.some()
const existingRepackTitles = new Set<string>();
for await (const [, repack] of repacksSublevel.iterator()) {
existingRepacks.push(repack);
existingRepackTitles.add(repack.title);
}
// Handle sources with missing fingerprints individually, don't delete all sources
@@ -77,9 +67,9 @@ const syncDownloadSources = async (
const source = downloadSourceSchema.parse(response.data);
const steamGames = await getSteamGames();
// O(1) lookup instead of O(n) - massive performance improvement
const repacks = source.downloads.filter(
(download) =>
!existingRepacks.some((repack) => repack.title === download.title)
(download) => !existingRepackTitles.has(download.title)
);
await downloadSourcesSublevel.put(`${downloadSource.id}`, {

View File

@@ -17,6 +17,7 @@ import {
Lock,
DeckyPlugin,
ResourceCache,
GameMatcherWorkerManager,
} from "@main/services";
export const loadState = async () => {
@@ -25,6 +26,9 @@ export const loadState = async () => {
ResourceCache.initialize();
await ResourceCache.updateResourcesOnStartup();
// Initialize game matcher worker thread
GameMatcherWorkerManager.initialize();
const userPreferences = await db.get<string, UserPreferences | null>(
levelKeys.userPreferences,
{

View File

@@ -0,0 +1,145 @@
import { Worker } from "worker_threads";
import workerPath from "../workers/game-matcher-worker?modulePath";
interface WorkerMessage {
id: string;
data: unknown;
}
interface WorkerResponse {
id: string;
success: boolean;
result?: unknown;
error?: string;
}
export type TitleHashMapping = Record<string, number[]>;
export type FormattedSteamGame = {
id: string;
name: string;
formattedName: string;
};
export type FormattedSteamGamesByLetter = Record<string, FormattedSteamGame[]>;
interface DownloadToMatch {
title: string;
uris: string[];
uploadDate: string;
fileSize: string;
}
interface MatchedDownload {
title: string;
uris: string[];
uploadDate: string;
fileSize: string;
objectIds: string[];
usedHashMatch: boolean;
}
interface MatchResponse {
matchedDownloads: MatchedDownload[];
stats: {
hashMatchCount: number;
fuzzyMatchCount: number;
noMatchCount: number;
};
}
export class GameMatcherWorkerManager {
private static worker: Worker | null = null;
private static messageId = 0;
private static pendingMessages = new Map<
string,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
{ resolve: (value: any) => void; reject: (error: Error) => void }
>();
public static initialize() {
if (this.worker) {
return;
}
try {
console.log(
"[GameMatcherWorker] Initializing worker with path:",
workerPath
);
this.worker = new Worker(workerPath);
this.worker.on("message", (response: WorkerResponse) => {
const pending = this.pendingMessages.get(response.id);
if (pending) {
if (response.success) {
pending.resolve(response.result);
} else {
pending.reject(new Error(response.error || "Unknown error"));
}
this.pendingMessages.delete(response.id);
}
});
this.worker.on("error", (error) => {
console.error("[GameMatcherWorker] Worker error:", error);
for (const [id, pending] of this.pendingMessages.entries()) {
pending.reject(error);
this.pendingMessages.delete(id);
}
});
this.worker.on("exit", (code) => {
if (code !== 0) {
console.error(
`[GameMatcherWorker] Worker stopped with exit code ${code}`
);
}
this.worker = null;
for (const [id, pending] of this.pendingMessages.entries()) {
pending.reject(new Error("Worker exited unexpectedly"));
this.pendingMessages.delete(id);
}
});
console.log("[GameMatcherWorker] Worker initialized successfully");
} catch (error) {
console.error("[GameMatcherWorker] Failed to initialize worker:", error);
throw error;
}
}
private static sendMessage<T>(data: unknown): Promise<T> {
if (!this.worker) {
return Promise.reject(new Error("Worker not initialized"));
}
const id = `msg_${++this.messageId}`;
const message: WorkerMessage = { id, data };
return new Promise<T>((resolve, reject) => {
this.pendingMessages.set(id, { resolve, reject });
this.worker!.postMessage(message);
});
}
public static async matchDownloads(
downloads: DownloadToMatch[],
steamGames: FormattedSteamGamesByLetter,
titleHashMapping: TitleHashMapping
): Promise<MatchResponse> {
return this.sendMessage<MatchResponse>({
downloads,
steamGames,
titleHashMapping,
});
}
public static terminate() {
if (this.worker) {
this.worker.terminate();
this.worker = null;
this.pendingMessages.clear();
}
}
}

View File

@@ -19,3 +19,4 @@ export * from "./wine";
export * from "./lock";
export * from "./decky-plugin";
export * from "./resource-cache";
export * from "./game-matcher-worker-manager";

View File

@@ -0,0 +1,158 @@
import { parentPort } from "worker_threads";
import crypto from "node:crypto";
export type TitleHashMapping = Record<string, number[]>;
export type FormattedSteamGame = {
id: string;
name: string;
formattedName: string;
};
export type FormattedSteamGamesByLetter = Record<string, FormattedSteamGame[]>;
interface DownloadToMatch {
title: string;
uris: string[];
uploadDate: string;
fileSize: string;
}
interface MatchedDownload {
title: string;
uris: string[];
uploadDate: string;
fileSize: string;
objectIds: string[];
usedHashMatch: boolean;
}
interface MatchRequest {
downloads: DownloadToMatch[];
steamGames: FormattedSteamGamesByLetter;
titleHashMapping: TitleHashMapping;
}
interface MatchResponse {
matchedDownloads: MatchedDownload[];
stats: {
hashMatchCount: number;
fuzzyMatchCount: number;
noMatchCount: number;
};
}
const hashTitle = (title: string): string => {
return crypto.createHash("sha256").update(title).digest("hex");
};
const formatName = (name: string) => {
return name
.normalize("NFD")
.replaceAll(/[\u0300-\u036f]/g, "")
.toLowerCase()
.replaceAll(/[^a-z0-9]/g, "");
};
const formatRepackName = (name: string) => {
return formatName(name.replace("[DL]", ""));
};
const matchDownloads = (request: MatchRequest): MatchResponse => {
const { downloads, steamGames, titleHashMapping } = request;
const matchedDownloads: MatchedDownload[] = [];
let hashMatchCount = 0;
let fuzzyMatchCount = 0;
let noMatchCount = 0;
for (const download of downloads) {
let objectIds: string[] = [];
let usedHashMatch = false;
const titleHash = hashTitle(download.title);
const steamIdsFromHash = titleHashMapping[titleHash];
if (steamIdsFromHash && steamIdsFromHash.length > 0) {
hashMatchCount++;
usedHashMatch = true;
objectIds = steamIdsFromHash.map(String);
}
if (!usedHashMatch) {
let gamesInSteam: FormattedSteamGame[] = [];
const formattedTitle = formatRepackName(download.title);
if (formattedTitle && formattedTitle.length > 0) {
const [firstLetter] = formattedTitle;
const games = steamGames[firstLetter] || [];
gamesInSteam = games.filter((game) =>
formattedTitle.startsWith(game.formattedName)
);
if (gamesInSteam.length === 0) {
gamesInSteam = games.filter(
(game) =>
formattedTitle.includes(game.formattedName) ||
game.formattedName.includes(formattedTitle)
);
}
if (gamesInSteam.length === 0) {
for (const letter of Object.keys(steamGames)) {
const letterGames = steamGames[letter] || [];
const matches = letterGames.filter(
(game) =>
formattedTitle.includes(game.formattedName) ||
game.formattedName.includes(formattedTitle)
);
if (matches.length > 0) {
gamesInSteam = matches;
break;
}
}
}
if (gamesInSteam.length > 0) {
fuzzyMatchCount++;
objectIds = gamesInSteam.map((game) => String(game.id));
} else {
noMatchCount++;
}
} else {
noMatchCount++;
}
}
matchedDownloads.push({
...download,
objectIds,
usedHashMatch,
});
}
return {
matchedDownloads,
stats: {
hashMatchCount,
fuzzyMatchCount,
noMatchCount,
},
};
};
// Message handler
if (parentPort) {
parentPort.on("message", (message: { id: string; data: MatchRequest }) => {
try {
const result = matchDownloads(message.data);
parentPort!.postMessage({ id: message.id, success: true, result });
} catch (error) {
parentPort!.postMessage({
id: message.id,
success: false,
error: error instanceof Error ? error.message : String(error),
});
}
});
}