diff --git a/downloader.py b/downloader.py
index aecb53c..71184e6 100644
--- a/downloader.py
+++ b/downloader.py
@@ -1,120 +1,62 @@
-import os, threading, requests
-from tqdm import tqdm
+import asyncio, aiohttp, os
+import tqdm
+
+r_semaphore = asyncio.Semaphore(10)
 
 
-class FileDownloader():
-    """
-    @source: https://gist.github.com/stefanfortuin/9dbfe8618701507d0ef2b5515b165c5f
-    """
-    def __init__(self, max_threads=10):
-        print("> Threaded downloader using {} threads.".format(
-            str(max_threads)))
-        self.sema = threading.Semaphore(value=max_threads)
-        self.headers = {
-            'user-agent':
-            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'
-        }
-        self.block_size = 1024
+# get content and write it to file
+def write_to_file(filename, content):
+    f = open(filename, 'wb')
+    f.write(content)
+    f.close()
 
 
-    def t_getfile(self, link, filepath, filename, bar, session):
-        """
-        Threaded function that uses a semaphore
-        to not instantiate too many threads
-        """
-        self.sema.acquire()
+async def get(*args, **kwargs):
+    response = await aiohttp.request('GET', *args, **kwargs)
+    return response
 
 
-        os.makedirs(os.path.dirname(filepath), exist_ok=True)
-        if not os.path.isfile(filepath):
-            headers = requests.head(link).headers
-            if 'content-length' not in headers:
-                print(f"server doesn't support content-length for {link}")
-                self.sema.release()
-                return
+async def head(*args, **kwargs):
+    response = await aiohttp.request('HEAD', *args, **kwargs)
+    return response
 
 
-            total_bytes = int(requests.head(link).headers['content-length'])
-            if not bar:
-                bar = tqdm(total=total_bytes,
-                           initial=0,
-                           unit='B',
-                           unit_scale=True,
-                           desc=filename)
-            self.download_new_file(link, filename, filepath, total_bytes, bar,
-                                   session)
+async def download_file(url, filepath, filename):
+    async with r_semaphore:
+        if os.path.isfile(filepath):
+            # already downloaded
+            pass
         else:
-            current_bytes = os.stat(filepath).st_size
+            async with aiohttp.request("GET", url, chunked=True) as media:
+                media_length = int(media.headers.get("content-length"))
+                if media.status == 200:
+                    if os.path.isfile(filepath) and os.path.getsize(
+                            filepath) >= media_length:
+                        # already downloaded
+                        pass
+                    else:
+                        try:
+                            with open(filepath, 'wb') as f:
+                                async for chunk in media.content.iter_chunked(
+                                        1024):
+                                    if chunk:
+                                        f.write(chunk)
+                                f.close()
+                            # success
+                        except Exception as e:
+                            raise e
 
-            headers = requests.head(link).headers
-            if 'content-length' not in headers:
-                print(f"server doesn't support content-length for {link}")
-                self.sema.release()
-                return
-
-            total_bytes = int(requests.head(link).headers['content-length'])
-            if not bar:
-                bar = tqdm(total=total_bytes,
-                           initial=current_bytes,
-                           unit='B',
-                           unit_scale=True,
-                           desc=filename)
-            if current_bytes < total_bytes:
-                self.continue_file_download(link, filename, filepath,
-                                            current_bytes, total_bytes, bar)
-            else:
-                # print(f"already done: {filename}")
-                if bar.unit == "B":
-                    bar.update(self.block_size)
+                        if os.path.getsize(filepath) >= media_length:
+                            pass
+                        else:
+                            print("Segment is corrupt")
+                elif media.status == 404:
+                    print("404")
                 else:
-                    bar.update(1)
+                    print("Error fetching segment")
 
-        self.sema.release()
-
-    def download_new_file(self, link, filename, filepath, total_bytes, bar,
-                          session):
-        if session == None:
-            try:
-                request = requests.get(link,
-                                       headers=self.headers,
-                                       timeout=30,
-                                       stream=True)
-                self.write_file(request, filepath, 'wb', bar)
-            except requests.exceptions.RequestException as e:
-                print(e)
-        else:
-            request = session.get(link, stream=True)
-            self.write_file(request, filepath, 'wb', bar)
-
-    def continue_file_download(self, link, filename, filepath, current_bytes,
-                               total_bytes, bar):
-        range_header = self.headers.copy()
-        range_header['Range'] = f"bytes={current_bytes}-{total_bytes}"
-
-        try:
-            request = requests.get(link,
-                                   headers=range_header,
-                                   timeout=30,
-                                   stream=True)
-            self.write_file(request, filepath, 'ab', bar)
-        except requests.exceptions.RequestException as e:
-            print(e)
-
-    def write_file(self, content, filepath, writemode, bar):
-        with open(filepath, writemode) as f:
-            for chunk in content.iter_content(chunk_size=self.block_size):
-                if chunk:
-                    f.write(chunk)
-                    if bar.unit == "B":
-                        bar.update(self.block_size)
-
-        # print(f"completed file {filepath}", end='\n')
-        f.close()
-        bar.update(1)
-
-    def get_file(self, link, path, filename, bar=None, session=None):
-        """ Downloads the file"""
-        thread = threading.Thread(target=self.t_getfile,
-                                  args=(link, path, filename, bar, session))
-        thread.start()
-        return thread
+
+@asyncio.coroutine
+def wait_with_progressbar(coros):
+    for f in tqdm.tqdm(asyncio.as_completed(coros), total=len(coros)):
+        yield from f
\ No newline at end of file
diff --git a/main.py b/main.py
index 427d301..10c5d87 100644
--- a/main.py
+++ b/main.py
@@ -1,11 +1,11 @@
-import os, requests, json, glob, argparse, sys, re
+import os, requests, json, glob, argparse, sys, re, time, asyncio
 from sanitize_filename import sanitize
 from tqdm import tqdm
 from dotenv import load_dotenv
 from mpegdash.parser import MPEGDASHParser
 from utils import extract_kid
 from vtt_to_srt import convert
-from downloader import FileDownloader
+from downloader import download_file, wait_with_progressbar
 
 download_dir = os.path.join(os.getcwd(), "out_dir")
 working_dir = os.path.join(os.getcwd(), "working_dir")
@@ -125,7 +125,7 @@ def cleanup(path):
     os.removedirs(path)
 
 
-def mux_process(video_title, lecture_working_dir, outfile):
+def mux_process(video_title, lecture_working_dir, output_path):
     """
     @author Jayapraveen
     """
@@ -133,12 +133,12 @@
         command = "ffmpeg -y -i \"{}\" -i \"{}\" -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{}\" \"{}\"".format(
             os.path.join(lecture_working_dir, "decrypted_audio.mp4"),
             os.path.join(lecture_working_dir, "decrypted_video.mp4"),
-            video_title, outfile)
+            video_title, output_path)
     else:
         command = "nice -n 7 ffmpeg -y -i \"{}\" -i \"{}\" -acodec copy -vcodec copy -fflags +bitexact -map_metadata -1 -metadata title=\"{}\" \"{}\"".format(
             os.path.join(lecture_working_dir, "decrypted_audio.mp4"),
             os.path.join(lecture_working_dir, "decrypted_video.mp4"),
-            video_title, outfile)
+            video_title, output_path)
 
     os.system(command)
@@ -146,24 +146,26 @@ def decrypt(kid, filename, lecture_working_dir):
     """
     @author Jayapraveen
    """
+    print("> Decrypting, this might take a minute...")
     try:
         key = keyfile[kid.lower()]
     except KeyError:
         exit("Key not found")
     if (os.name == "nt"):
-        code = os.system("mp4decrypt --key 1:{0} \"{1}\" \"{2}\"".format(
-            key,
-            os.path.join(lecture_working_dir,
-                         "encrypted_{0}.mp4".format(filename)),
-            os.path.join(lecture_working_dir,
-                         "decrypted_{0}.mp4".format(filename))))
+        code = os.system("mp4decrypt --key 1:%s \"%s\" \"%s\"" %
+                         (key,
+                          os.path.join(lecture_working_dir,
+                                       "encrypted_{}.mp4".format(filename)),
+                          os.path.join(lecture_working_dir,
+                                       "decrypted_{}.mp4".format(filename))))
     else:
-        os.system("nice -n 7 mp4decrypt --key 1:{0} \"{1}\" \"{2}\"".format(
-            key,
-            os.path.join(lecture_working_dir,
-                         "encrypted_{0}.mp4".format(filename)),
-            os.path.join(lecture_working_dir,
-                         "decrypted_{0}.mp4".format(filename))))
+        os.system("nice -n 7 mp4decrypt --key 1:%s \"%s\" \"%s\"" %
+                  (key,
+                   os.path.join(lecture_working_dir,
+                                "encrypted_{}.mp4".format(filename)),
+                   os.path.join(lecture_working_dir,
+                                "decrypted_{}.mp4".format(filename))))
+    print("> Decryption complete")
 
 
 def handle_segments(media_info, video_title, lecture_working_dir, output_path):
@@ -219,8 +221,8 @@
             break
 
 
-def handle_segments_threaded(media_info, video_title, lecture_working_dir,
-                             output_path):
+def handle_segments_async(media_info, video_title, lecture_working_dir,
+                          output_path):
     """
     @author Jayapraveen
     """
@@ -234,35 +236,42 @@
         os.path.join(lecture_working_dir, "audio_0.seg.mp4"))
     print("KID for audio file is: " + audio_kid)
 
-    vbar = tqdm(total=no_segment,
-                initial=1,
-                unit='Video Segments',
-                desc=video_title + " (Video)")
-    abar = tqdm(total=no_segment,
-                initial=1,
-                unit='Audio Segments',
-                desc=video_title + " (Audio)")
+    # bar = tqdm(total=(no_segment * 2) - 2,
+    #            initial=0,
+    #            unit='Segments',
+    #            desc=video_title)
 
-    threads = []
+    urls = []
     for count in range(1, no_segment):
         video_filename = f"video_{str(count)}.seg.{video_extension}"
-        video_path = os.path.join(lecture_working_dir, video_filename)
-        video_segment_url = video_url.replace("$Number$", str(count))
-        video = downloader.get_file(video_segment_url, video_path,
-                                    video_filename, vbar)
-        threads.append(video)
-
-    for count in range(1, no_segment):
         audio_filename = f"audio_{str(count)}.seg.{audio_extension}"
+        video_path = os.path.join(lecture_working_dir, video_filename)
         audio_path = os.path.join(lecture_working_dir, audio_filename)
+        video_segment_url = video_url.replace("$Number$", str(count))
         audio_segment_url = audio_url.replace("$Number$", str(count))
-        audio = downloader.get_file(audio_segment_url, audio_path,
-                                    audio_filename, abar)
-        threads.append(audio)
-
-    for x in threads:
-        x.join()
+        urls.append({
+            "url": audio_segment_url,
+            "filename": audio_filename,
+            "path": audio_path
+        })
+        urls.append({
+            "url": video_segment_url,
+            "filename": video_filename,
+            "path": video_path
+        })
+
+    start_time = time.time()
+    eloop = asyncio.get_event_loop()
+    coroutines = [
+        eloop.create_task(
+            download_file(obj.get("url"), obj.get("path"),
+                          obj.get("filename"))) for obj in urls
+    ]
+    eloop.run_until_complete(wait_with_progressbar(coroutines))
+    duration = time.time() - start_time
+    print(f"Downloaded {len(urls)} files in {duration} seconds")
 
     os.chdir(lecture_working_dir)
     if os.name == "nt":
@@ -324,28 +333,29 @@
         else:
             repr = adapt_set.representations[-1]  # Max Quality
             print("> Using max quality of %s" % repr.height)
-        segment_count = 0
+        elif content_type == "audio/mp4":
+            repr = adapt_set.representations[-1]  # Best
 
-        segment = repr.segment_templates[0]
-        timeline = segment.segment_timelines[0]
-        segment_count += len(timeline.Ss)
-        for s in timeline.Ss:
-            if s.r:
-                segment_count += s.r
-
-        print("Expected No of segments:", segment_count)
-        if (content_type == "audio/mp4"):
-            segment_extension = segment.media.split(".")[-1]
-            audio.append(segment_count)
-            audio.append(segment.media)
-            audio.append(segment.initialization)
-            audio.append(segment_extension)
-        elif (content_type == "video/mp4"):
-            segment_extension = segment.media.split(".")[-1]
-            video.append(segment_count)
-            video.append(segment.media)
-            video.append(segment.initialization)
-            video.append(segment_extension)
+        for segment in repr.segment_templates:
+            segment_count = 1
+            timeline = segment.segment_timelines[0]
+            segment_count += len(timeline.Ss)
+            for s in timeline.Ss:
+                if s.r:
+                    segment_count += s.r
+            print("Expected No of segments:", segment_count)
+            if (content_type == "audio/mp4"):
+                segment_extension = segment.media.split(".")[-1]
+                audio.append(segment_count)
+                audio.append(segment.media)
+                audio.append(segment.initialization)
+                audio.append(segment_extension)
+            elif (content_type == "video/mp4"):
+                segment_extension = segment.media.split(".")[-1]
+                video.append(segment_count)
+                video.append(segment.media)
+                video.append(segment.initialization)
+                video.append(segment_extension)
 
     return video + audio
@@ -382,7 +392,5 @@ def process_caption(caption,
                     lecture_index,
                     lecture_title,
                     lecture_dir,
-                    use_threaded_downloader,
-                    threads,
                     tries=0):
     filename = f"%s. %s_%s.%s" % (lecture_index, sanitize(lecture_title),
@@ -396,12 +405,7 @@
     else:
         print(f"> Downloading captions: '%s'" % filename)
         try:
-            if use_threaded_downloader:
-                thread = downloader.get_file(caption.get("url"), filepath,
-                                             filename)
-                thread.join()
-            else:
-                download(caption.get("url"), filepath, filename)
+            download(caption.get("url"), filepath, filename)
         except Exception as e:
             if tries >= 3:
                 print(
@@ -413,8 +417,7 @@
                     f"> Error downloading captions: {e}. Will retry {3-tries} more times."
                 )
                 process_caption(caption, lecture_index, lecture_title,
-                                lecture_dir, use_threaded_downloader, threads,
-                                tries + 1)
+                                lecture_dir, tries + 1)
     if caption.get("ext") == "vtt":
         try:
             print("> Converting captions to SRT format...")
@@ -425,9 +428,17 @@
             print(f"> Error converting captions: {e}")
 
 
-def process_lecture(lecture, lecture_index, lecture_path, lecture_dir, quality,
-                    skip_lectures, dl_assets, dl_captions, caption_locale,
-                    use_threaded_downloader):
+def process_lecture(
+    lecture,
+    lecture_index,
+    lecture_path,
+    lecture_dir,
+    quality,
+    skip_lectures,
+    dl_assets,
+    dl_captions,
+    caption_locale,
+):
     lecture_title = lecture["title"]
     lecture_asset = lecture["asset"]
     if not skip_lectures:
@@ -446,12 +457,7 @@
             if not os.path.isfile(lecture_path):
                 try:
-                    if use_threaded_downloader:
-                        thread = downloader.get_file(lecture_url, lecture_path,
-                                                     lecture_title)
-                        thread.join()
-                    else:
-                        download(lecture_url, lecture_path, lecture_title)
+                    download(lecture_url, lecture_path, lecture_title)
                 except Exception as e:
                     # We could add a retry here
                     print(f"> Error downloading lecture: {e}. Skipping...")
@@ -477,12 +483,8 @@
                         lecture_title)
                     return
                 media_info = manifest_parser(mpd_url, quality)
-                if use_threaded_downloader:
-                    handle_segments_threaded(media_info, lecture_title,
-                                             lecture_working_dir, lecture_path)
-                else:
-                    handle_segments(media_info, lecture_title,
-                                    lecture_working_dir, lecture_path)
+                handle_segments(media_info, lecture_title, lecture_working_dir,
+                                lecture_path)
             else:
                 print("> Lecture '%s' is already downloaded, skipping..." %
                       lecture_title)
@@ -501,16 +503,9 @@
                            if x["label"] == "download"), None)
                 if download_url:
                     try:
-                        if use_threaded_downloader:
-                            thread = downloader.get_file(
-                                download_url,
-                                os.path.join(lecture_dir, asset_filename),
-                                asset_filename)
-                            thread.join()
-                        else:
-                            download(download_url,
-                                     os.path.join(lecture_dir, asset_filename),
-                                     asset_filename)
+                        download(download_url,
+                                 os.path.join(lecture_dir, asset_filename),
+                                 asset_filename)
                     except Exception as e:
                         print(
                             f"> Error downloading lecture asset: {e}. Skipping"
@@ -562,12 +557,16 @@
                 })
 
         for caption in captions:
-            process_caption(caption, lecture_index, lecture_title, lecture_dir,
-                            use_threaded_downloader)
+            process_caption(
+                caption,
+                lecture_index,
+                lecture_title,
+                lecture_dir,
+            )
 
 
 def parse(data, course_id, course_name, skip_lectures, dl_assets, dl_captions,
-          quality, caption_locale, use_threaded_downloader):
+          quality, caption_locale):
     course_dir = os.path.join(download_dir, course_name)
     if not os.path.exists(course_dir):
         os.mkdir(course_dir)
@@ -599,7 +598,6 @@
                 dl_assets,
                 dl_captions,
                 caption_locale,
-                use_threaded_downloader,
             )
 
     for chapter in chapters:
@@ -616,7 +614,7 @@
                                 sanitize(lecture["title"])))
             process_lecture(lecture, lecture_index, lecture_path, chapter_dir,
                             quality, skip_lectures, dl_assets, dl_captions,
-                            caption_locale, use_threaded_downloader)
+                            caption_locale)
     print("\n\n\n\n\n\n\n\n=====================")
     print("All downloads completed for course!")
     print("=====================")
@@ -678,14 +676,6 @@
         type=int,
         help="Download specific video quality. (144, 360, 480, 720, 1080)",
     )
-    parser.add_argument(
-        "-t",
-        "--threads",
-        dest="threads",
-        type=int,
-        help=
-        "Max number of threads to use when using the threaded downloader (default 10)",
-    )
     parser.add_argument(
         "-l",
         "--lang",
@@ -711,12 +701,6 @@
         action="store_true",
         help="If specified, captions will be downloaded.",
     )
-    parser.add_argument(
-        "--use-threaded-downloader",
-        dest="use_threaded_downloader",
-        action="store_true",
-        help="If specified, the experimental threaded downloader will be used",
-    )
     parser.add_argument(
         "-d",
         "--debug",
@@ -733,8 +717,6 @@
     bearer_token = None
     portal_name = None
     course_name = None
-    use_threaded_downloader = False
-    threads = 10
 
     args = parser.parse_args()
     if args.download_assets:
@@ -751,11 +733,6 @@
             sys.exit(1)
         else:
             quality = args.quality
-    if args.use_threaded_downloader:
-        use_threaded_downloader = args.use_threaded_downloader
-    if args.threads:
-        threads = args.threads
-    downloader = FileDownloader(max_threads=threads)
 
     load_dotenv()
     if args.bearer_token:
@@ -814,8 +791,7 @@
             course_data = json.loads(f.read())
             parse(course_data["results"], course_id, course_name,
                   skip_lectures, dl_assets, dl_captions, quality,
-                  caption_locale, use_threaded_downloader)
+                  caption_locale)
     else:
         parse(course_data["results"], course_id, course_name, skip_lectures,
-              dl_assets, dl_captions, quality, caption_locale,
-              use_threaded_downloader)
+              dl_assets, dl_captions, quality, caption_locale)
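
Reviewer note (not part of the patch): `@asyncio.coroutine` generator-based coroutines are deprecated since Python 3.8 and removed in 3.11, so `wait_with_progressbar` as written will not run on current interpreters. Below is a minimal, hedged sketch of an async-native equivalent, driven the same way `handle_segments_async` drives it; `download_all` and `segments` are illustrative names and are not introduced by this diff.

# Reviewer sketch only -- assumes the patched downloader.py is importable.
import asyncio
import tqdm

from downloader import download_file  # provided by this patch


async def wait_with_progressbar(coros):
    # asyncio.as_completed yields awaitables in completion order;
    # tqdm simply counts them as they finish.
    for fut in tqdm.tqdm(asyncio.as_completed(coros), total=len(coros)):
        await fut


def download_all(segments):
    # segments: [{"url": ..., "path": ..., "filename": ...}, ...] -- the same
    # dict shape handle_segments_async builds in main.py (illustrative driver).
    loop = asyncio.get_event_loop()  # same driving style as the patch
    tasks = [
        loop.create_task(download_file(s["url"], s["path"], s["filename"]))
        for s in segments
    ]
    loop.run_until_complete(wait_with_progressbar(tasks))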