mirror of
https://github.com/wukko/cobalt.git
synced 2025-04-29 22:14:26 +02:00
web/scheduler: break the queue loop when necessary
This commit is contained in:
parent
75e1fb689a
commit
f4f7032062
@ -1,7 +1,8 @@
|
|||||||
import { get } from "svelte/store";
|
import { get } from "svelte/store";
|
||||||
import { startWorker } from "$lib/queen-bee/run-worker";
|
import { startWorker } from "$lib/queen-bee/run-worker";
|
||||||
import { itemDone, itemError, itemRunning, queue } from "$lib/state/queen-bee/queue";
|
|
||||||
import { addWorkerToQueue, currentTasks } from "$lib/state/queen-bee/current-tasks";
|
import { addWorkerToQueue, currentTasks } from "$lib/state/queen-bee/current-tasks";
|
||||||
|
import { itemDone, itemError, itemRunning, queue } from "$lib/state/queen-bee/queue";
|
||||||
|
|
||||||
import type { CobaltPipelineItem } from "$lib/types/workers";
|
import type { CobaltPipelineItem } from "$lib/types/workers";
|
||||||
|
|
||||||
const startPipeline = (pipelineItem: CobaltPipelineItem) => {
|
const startPipeline = (pipelineItem: CobaltPipelineItem) => {
|
||||||
@ -32,7 +33,7 @@ export const schedule = () => {
|
|||||||
// if the running worker isn't completed, wait
|
// if the running worker isn't completed, wait
|
||||||
// to be called again on worker completion
|
// to be called again on worker completion
|
||||||
if (!task.completedWorkers.has(task.runningWorker)) {
|
if (!task.completedWorkers.has(task.runningWorker)) {
|
||||||
return;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if all workers are completed, then return the
|
// if all workers are completed, then return the
|
||||||
@ -54,13 +55,22 @@ export const schedule = () => {
|
|||||||
for (const worker of task.pipeline) {
|
for (const worker of task.pipeline) {
|
||||||
if (!task.completedWorkers.has(worker.workerId)) {
|
if (!task.completedWorkers.has(worker.workerId)) {
|
||||||
startPipeline(worker);
|
startPipeline(worker);
|
||||||
return;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// break because we don't want to start next tasks before this one is done
|
||||||
|
// it's necessary because some tasks might take some time before being marked as running
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// start the nearest waiting task and wait to be called again
|
// start the nearest waiting task and wait to be called again
|
||||||
} else if (task.state === "waiting" && task.pipeline.length > 0) {
|
else if (task.state === "waiting" && task.pipeline.length > 0) {
|
||||||
startPipeline(task.pipeline[0]);
|
startPipeline(task.pipeline[0]);
|
||||||
|
|
||||||
|
// break because we don't want to start next tasks before this one is done
|
||||||
|
// it's necessary because some tasks might take some time before being marked as running
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user