web/scheduler: worker pipeline sequencing, file exchange between workers

This commit is contained in:
wukko
2025-01-31 11:12:00 +06:00
parent d15f1ec8f2
commit 7caee22aee
5 changed files with 79 additions and 36 deletions

View File

@ -1,8 +1,8 @@
import RemuxWorker from "$lib/workers/remux?worker"; import RemuxWorker from "$lib/workers/remux?worker";
import FetchWorker from "$lib/workers/fetch?worker"; import FetchWorker from "$lib/workers/fetch?worker";
import { itemDone, itemError, queue } from "$lib/state/queen-bee/queue";
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks"; import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";
import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue";
import type { CobaltQueue } from "$lib/types/queue"; import type { CobaltQueue } from "$lib/types/queue";
import type { CobaltPipelineItem } from "$lib/types/workers"; import type { CobaltPipelineItem } from "$lib/types/workers";
@ -78,7 +78,7 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F
if (eventData.render) { if (eventData.render) {
killWorker(worker, unsubscribe, startCheck); killWorker(worker, unsubscribe, startCheck);
return itemDone( return pipelineTaskDone(
parentId, parentId,
workerId, workerId,
new File([eventData.render], eventData.filename, { new File([eventData.render], eventData.filename, {
@ -124,7 +124,7 @@ export const runFetchWorker = async (workerId: string, parentId: string, url: st
if (eventData.file) { if (eventData.file) {
killWorker(worker, unsubscribe); killWorker(worker, unsubscribe);
return itemDone( return pipelineTaskDone(
parentId, parentId,
workerId, workerId,
eventData.file, eventData.file,

View File

@ -1,41 +1,65 @@
import { get } from "svelte/store"; import { get } from "svelte/store";
import { itemRunning, queue } from "$lib/state/queen-bee/queue";
import { startWorker } from "$lib/queen-bee/run-worker"; import { startWorker } from "$lib/queen-bee/run-worker";
import { itemDone, itemError, itemRunning, queue } from "$lib/state/queen-bee/queue";
import { addWorkerToQueue, currentTasks } from "$lib/state/queen-bee/current-tasks"; import { addWorkerToQueue, currentTasks } from "$lib/state/queen-bee/current-tasks";
import type { CobaltPipelineItem } from "$lib/types/workers";
const startPipeline = (pipelineItem: CobaltPipelineItem) => {
addWorkerToQueue(pipelineItem.workerId, {
type: pipelineItem.worker,
parentId: pipelineItem.parentId,
});
itemRunning(
pipelineItem.parentId,
pipelineItem.workerId,
);
startWorker(pipelineItem);
}
export const checkTasks = () => { export const checkTasks = () => {
const queueItems = get(queue); const queueItems = get(queue);
const ongoingTasks = get(currentTasks) const ongoingTasks = get(currentTasks);
// TODO (?): task concurrency
if (Object.keys(ongoingTasks).length > 0) return; if (Object.keys(ongoingTasks).length > 0) return;
for (const item of Object.keys(queueItems)) { for (const item of Object.keys(queueItems)) {
const task = queueItems[item]; const task = queueItems[item];
if (task.state === "running") { if (task.state === "running") {
break; // if the running worker isn't completed and wait to be called again
} // (on worker completion)
if (!task.completedWorkers?.includes(task.runningWorker)) {
if (task.state === "waiting") {
for (let i = 0; i < task.pipeline.length; i++) {
// TODO: loop here and pass the file between pipelines
// or schedule several tasks one after another but within
// one parent & pipeline
const pipelineItem = task.pipeline[i];
addWorkerToQueue(pipelineItem.workerId, {
type: pipelineItem.worker,
parentId: task.id,
});
itemRunning(
task.id,
pipelineItem.workerId
);
startWorker(pipelineItem);
break; break;
} }
// if all workers are completed, then return the final file and go to next task
if (task.completedWorkers.length === task.pipeline.length) {
const finalFile = task.pipelineResults?.pop();
if (finalFile) {
itemDone(task.id, finalFile);
continue;
} else {
itemError(task.id, task.runningWorker, "no final file");
continue;
}
}
// if current worker is completed, but there are more workers,
// then start the next one and wait to be called again
for (let i = 0; i < task.pipeline.length; i++) {
if (!task.completedWorkers.includes(task.pipeline[i].workerId)) {
startPipeline(task.pipeline[i]);
break;
}
}
}
// start the nearest waiting task and wait to be called again
if (task.state === "waiting" && task.pipeline.length > 0) {
startPipeline(task.pipeline[0]);
break; break;
} }
} }

View File

@ -35,9 +35,13 @@ export function itemError(id: string, workerId: string, error: string) {
checkTasks(); checkTasks();
} }
export function itemDone(id: string, workerId: string, file: File) { export function itemDone(id: string, file: File) {
update(queueData => { update(queueData => {
if (queueData[id]) { if (queueData[id]) {
if (queueData[id].state === "running" && queueData[id].pipelineResults) {
delete queueData[id].pipelineResults;
}
queueData[id] = { queueData[id] = {
...queueData[id], ...queueData[id],
state: "done", state: "done",
@ -47,6 +51,18 @@ export function itemDone(id: string, workerId: string, file: File) {
return queueData; return queueData;
}); });
checkTasks();
}
export function pipelineTaskDone(id: string, workerId: string, file: File) {
update(queueData => {
if (queueData[id] && queueData[id].state === "running") {
queueData[id].pipelineResults = [...queueData[id].pipelineResults || [], file];
queueData[id].completedWorkers = [...queueData[id].completedWorkers || [], workerId];
}
return queueData;
});
removeWorkerFromQueue(workerId); removeWorkerFromQueue(workerId);
checkTasks(); checkTasks();
} }

View File

@ -18,6 +18,8 @@ export type CobaltQueueItemWaiting = CobaltQueueBaseItem & {
export type CobaltQueueItemRunning = CobaltQueueBaseItem & { export type CobaltQueueItemRunning = CobaltQueueBaseItem & {
state: "running", state: "running",
runningWorker: string, runningWorker: string,
completedWorkers?: string[],
pipelineResults?: File[],
}; };
export type CobaltQueueItemDone = CobaltQueueBaseItem & { export type CobaltQueueItemDone = CobaltQueueBaseItem & {

View File

@ -1,13 +1,14 @@
import { defineConfig, searchForWorkspaceRoot, type PluginOption } from "vite";
import { sveltekit } from "@sveltejs/kit/vite";
import basicSSL from "@vitejs/plugin-basic-ssl";
import { glob } from "glob";
import mime from "mime"; import mime from "mime";
import { createSitemap } from 'svelte-sitemap/src/index' import basicSSL from "@vitejs/plugin-basic-ssl";
import { glob } from "glob";
import { sveltekit } from "@sveltejs/kit/vite";
import { createSitemap } from "svelte-sitemap/src/index";
import { defineConfig, searchForWorkspaceRoot, type PluginOption } from "vite";
import { cp, readdir, mkdir } from "node:fs/promises";
import { createReadStream } from "node:fs";
import { join, basename } from "node:path"; import { join, basename } from "node:path";
import { createReadStream } from "node:fs";
import { cp, readdir, mkdir } from "node:fs/promises";
const exposeLibAV: PluginOption = (() => { const exposeLibAV: PluginOption = (() => {
const IMPUT_MODULE_DIR = join(__dirname, 'node_modules/@imput'); const IMPUT_MODULE_DIR = join(__dirname, 'node_modules/@imput');
@ -20,7 +21,7 @@ const exposeLibAV: PluginOption = (() => {
const filename = basename(req.url).split('?')[0]; const filename = basename(req.url).split('?')[0];
if (!filename) return next(); if (!filename) return next();
const [ file ] = await glob(join(IMPUT_MODULE_DIR, '/**/dist/', filename)); const [file] = await glob(join(IMPUT_MODULE_DIR, '/**/dist/', filename));
if (!file) return next(); if (!file) return next();
const fileType = mime.getType(filename); const fileType = mime.getType(filename);
@ -114,6 +115,6 @@ export default defineConfig({
proxy: {} proxy: {}
}, },
optimizeDeps: { optimizeDeps: {
exclude: [ "@imput/libav.js-remux-cli" ] exclude: ["@imput/libav.js-remux-cli"]
}, },
}); });