web/scheduler: rename function to schedule(), cleanup control flow

This commit is contained in:
jj 2025-03-29 14:17:04 +00:00
parent a183265838
commit 165fa65964
No known key found for this signature in database
2 changed files with 17 additions and 21 deletions

View File

@ -18,21 +18,21 @@ const startPipeline = (pipelineItem: CobaltPipelineItem) => {
startWorker(pipelineItem);
}
export const checkTasks = () => {
export const schedule = () => {
const queueItems = get(queue);
const ongoingTasks = get(currentTasks);
// TODO (?): task concurrency
if (Object.keys(ongoingTasks).length > 0) return;
for (const item of Object.keys(queueItems)) {
const task = queueItems[item];
if (Object.keys(ongoingTasks).length > 0) {
return;
}
for (const task of Object.values(queueItems)) {
if (task.state === "running") {
// if the running worker isn't completed, wait
// to be called again on worker completion
if (!task.completedWorkers.has(task.runningWorker)) {
break;
return;
}
// if all workers are completed, then return the
@ -42,11 +42,11 @@ export const checkTasks = () => {
if (finalFile) {
itemDone(task.id, finalFile);
continue;
} else {
itemError(task.id, task.runningWorker, "no final file");
continue;
}
continue;
}
// if current worker is completed, but there are more workers,
@ -54,17 +54,13 @@ export const checkTasks = () => {
for (const worker of task.pipeline) {
if (!task.completedWorkers.has(worker.workerId)) {
startPipeline(worker);
break;
return;
}
}
break;
}
// start the nearest waiting task and wait to be called again
if (task.state === "waiting" && task.pipeline.length > 0) {
} else if (task.state === "waiting" && task.pipeline.length > 0) {
startPipeline(task.pipeline[0]);
break;
}
}
}

View File

@ -1,6 +1,6 @@
import { readable, type Updater } from "svelte/store";
import { checkTasks } from "$lib/queen-bee/scheduler";
import { schedule } from "$lib/queen-bee/scheduler";
import { clearFileStorage, removeFromFileStorage } from "$lib/storage";
import { clearCurrentTasks, removeWorkerFromQueue } from "$lib/state/queen-bee/current-tasks";
@ -34,7 +34,7 @@ export function addItem(item: CobaltQueueItem) {
return queueData;
});
checkTasks();
schedule();
}
export function itemError(id: string, workerId: string, error: string) {
@ -52,7 +52,7 @@ export function itemError(id: string, workerId: string, error: string) {
});
removeWorkerFromQueue(workerId);
checkTasks();
schedule();
}
export function itemDone(id: string, file: CobaltFileReference) {
@ -69,7 +69,7 @@ export function itemDone(id: string, file: CobaltFileReference) {
return queueData;
});
checkTasks();
schedule();
}
export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileReference) {
@ -85,7 +85,7 @@ export function pipelineTaskDone(id: string, workerId: string, file: CobaltFileR
});
removeWorkerFromQueue(workerId);
checkTasks();
schedule();
}
export function itemRunning(id: string, workerId: string) {
@ -102,7 +102,7 @@ export function itemRunning(id: string, workerId: string) {
return queueData;
});
checkTasks();
schedule();
}
export function removeItem(id: string) {
@ -118,7 +118,7 @@ export function removeItem(id: string) {
return queueData;
});
checkTasks();
schedule();
}
export function clearQueue() {