diff --git a/web/src/lib/queen-bee/scheduler.ts b/web/src/lib/queen-bee/scheduler.ts index e97f8032..241bb56b 100644 --- a/web/src/lib/queen-bee/scheduler.ts +++ b/web/src/lib/queen-bee/scheduler.ts @@ -18,21 +18,21 @@ const startPipeline = (pipelineItem: CobaltPipelineItem) => { startWorker(pipelineItem); } -export const checkTasks = () => { +export const schedule = () => { const queueItems = get(queue); const ongoingTasks = get(currentTasks); // TODO (?): task concurrency - if (Object.keys(ongoingTasks).length > 0) return; - - for (const item of Object.keys(queueItems)) { - const task = queueItems[item]; + if (Object.keys(ongoingTasks).length > 0) { + return; + } + for (const task of Object.values(queueItems)) { if (task.state === "running") { // if the running worker isn't completed, wait // to be called again on worker completion if (!task.completedWorkers.has(task.runningWorker)) { - break; + return; } // if all workers are completed, then return the @@ -42,11 +42,11 @@ export const checkTasks = () => { if (finalFile) { itemDone(task.id, finalFile); - continue; } else { itemError(task.id, task.runningWorker, "no final file"); - continue; } + + continue; } // if current worker is completed, but there are more workers, @@ -54,17 +54,13 @@ export const checkTasks = () => { for (const worker of task.pipeline) { if (!task.completedWorkers.has(worker.workerId)) { startPipeline(worker); - break; + return; } } - break; - } - // start the nearest waiting task and wait to be called again - if (task.state === "waiting" && task.pipeline.length > 0) { + } else if (task.state === "waiting" && task.pipeline.length > 0) { startPipeline(task.pipeline[0]); - break; } } } diff --git a/web/src/lib/state/queen-bee/queue.ts b/web/src/lib/state/queen-bee/queue.ts index 4fb4dcf1..6d5a3742 100644 --- a/web/src/lib/state/queen-bee/queue.ts +++ b/web/src/lib/state/queen-bee/queue.ts @@ -1,6 +1,6 @@ import { readable, type Updater } from "svelte/store"; -import { checkTasks } from "$lib/queen-bee/scheduler"; +import { schedule } from "$lib/queen-bee/scheduler"; import { clearFileStorage, removeFromFileStorage } from "$lib/storage"; import { clearCurrentTasks, removeWorkerFromQueue } from "$lib/state/queen-bee/current-tasks"; @@ -34,7 +34,7 @@ export function addItem(item: CobaltQueueItem) { return queueData; }); - checkTasks(); + schedule(); } export function itemError(id: string, workerId: string, error: string) { @@ -52,7 +52,7 @@ export function itemError(id: string, workerId: string, error: string) { }); removeWorkerFromQueue(workerId); - checkTasks(); + schedule(); } export function itemDone(id: string, file: CobaltFileReference) { @@ -69,7 +69,7 @@ export function itemDone(id: string, file: CobaltFileReference) { return queueData; }); - checkTasks(); + schedule(); } export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileReference) { @@ -85,7 +85,7 @@ export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileR }); removeWorkerFromQueue(workerId); - checkTasks(); + schedule(); } export function itemRunning(id: string, workerId: string) { @@ -102,7 +102,7 @@ export function itemRunning(id: string, workerId: string) { return queueData; }); - checkTasks(); + schedule(); } export function removeItem(id: string) { @@ -118,7 +118,7 @@ export function removeItem(id: string) { return queueData; }); - checkTasks(); + schedule(); } export function clearQueue() {