mirror of
https://github.com/wukko/cobalt.git
synced 2025-06-12 13:17:45 +02:00
api & web: merge base queue ui & api updates
This commit is contained in:
@ -8,18 +8,19 @@ import jwt from "../security/jwt.js";
|
||||
import stream from "../stream/stream.js";
|
||||
import match from "../processing/match.js";
|
||||
|
||||
import { env, isCluster, setTunnelPort } from "../config.js";
|
||||
import { env } from "../config.js";
|
||||
import { extract } from "../processing/url.js";
|
||||
import { Green, Bright, Cyan } from "../misc/console-text.js";
|
||||
import { Bright, Cyan } from "../misc/console-text.js";
|
||||
import { hashHmac } from "../security/secrets.js";
|
||||
import { createStore } from "../store/redis-ratelimit.js";
|
||||
import { randomizeCiphers } from "../misc/randomize-ciphers.js";
|
||||
import { verifyTurnstileToken } from "../security/turnstile.js";
|
||||
import { friendlyServiceName } from "../processing/service-alias.js";
|
||||
import { verifyStream, getInternalStream } from "../stream/manage.js";
|
||||
import { verifyStream } from "../stream/manage.js";
|
||||
import { createResponse, normalizeRequest, getIP } from "../processing/request.js";
|
||||
import * as APIKeys from "../security/api-keys.js";
|
||||
import * as Cookies from "../processing/cookie/manager.js";
|
||||
import { setupTunnelHandler } from "./itunnel.js";
|
||||
|
||||
const git = {
|
||||
branch: await getBranch(),
|
||||
@ -263,6 +264,15 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => {
|
||||
}
|
||||
})
|
||||
|
||||
app.use('/tunnel', cors({
|
||||
methods: ['GET'],
|
||||
exposedHeaders: [
|
||||
'Estimated-Content-Length',
|
||||
'Content-Disposition'
|
||||
],
|
||||
...corsConfig,
|
||||
}));
|
||||
|
||||
app.get('/tunnel', apiTunnelLimiter, async (req, res) => {
|
||||
const id = String(req.query.id);
|
||||
const exp = String(req.query.exp);
|
||||
@ -292,31 +302,7 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => {
|
||||
}
|
||||
|
||||
return stream(res, streamInfo);
|
||||
})
|
||||
|
||||
const itunnelHandler = (req, res) => {
|
||||
if (!req.ip.endsWith('127.0.0.1')) {
|
||||
return res.sendStatus(403);
|
||||
}
|
||||
|
||||
if (String(req.query.id).length !== 21) {
|
||||
return res.sendStatus(400);
|
||||
}
|
||||
|
||||
const streamInfo = getInternalStream(req.query.id);
|
||||
if (!streamInfo) {
|
||||
return res.sendStatus(404);
|
||||
}
|
||||
|
||||
streamInfo.headers = new Map([
|
||||
...(streamInfo.headers || []),
|
||||
...Object.entries(req.headers)
|
||||
]);
|
||||
|
||||
return stream(res, { type: 'internal', ...streamInfo });
|
||||
};
|
||||
|
||||
app.get('/itunnel', itunnelHandler);
|
||||
});
|
||||
|
||||
app.get('/', (_, res) => {
|
||||
res.type('json');
|
||||
@ -378,17 +364,5 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => {
|
||||
}
|
||||
});
|
||||
|
||||
if (isCluster) {
|
||||
const istreamer = express();
|
||||
istreamer.get('/itunnel', itunnelHandler);
|
||||
const server = istreamer.listen({
|
||||
port: 0,
|
||||
host: '127.0.0.1',
|
||||
exclusive: true
|
||||
}, () => {
|
||||
const { port } = server.address();
|
||||
console.log(`${Green('[✓]')} cobalt sub-instance running on 127.0.0.1:${port}`);
|
||||
setTunnelPort(port);
|
||||
});
|
||||
}
|
||||
setupTunnelHandler();
|
||||
}
|
||||
|
61
api/src/core/itunnel.js
Normal file
61
api/src/core/itunnel.js
Normal file
@ -0,0 +1,61 @@
|
||||
import stream from "../stream/stream.js";
|
||||
import { getInternalTunnel } from "../stream/manage.js";
|
||||
import { setTunnelPort } from "../config.js";
|
||||
import { Green } from "../misc/console-text.js";
|
||||
import express from "express";
|
||||
|
||||
const validateTunnel = (req, res) => {
|
||||
if (!req.ip.endsWith('127.0.0.1')) {
|
||||
res.sendStatus(403);
|
||||
return;
|
||||
}
|
||||
|
||||
if (String(req.query.id).length !== 21) {
|
||||
res.sendStatus(400);
|
||||
return;
|
||||
}
|
||||
|
||||
const streamInfo = getInternalTunnel(req.query.id);
|
||||
if (!streamInfo) {
|
||||
res.sendStatus(404);
|
||||
return;
|
||||
}
|
||||
|
||||
return streamInfo;
|
||||
}
|
||||
|
||||
const streamTunnel = (req, res) => {
|
||||
const streamInfo = validateTunnel(req, res);
|
||||
if (!streamInfo) {
|
||||
return;
|
||||
}
|
||||
|
||||
streamInfo.headers = new Map([
|
||||
...(streamInfo.headers || []),
|
||||
...Object.entries(req.headers)
|
||||
]);
|
||||
|
||||
return stream(res, { type: 'internal', ...streamInfo });
|
||||
}
|
||||
|
||||
export const setupTunnelHandler = () => {
|
||||
const tunnelHandler = express();
|
||||
|
||||
tunnelHandler.get('/itunnel', streamTunnel);
|
||||
|
||||
// fallback
|
||||
tunnelHandler.use((_, res) => res.sendStatus(400));
|
||||
// error handler
|
||||
tunnelHandler.use((_, __, res, ____) => res.socket.end());
|
||||
|
||||
|
||||
const server = tunnelHandler.listen({
|
||||
port: 0,
|
||||
host: '127.0.0.1',
|
||||
exclusive: true
|
||||
}, () => {
|
||||
const { port } = server.address();
|
||||
console.log(`${Green('[✓]')} internal tunnel handler running on 127.0.0.1:${port}`);
|
||||
setTunnelPort(port);
|
||||
});
|
||||
}
|
@ -58,7 +58,8 @@ async function com_download(id) {
|
||||
return {
|
||||
urls: [video.baseUrl, audio.baseUrl],
|
||||
audioFilename: `bilibili_${id}_audio`,
|
||||
filename: `bilibili_${id}_${video.width}x${video.height}.mp4`
|
||||
filename: `bilibili_${id}_${video.width}x${video.height}.mp4`,
|
||||
isHLS: true
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
import HLS from "hls-parser";
|
||||
import { createInternalStream } from "./manage.js";
|
||||
import { request } from "undici";
|
||||
|
||||
function getURL(url) {
|
||||
try {
|
||||
@ -55,8 +56,11 @@ function transformMediaPlaylist(streamInfo, hlsPlaylist) {
|
||||
|
||||
const HLS_MIME_TYPES = ["application/vnd.apple.mpegurl", "audio/mpegurl", "application/x-mpegURL"];
|
||||
|
||||
export function isHlsResponse (req) {
|
||||
return HLS_MIME_TYPES.includes(req.headers['content-type']);
|
||||
export function isHlsResponse(req, streamInfo) {
|
||||
return HLS_MIME_TYPES.includes(req.headers['content-type'])
|
||||
// bluesky's cdn responds with wrong content-type for the hls playlist,
|
||||
// so we enforce it here until they fix it
|
||||
|| (streamInfo.service === 'bsky' && streamInfo.url.endsWith('.m3u8'));
|
||||
}
|
||||
|
||||
export async function handleHlsPlaylist(streamInfo, req, res) {
|
||||
@ -71,3 +75,59 @@ export async function handleHlsPlaylist(streamInfo, req, res) {
|
||||
|
||||
res.send(hlsPlaylist);
|
||||
}
|
||||
|
||||
async function getSegmentSize(url, config) {
|
||||
const segmentResponse = await request(url, {
|
||||
...config,
|
||||
throwOnError: true
|
||||
});
|
||||
|
||||
if (segmentResponse.headers['content-length']) {
|
||||
segmentResponse.body.dump();
|
||||
return +segmentResponse.headers['content-length'];
|
||||
}
|
||||
|
||||
// if the response does not have a content-length
|
||||
// header, we have to compute it ourselves
|
||||
let size = 0;
|
||||
|
||||
for await (const data of segmentResponse.body) {
|
||||
size += data.length;
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
export async function probeInternalHLSTunnel(streamInfo) {
|
||||
const { url, headers, dispatcher, signal } = streamInfo;
|
||||
|
||||
// remove all falsy headers
|
||||
Object.keys(headers).forEach(key => {
|
||||
if (!headers[key]) delete headers[key];
|
||||
});
|
||||
|
||||
const config = { headers, dispatcher, signal, maxRedirections: 16 };
|
||||
|
||||
const manifestResponse = await fetch(url, config);
|
||||
|
||||
const manifest = HLS.parse(await manifestResponse.text());
|
||||
if (manifest.segments.length === 0)
|
||||
return -1;
|
||||
|
||||
const segmentSamples = await Promise.all(
|
||||
Array(5).fill().map(async () => {
|
||||
const manifestIdx = Math.floor(Math.random() * manifest.segments.length);
|
||||
const randomSegment = manifest.segments[manifestIdx];
|
||||
if (!randomSegment.uri)
|
||||
throw "segment is missing URI";
|
||||
|
||||
const segmentSize = await getSegmentSize(randomSegment.uri, config) / randomSegment.duration;
|
||||
return segmentSize;
|
||||
})
|
||||
);
|
||||
|
||||
const averageBitrate = segmentSamples.reduce((a, b) => a + b) / segmentSamples.length;
|
||||
const totalDuration = manifest.segments.reduce((acc, segment) => acc + segment.duration, 0);
|
||||
|
||||
return averageBitrate * totalDuration;
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
import { request } from "undici";
|
||||
import { Readable } from "node:stream";
|
||||
import { closeRequest, getHeaders, pipe } from "./shared.js";
|
||||
import { handleHlsPlaylist, isHlsResponse } from "./internal-hls.js";
|
||||
import { handleHlsPlaylist, isHlsResponse, probeInternalHLSTunnel } from "./internal-hls.js";
|
||||
|
||||
const CHUNK_SIZE = BigInt(8e6); // 8 MB
|
||||
const min = (a, b) => a < b ? a : b;
|
||||
@ -96,10 +96,7 @@ async function handleGenericStream(streamInfo, res) {
|
||||
res.status(fileResponse.statusCode);
|
||||
fileResponse.body.on('error', () => {});
|
||||
|
||||
// bluesky's cdn responds with wrong content-type for the hls playlist,
|
||||
// so we enforce it here until they fix it
|
||||
const isHls = isHlsResponse(fileResponse)
|
||||
|| (streamInfo.service === "bsky" && streamInfo.url.endsWith('.m3u8'));
|
||||
const isHls = isHlsResponse(fileResponse, streamInfo);
|
||||
|
||||
for (const [ name, value ] of Object.entries(fileResponse.headers)) {
|
||||
if (!isHls || name.toLowerCase() !== 'content-length') {
|
||||
@ -133,3 +130,40 @@ export function internalStream(streamInfo, res) {
|
||||
|
||||
return handleGenericStream(streamInfo, res);
|
||||
}
|
||||
|
||||
export async function probeInternalTunnel(streamInfo) {
|
||||
try {
|
||||
const signal = AbortSignal.timeout(3000);
|
||||
const headers = {
|
||||
...Object.fromEntries(streamInfo.headers || []),
|
||||
...getHeaders(streamInfo.service),
|
||||
host: undefined,
|
||||
range: undefined
|
||||
};
|
||||
|
||||
if (streamInfo.isHLS) {
|
||||
return probeInternalHLSTunnel({
|
||||
...streamInfo,
|
||||
signal,
|
||||
headers
|
||||
});
|
||||
}
|
||||
|
||||
const response = await request(streamInfo.url, {
|
||||
method: 'HEAD',
|
||||
headers,
|
||||
dispatcher: streamInfo.dispatcher,
|
||||
signal,
|
||||
maxRedirections: 16
|
||||
});
|
||||
|
||||
if (response.statusCode !== 200)
|
||||
throw "status is not 200 OK";
|
||||
|
||||
const size = +response.headers['content-length'];
|
||||
if (isNaN(size))
|
||||
throw "content-length is not a number";
|
||||
|
||||
return size;
|
||||
} catch {}
|
||||
}
|
||||
|
@ -68,10 +68,20 @@ export function createStream(obj) {
|
||||
return streamLink.toString();
|
||||
}
|
||||
|
||||
export function getInternalStream(id) {
|
||||
export function getInternalTunnel(id) {
|
||||
return internalStreamCache.get(id);
|
||||
}
|
||||
|
||||
export function getInternalTunnelFromURL(url) {
|
||||
url = new URL(url);
|
||||
if (url.hostname !== '127.0.0.1') {
|
||||
return;
|
||||
}
|
||||
|
||||
const id = url.searchParams.get('id');
|
||||
return getInternalTunnel(id);
|
||||
}
|
||||
|
||||
export function createInternalStream(url, obj = {}) {
|
||||
assert(typeof url === 'string');
|
||||
|
||||
@ -124,7 +134,7 @@ export function destroyInternalStream(url) {
|
||||
const id = url.searchParams.get('id');
|
||||
|
||||
if (internalStreamCache.has(id)) {
|
||||
closeRequest(getInternalStream(id)?.controller);
|
||||
closeRequest(getInternalTunnel(id)?.controller);
|
||||
internalStreamCache.delete(id);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,7 @@
|
||||
import { genericUserAgent } from "../config.js";
|
||||
import { vkClientAgent } from "../processing/services/vk.js";
|
||||
import { getInternalTunnelFromURL } from "./manage.js";
|
||||
import { probeInternalTunnel } from "./internal.js";
|
||||
|
||||
const defaultHeaders = {
|
||||
'user-agent': genericUserAgent
|
||||
@ -47,3 +49,40 @@ export function pipe(from, to, done) {
|
||||
|
||||
from.pipe(to);
|
||||
}
|
||||
|
||||
export async function estimateTunnelLength(streamInfo, multiplier = 1.1) {
|
||||
let urls = streamInfo.urls;
|
||||
if (!Array.isArray(urls)) {
|
||||
urls = [ urls ];
|
||||
}
|
||||
|
||||
const internalTunnels = urls.map(getInternalTunnelFromURL);
|
||||
if (internalTunnels.some(t => !t))
|
||||
return -1;
|
||||
|
||||
const sizes = await Promise.all(internalTunnels.map(probeInternalTunnel));
|
||||
const estimatedSize = sizes.reduce(
|
||||
// if one of the sizes is missing, let's just make a very
|
||||
// bold guess that it's the same size as the existing one
|
||||
(acc, cur) => cur <= 0 ? acc * 2 : acc + cur,
|
||||
0
|
||||
);
|
||||
|
||||
if (isNaN(estimatedSize) || estimatedSize <= 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return Math.floor(estimatedSize * multiplier);
|
||||
}
|
||||
|
||||
export function estimateAudioMultiplier(streamInfo) {
|
||||
if (streamInfo.audioFormat === 'wav') {
|
||||
return 1411 / 128;
|
||||
}
|
||||
|
||||
if (streamInfo.audioCopy) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return streamInfo.audioBitrate / 128;
|
||||
}
|
||||
|
@ -10,20 +10,20 @@ export default async function(res, streamInfo) {
|
||||
return await stream.proxy(streamInfo, res);
|
||||
|
||||
case "internal":
|
||||
return internalStream(streamInfo, res);
|
||||
return await internalStream(streamInfo, res);
|
||||
|
||||
case "merge":
|
||||
return stream.merge(streamInfo, res);
|
||||
return await stream.merge(streamInfo, res);
|
||||
|
||||
case "remux":
|
||||
case "mute":
|
||||
return stream.remux(streamInfo, res);
|
||||
return await stream.remux(streamInfo, res);
|
||||
|
||||
case "audio":
|
||||
return stream.convertAudio(streamInfo, res);
|
||||
return await stream.convertAudio(streamInfo, res);
|
||||
|
||||
case "gif":
|
||||
return stream.convertGif(streamInfo, res);
|
||||
return await stream.convertGif(streamInfo, res);
|
||||
}
|
||||
|
||||
closeResponse(res);
|
||||
|
@ -6,7 +6,7 @@ import { create as contentDisposition } from "content-disposition-header";
|
||||
import { env } from "../config.js";
|
||||
import { destroyInternalStream } from "./manage.js";
|
||||
import { hlsExceptions } from "../processing/service-config.js";
|
||||
import { getHeaders, closeRequest, closeResponse, pipe } from "./shared.js";
|
||||
import { getHeaders, closeRequest, closeResponse, pipe, estimateTunnelLength, estimateAudioMultiplier } from "./shared.js";
|
||||
|
||||
const ffmpegArgs = {
|
||||
webm: ["-c:v", "copy", "-c:a", "copy"],
|
||||
@ -29,7 +29,7 @@ const convertMetadataToFFmpeg = (metadata) => {
|
||||
|
||||
for (const [ name, value ] of Object.entries(metadata)) {
|
||||
if (metadataTags.includes(name)) {
|
||||
args.push('-metadata', `${name}=${value.replace(/[\u0000-\u0009]/g, "")}`);
|
||||
args.push('-metadata', `${name}=${value.replace(/[\u0000-\u0009]/g, "")}`); // skipcq: JS-0004
|
||||
} else {
|
||||
throw `${name} metadata tag is not supported.`;
|
||||
}
|
||||
@ -98,7 +98,7 @@ const proxy = async (streamInfo, res) => {
|
||||
}
|
||||
}
|
||||
|
||||
const merge = (streamInfo, res) => {
|
||||
const merge = async (streamInfo, res) => {
|
||||
let process;
|
||||
const shutdown = () => (
|
||||
killProcess(process),
|
||||
@ -112,7 +112,7 @@ const merge = (streamInfo, res) => {
|
||||
try {
|
||||
if (streamInfo.urls.length !== 2) return shutdown();
|
||||
|
||||
const format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1];
|
||||
const format = streamInfo.filename.split('.').pop();
|
||||
|
||||
let args = [
|
||||
'-loglevel', '-8',
|
||||
@ -152,6 +152,7 @@ const merge = (streamInfo, res) => {
|
||||
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
|
||||
res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo));
|
||||
|
||||
pipe(muxOutput, res, shutdown);
|
||||
|
||||
@ -162,7 +163,7 @@ const merge = (streamInfo, res) => {
|
||||
}
|
||||
}
|
||||
|
||||
const remux = (streamInfo, res) => {
|
||||
const remux = async (streamInfo, res) => {
|
||||
let process;
|
||||
const shutdown = () => (
|
||||
killProcess(process),
|
||||
@ -196,7 +197,7 @@ const remux = (streamInfo, res) => {
|
||||
args.push('-bsf:a', 'aac_adtstoasc');
|
||||
}
|
||||
|
||||
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1];
|
||||
let format = streamInfo.filename.split('.').pop();
|
||||
if (format === "mp4") {
|
||||
args.push('-movflags', 'faststart+frag_keyframe+empty_moov')
|
||||
}
|
||||
@ -215,6 +216,7 @@ const remux = (streamInfo, res) => {
|
||||
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
|
||||
res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo));
|
||||
|
||||
pipe(muxOutput, res, shutdown);
|
||||
|
||||
@ -225,7 +227,7 @@ const remux = (streamInfo, res) => {
|
||||
}
|
||||
}
|
||||
|
||||
const convertAudio = (streamInfo, res) => {
|
||||
const convertAudio = async (streamInfo, res) => {
|
||||
let process;
|
||||
const shutdown = () => (
|
||||
killProcess(process),
|
||||
@ -284,6 +286,13 @@ const convertAudio = (streamInfo, res) => {
|
||||
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
|
||||
res.setHeader(
|
||||
'Estimated-Content-Length',
|
||||
await estimateTunnelLength(
|
||||
streamInfo,
|
||||
estimateAudioMultiplier(streamInfo) * 1.1
|
||||
)
|
||||
);
|
||||
|
||||
pipe(muxOutput, res, shutdown);
|
||||
res.on('finish', shutdown);
|
||||
@ -292,7 +301,7 @@ const convertAudio = (streamInfo, res) => {
|
||||
}
|
||||
}
|
||||
|
||||
const convertGif = (streamInfo, res) => {
|
||||
const convertGif = async (streamInfo, res) => {
|
||||
let process;
|
||||
const shutdown = () => (killProcess(process), closeResponse(res));
|
||||
|
||||
@ -321,6 +330,7 @@ const convertGif = (streamInfo, res) => {
|
||||
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
|
||||
res.setHeader('Estimated-Content-Length', await estimateTunnelLength(streamInfo, 60));
|
||||
|
||||
pipe(muxOutput, res, shutdown);
|
||||
|
||||
|
Reference in New Issue
Block a user