web: very early implementation of a fetch worker

This commit is contained in:
wukko 2025-01-30 01:04:33 +06:00
parent affe49474d
commit 5d7724762d
No known key found for this signature in database
GPG Key ID: 3E30B3F26C7B4AA2
7 changed files with 166 additions and 9 deletions

View File

@ -6,5 +6,6 @@
"state.waiting": "queued",
"state.starting": "starting...",
"state.running.remux": "remuxing"
"state.running.remux": "remuxing",
"state.running.fetch": "downloading"
}

View File

@ -5,6 +5,7 @@ import { t } from "$lib/i18n/translations";
import { downloadFile } from "$lib/download";
import { createDialog } from "$lib/state/dialogs";
import { downloadButtonState } from "$lib/state/omnibox";
import { createSavePipeline } from "$lib/queen-bee/queue";
import type { DialogInfo } from "$lib/types/dialog";
@ -79,8 +80,11 @@ export const savingHandler = async (link: string) => {
}
if (response.status === "local-processing") {
// TODO: actual implementation
// TODO: remove debug logging
console.log(response);
downloadButtonState.set("done");
return createSavePipeline(response);
}
if (response.status === "picker") {

View File

@ -1,5 +1,6 @@
import { addItem } from "$lib/state/queen-bee/queue";
import type { CobaltPipelineItem } from "$lib/types/workers";
import type { CobaltLocalProcessingResponse } from "$lib/types/api";
export const getMediaType = (type: string) => {
const kind = type.split('/')[0];
@ -34,3 +35,27 @@ export const createRemuxPipeline = (file: File) => {
})
}
}
export const createSavePipeline = (info: CobaltLocalProcessingResponse) => {
const parentId = crypto.randomUUID();
const pipeline: CobaltPipelineItem[] = [];
for (const tunnel of info.tunnel) {
pipeline.push({
worker: "fetch",
workerId: crypto.randomUUID(),
parentId,
workerArgs: {
url: tunnel,
},
})
}
addItem({
id: parentId,
state: "waiting",
pipeline,
filename: info.filename,
mediaType: "video",
})
}

View File

@ -1,4 +1,5 @@
import RemuxWorker from "$lib/workers/remux?worker";
import FetchWorker from "$lib/workers/fetch?worker";
import { itemDone, itemError, queue } from "$lib/state/queen-bee/queue";
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";
@ -6,10 +7,10 @@ import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";
import type { CobaltQueue } from "$lib/types/queue";
import type { CobaltPipelineItem } from "$lib/types/workers";
const killWorker = (worker: Worker, unsubscribe: () => void, interval: NodeJS.Timeout) => {
const killWorker = (worker: Worker, unsubscribe: () => void, interval?: NodeJS.Timeout) => {
unsubscribe();
worker.terminate();
clearInterval(interval);
if (interval) clearInterval(interval);
}
export const runRemuxWorker = async (workerId: string, parentId: string, file: File) => {
@ -89,10 +90,61 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F
};
}
export const runFetchWorker = async (workerId: string, parentId: string, url: string) => {
const worker = new FetchWorker();
const unsubscribe = queue.subscribe((queue: CobaltQueue) => {
if (!queue[parentId]) {
// TODO: remove logging
console.log("worker's parent is gone, so it killed itself");
killWorker(worker, unsubscribe);
}
});
worker.postMessage({
cobaltFetchWorker: {
url
}
});
worker.onmessage = (event) => {
const eventData = event.data.cobaltFetchWorker;
if (!eventData) return;
if (eventData.progress) {
updateWorkerProgress(workerId, {
percentage: eventData.progress,
size: eventData.size,
})
}
if (eventData.file) {
killWorker(worker, unsubscribe);
return itemDone(
parentId,
workerId,
eventData.file,
);
}
if (eventData.error) {
killWorker(worker, unsubscribe);
return itemError(parentId, workerId, eventData.error);
}
}
}
export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => {
switch (worker) {
case "remux":
await runRemuxWorker(workerId, parentId, workerArgs.files[0]);
if (workerArgs?.files) {
await runRemuxWorker(workerId, parentId, workerArgs.files[0]);
}
break;
case "fetch":
if (workerArgs?.url) {
await runFetchWorker(workerId, parentId, workerArgs.url)
}
break;
}
}

View File

@ -41,14 +41,14 @@ type CobaltTunnelResponse = {
status: CobaltResponseType.Tunnel,
} & CobaltPartialURLResponse;
type CobaltLocalProcessingResponse = {
export type CobaltLocalProcessingResponse = {
status: CobaltResponseType.LocalProcessing,
tunnel: string[],
// TODO: proper type for processing types
type: string,
service: string,
filename?: string,
filename: string,
metadata?: {
album?: string,

View File

@ -1,6 +1,6 @@
export const resultFileTypes = ["video", "audio", "image"] as const;
export type CobaltWorkerType = "remux" | "removebg";
export type CobaltWorkerType = "remux" | "fetch";
export type CobaltPipelineResultFileType = typeof resultFileTypes[number];
export type CobaltWorkerProgress = {
@ -10,7 +10,8 @@ export type CobaltWorkerProgress = {
}
export type CobaltWorkerArgs = {
files: File[],
files?: File[],
url?: string,
//TODO: args for libav & etc with unique types
}

View File

@ -0,0 +1,74 @@
const error = (code: string) => {
// TODO: return proper errors and code here
self.postMessage({
cobaltFetchWorker: {
error: code,
}
})
};
const fetchFile = async (url: string) => {
try {
const response = await fetch(url);
if (!response.ok) {
error("file response wasn't ok");
return self.close();
}
const contentType = response.headers.get('Content-Type') || 'application/octet-stream';
const contentLength = response.headers.get('Content-Length');
const totalBytes = contentLength ? parseInt(contentLength, 10) : null;
const reader = response.body?.getReader();
if (!reader) {
error("no reader");
return self.close();
}
let receivedBytes = 0;
const chunks = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
receivedBytes += value.length;
chunks.push(value);
if (totalBytes) {
self.postMessage({
cobaltFetchWorker: {
progress: Math.round((receivedBytes / totalBytes) * 100),
size: receivedBytes,
}
});
}
}
if (receivedBytes === 0) {
error("tunnel is broken");
return self.close();
}
const file = new File(chunks, "file", { type: contentType });
self.postMessage({
cobaltFetchWorker: {
file
}
});
} catch (e) {
console.log(e)
error("error when downloading the file");
return self.close();
}
}
self.onmessage = async (event: MessageEvent) => {
if (event.data.cobaltFetchWorker) {
await fetchFile(event.data.cobaltFetchWorker.url);
self.close();
}
}