diff --git a/web/src/lib/queen-bee/run-worker.ts b/web/src/lib/queen-bee/run-worker.ts index 7605d7ae..113ba4f6 100644 --- a/web/src/lib/queen-bee/run-worker.ts +++ b/web/src/lib/queen-bee/run-worker.ts @@ -1,8 +1,8 @@ import RemuxWorker from "$lib/workers/remux?worker"; import FetchWorker from "$lib/workers/fetch?worker"; -import { itemDone, itemError, queue } from "$lib/state/queen-bee/queue"; import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks"; +import { pipelineTaskDone, itemError, queue } from "$lib/state/queen-bee/queue"; import type { CobaltQueue } from "$lib/types/queue"; import type { CobaltPipelineItem } from "$lib/types/workers"; @@ -78,7 +78,7 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F if (eventData.render) { killWorker(worker, unsubscribe, startCheck); - return itemDone( + return pipelineTaskDone( parentId, workerId, new File([eventData.render], eventData.filename, { @@ -124,7 +124,7 @@ export const runFetchWorker = async (workerId: string, parentId: string, url: st if (eventData.file) { killWorker(worker, unsubscribe); - return itemDone( + return pipelineTaskDone( parentId, workerId, eventData.file, diff --git a/web/src/lib/queen-bee/scheduler.ts b/web/src/lib/queen-bee/scheduler.ts index 58817d24..c018dc5b 100644 --- a/web/src/lib/queen-bee/scheduler.ts +++ b/web/src/lib/queen-bee/scheduler.ts @@ -1,41 +1,65 @@ import { get } from "svelte/store"; -import { itemRunning, queue } from "$lib/state/queen-bee/queue"; import { startWorker } from "$lib/queen-bee/run-worker"; +import { itemDone, itemError, itemRunning, queue } from "$lib/state/queen-bee/queue"; import { addWorkerToQueue, currentTasks } from "$lib/state/queen-bee/current-tasks"; +import type { CobaltPipelineItem } from "$lib/types/workers"; + +const startPipeline = (pipelineItem: CobaltPipelineItem) => { + addWorkerToQueue(pipelineItem.workerId, { + type: pipelineItem.worker, + parentId: pipelineItem.parentId, + }); + + itemRunning( + pipelineItem.parentId, + pipelineItem.workerId, + ); + + startWorker(pipelineItem); +} export const checkTasks = () => { const queueItems = get(queue); - const ongoingTasks = get(currentTasks) + const ongoingTasks = get(currentTasks); + // TODO (?): task concurrency if (Object.keys(ongoingTasks).length > 0) return; for (const item of Object.keys(queueItems)) { const task = queueItems[item]; if (task.state === "running") { - break; - } - - if (task.state === "waiting") { - for (let i = 0; i < task.pipeline.length; i++) { - // TODO: loop here and pass the file between pipelines - // or schedule several tasks one after another but within - // one parent & pipeline - const pipelineItem = task.pipeline[i]; - - addWorkerToQueue(pipelineItem.workerId, { - type: pipelineItem.worker, - parentId: task.id, - }); - - itemRunning( - task.id, - pipelineItem.workerId - ); - - startWorker(pipelineItem); + // if the running worker isn't completed and wait to be called again + // (on worker completion) + if (!task.completedWorkers?.includes(task.runningWorker)) { break; } + + // if all workers are completed, then return the final file and go to next task + if (task.completedWorkers.length === task.pipeline.length) { + const finalFile = task.pipelineResults?.pop(); + if (finalFile) { + itemDone(task.id, finalFile); + continue; + } else { + itemError(task.id, task.runningWorker, "no final file"); + continue; + } + } + + // if current worker is completed, but there are more workers, + // then start the next one and wait to be called again + for (let i = 0; i < task.pipeline.length; i++) { + if (!task.completedWorkers.includes(task.pipeline[i].workerId)) { + startPipeline(task.pipeline[i]); + break; + } + } + } + + // start the nearest waiting task and wait to be called again + if (task.state === "waiting" && task.pipeline.length > 0) { + startPipeline(task.pipeline[0]); break; } } diff --git a/web/src/lib/state/queen-bee/queue.ts b/web/src/lib/state/queen-bee/queue.ts index 8ccdc8c7..d2ca093c 100644 --- a/web/src/lib/state/queen-bee/queue.ts +++ b/web/src/lib/state/queen-bee/queue.ts @@ -35,9 +35,13 @@ export function itemError(id: string, workerId: string, error: string) { checkTasks(); } -export function itemDone(id: string, workerId: string, file: File) { +export function itemDone(id: string, file: File) { update(queueData => { if (queueData[id]) { + if (queueData[id].state === "running" && queueData[id].pipelineResults) { + delete queueData[id].pipelineResults; + } + queueData[id] = { ...queueData[id], state: "done", @@ -47,6 +51,18 @@ export function itemDone(id: string, workerId: string, file: File) { return queueData; }); + checkTasks(); +} + +export function pipelineTaskDone(id: string, workerId: string, file: File) { + update(queueData => { + if (queueData[id] && queueData[id].state === "running") { + queueData[id].pipelineResults = [...queueData[id].pipelineResults || [], file]; + queueData[id].completedWorkers = [...queueData[id].completedWorkers || [], workerId]; + } + return queueData; + }); + removeWorkerFromQueue(workerId); checkTasks(); } diff --git a/web/src/lib/types/queue.ts b/web/src/lib/types/queue.ts index 859afee7..f50093a4 100644 --- a/web/src/lib/types/queue.ts +++ b/web/src/lib/types/queue.ts @@ -18,6 +18,8 @@ export type CobaltQueueItemWaiting = CobaltQueueBaseItem & { export type CobaltQueueItemRunning = CobaltQueueBaseItem & { state: "running", runningWorker: string, + completedWorkers?: string[], + pipelineResults?: File[], }; export type CobaltQueueItemDone = CobaltQueueBaseItem & { diff --git a/web/vite.config.ts b/web/vite.config.ts index e37aebb2..831b9ba8 100644 --- a/web/vite.config.ts +++ b/web/vite.config.ts @@ -1,13 +1,14 @@ -import { defineConfig, searchForWorkspaceRoot, type PluginOption } from "vite"; -import { sveltekit } from "@sveltejs/kit/vite"; -import basicSSL from "@vitejs/plugin-basic-ssl"; -import { glob } from "glob"; import mime from "mime"; -import { createSitemap } from 'svelte-sitemap/src/index' +import basicSSL from "@vitejs/plugin-basic-ssl"; + +import { glob } from "glob"; +import { sveltekit } from "@sveltejs/kit/vite"; +import { createSitemap } from "svelte-sitemap/src/index"; +import { defineConfig, searchForWorkspaceRoot, type PluginOption } from "vite"; -import { cp, readdir, mkdir } from "node:fs/promises"; -import { createReadStream } from "node:fs"; import { join, basename } from "node:path"; +import { createReadStream } from "node:fs"; +import { cp, readdir, mkdir } from "node:fs/promises"; const exposeLibAV: PluginOption = (() => { const IMPUT_MODULE_DIR = join(__dirname, 'node_modules/@imput'); @@ -20,7 +21,7 @@ const exposeLibAV: PluginOption = (() => { const filename = basename(req.url).split('?')[0]; if (!filename) return next(); - const [ file ] = await glob(join(IMPUT_MODULE_DIR, '/**/dist/', filename)); + const [file] = await glob(join(IMPUT_MODULE_DIR, '/**/dist/', filename)); if (!file) return next(); const fileType = mime.getType(filename); @@ -114,6 +115,6 @@ export default defineConfig({ proxy: {} }, optimizeDeps: { - exclude: [ "@imput/libav.js-remux-cli" ] + exclude: ["@imput/libav.js-remux-cli"] }, });