diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7005c2c..de0f81b 100755 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -18,8 +18,11 @@ cache: before_script: - python -V # Print out python version for debugging - - apt update && apt upgrade -y + - apt update - apt install -y libchromaprint-tools ffmpeg --install-recommends + - git clone https://github.com/beetbox/pyacoustid.git && cd pyacoustid + - python setup.py install + - cd .. && rm -rf pyacoustid/ - pip install -r YouTubeMDBot/requirements.txt test:pylint: diff --git a/.idea/YouTubeMDBot.iml b/.idea/YouTubeMDBot.iml index 6711606..d0dcc75 100644 --- a/.idea/YouTubeMDBot.iml +++ b/.idea/YouTubeMDBot.iml @@ -2,7 +2,7 @@ - + diff --git a/.idea/misc.xml b/.idea/misc.xml index 8656114..9a549ad 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 94a25f7..97d245d 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -2,5 +2,6 @@ + \ No newline at end of file diff --git a/YouTubeMDBot/__init__.py b/YouTubeMDBot/__init__.py index 395373f..095653b 100755 --- a/YouTubeMDBot/__init__.py +++ b/YouTubeMDBot/__init__.py @@ -15,32 +15,23 @@ # along with this program. If not, see . from .api import YouTubeAPI from .api import YouTubeVideoData - -from .audio import FPCalc -from .audio import FFmpegOGG from .audio import FFmpegMP3 +from .audio import FFmpegOGG from .audio import FFmpegOpener +from .audio import FPCalc from .audio import ffmpeg_available - from .commands import StartHandler - from .constants import * - from .decorators import restricted from .decorators import send_action - +from .downloader import MultipleYouTubeDownloader from .downloader import YouTubeDownloader - from .errors import EmptyBodyError from .errors import FinishedException - from .logging_utils import LoggingHandler from .logging_utils import setup_logging - from .metadata import AudioMetadata from .metadata import MetadataIdentifier from .metadata import YouTubeMetadataIdentifier - +from .multiprocess import ThreadPoolBase from .utils import get_yt_video_id - -from .multiprocess import MultiprocessBase diff --git a/YouTubeMDBot/constants/app_constants.py b/YouTubeMDBot/constants/app_constants.py index 08d4edb..383f34c 100755 --- a/YouTubeMDBot/constants/app_constants.py +++ b/YouTubeMDBot/constants/app_constants.py @@ -47,8 +47,8 @@ MAX_PROCESS = cpu_count() # Database constants -DB_NAME = os.environ["DATABASE_NAME"] -DB_USER = os.environ["DATABASE_USER"] -DB_PASSWORD = os.environ["DATABASE_PASSWORD"] +DB_NAME = True # os.environ["DATABASE_NAME"] +DB_USER = True # os.environ["DATABASE_USER"] +DB_PASSWORD = True # os.environ["DATABASE_PASSWORD"] DB_HOST = "127.0.0.1" DB_PORT = 5432 diff --git a/YouTubeMDBot/database/psql.py b/YouTubeMDBot/database/psql.py index 10a427d..8c86398 100644 --- a/YouTubeMDBot/database/psql.py +++ b/YouTubeMDBot/database/psql.py @@ -20,13 +20,12 @@ from psycopg2.pool import PoolError from . import Query -from .. import MAX_PROCESS -from .. import DB_USER -from .. import DB_PORT -from .. import DB_PASSWORD -from .. import DB_NAME from .. import DB_HOST -from .. import MultiprocessBase +from .. import DB_NAME +from .. import DB_PASSWORD +from .. import DB_PORT +from .. import DB_USER +from .. import MAX_PROCESS class PostgreSQL(MultiprocessBase): diff --git a/YouTubeMDBot/downloader/__init__.py b/YouTubeMDBot/downloader/__init__.py index 89c3cee..abec918 100755 --- a/YouTubeMDBot/downloader/__init__.py +++ b/YouTubeMDBot/downloader/__init__.py @@ -13,4 +13,5 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from ..downloader.youtube_downloader import MultipleYouTubeDownloader from ..downloader.youtube_downloader import YouTubeDownloader diff --git a/YouTubeMDBot/downloader/youtube_downloader.py b/YouTubeMDBot/downloader/youtube_downloader.py index 69c8edb..af6d51b 100755 --- a/YouTubeMDBot/downloader/youtube_downloader.py +++ b/YouTubeMDBot/downloader/youtube_downloader.py @@ -14,8 +14,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . from io import BytesIO +from typing import Any +from typing import Callable from typing import Tuple +from .. import ThreadPoolBase from ..constants.app_constants import YDL_CLI_OPTIONS @@ -23,9 +26,11 @@ class YouTubeDownloader: """ Download a YouTube video directly into memory. """ + def __init__(self, url: str): """ - Creates the YouTubeDownloader object. Call "download" for obtaining the video. + Creates the YouTubeDownloader object. Call "download" for obtaining + the video. :param url: the video URL. """ self.__url: str = url @@ -56,3 +61,22 @@ def get_url(self) -> str: :return: str with the URL. """ return self.__url + + +class MultipleYouTubeDownloader(ThreadPoolBase): + def __new__(cls, + max_processes: int = 4, + name: str = "YouTubeDownloader", + **kwargs): + return super().__new__(cls, max_processes, name, **kwargs) + + def download(self, yt_obj: YouTubeDownloader) -> Tuple[BytesIO, bytes]: + return super().wait_execute(yt_obj.download) + + def download_async(self, + yt_obj: YouTubeDownloader, + callback: Callable[[Any], Any] = None, + error_callback: Callable[[Any], Any] = None): + return super().execute(yt_obj.download, + callback=callback, + err_callback=error_callback) diff --git a/YouTubeMDBot/multiprocess/__init__.py b/YouTubeMDBot/multiprocess/__init__.py index be565a8..c3ba667 100644 --- a/YouTubeMDBot/multiprocess/__init__.py +++ b/YouTubeMDBot/multiprocess/__init__.py @@ -13,4 +13,4 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from ..multiprocess.abcprocess import MultiprocessBase +from ..multiprocess.abcprocess import ThreadPoolBase diff --git a/YouTubeMDBot/multiprocess/abcprocess.py b/YouTubeMDBot/multiprocess/abcprocess.py index db26516..cf5a602 100644 --- a/YouTubeMDBot/multiprocess/abcprocess.py +++ b/YouTubeMDBot/multiprocess/abcprocess.py @@ -14,105 +14,65 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . from abc import ABC -from abc import abstractmethod - +from multiprocessing.pool import ThreadPool +from threading import Lock from typing import Any -from typing import Dict -from typing import Optional from typing import Callable -from queue import Queue - -from threading import Lock -from threading import Thread -from threading import Condition - -from .. import MAX_PROCESS from .. import FinishedException +from .. import MAX_PROCESS -class MultiprocessBase(ABC): +class ThreadPoolBase(ABC): __instance = None def __new__(cls, - maxsize: int = 0, max_processes: int = MAX_PROCESS, + name: str = "ThreadBase", **kwargs): - if MultiprocessBase.__instance is None: + if ThreadPoolBase.__instance is None: cls.__instance = object.__new__(cls) - cls.__instance.waiting_processes = Queue(maxsize) - cls.__instance.running_processes = 0 - cls.__instance.lock = Lock() + cls.__instance.__pool = ThreadPool(processes=max_processes) + cls.__instance.__lock = Lock() cls.__instance.__finished = False - cls.__instance.max_processes = max_processes - cls.__instance.__condition = Condition() - cls.__instance.queue_consumer = \ - Thread(target=cls.__instance.__consumer) - cls.__instance.queue_consumer.start() + cls.__instance.name = name for key, value in kwargs.items(): setattr(cls.__instance, key, value) return cls.__instance - def __process_ready(self) -> bool: - return self.running_processes < self.max_processes and not \ - self.waiting_processes.empty() or self.finished - - def __consumer(self): - while not self.finished: - with self.__condition: - self.__condition.wait_for(self.__process_ready) - if not self.finished: - process = self.waiting_processes.get() - self.__spawn(process) - return - - def __spawn(self, process: Dict[str, Any]): - connection = self.get_connection() - child_process = Thread(target=self.__run, - args=(process, connection,)) - child_process.start() - - @abstractmethod - def get_connection(self) -> Optional[Any]: - with self.lock: - if self.running_processes <= self.max_processes: - self.running_processes += 1 - return None - - @abstractmethod - def free_connection(self, connection): - with self.lock: - self.running_processes -= 1 - with self.__condition: - self.__condition.notify_all() - - def __run(self, *args) -> Optional[Any]: - fn = args[0]["fn"] - fn_args = args[0]["args"] - result = fn(*fn_args, args[1]) if args[1] is not None else fn(*fn_args) - self.free_connection(args[1]) - return result - - def new(self, fn: Callable, *args): - if not self.finished: - self.waiting_processes.put({"fn": fn, "args": args}) - with self.__condition: - self.__condition.notify_all() - else: - raise FinishedException("The process has finished") - @property def finished(self) -> bool: - with self.lock: + with self.__lock: return self.__finished @finished.setter def finished(self, value: bool): - with self.lock: + with self.__lock: self.__finished = value def __del__(self): if not self.finished: - while not self.waiting_processes.empty(): - continue + self.__pool.close() + self.__pool.join() self.finished = True + + def wait_execute(self, func: Callable, *args, **kwargs) -> Any: + if not self.finished: + return self.__pool.apply(func=func, args=args, kwds=kwargs) + else: + raise FinishedException(f"The thread pool {self.name} has finished") + + def execute(self, + func: Callable, + args=(), + kwds={}, + callback: Callable[[Any], Any] = None, + err_callback: Callable[[Any], Any] = None): + if not self.finished: + return self.__pool.apply_async(func=func, + args=args, + kwds=kwds, + callback=callback, + error_callback=err_callback) + else: + raise FinishedException(f"The thread pool {self.name} has finished") diff --git a/YouTubeMDBot/requirements.txt b/YouTubeMDBot/requirements.txt index a4bdddc..a0fb6bf 100755 --- a/YouTubeMDBot/requirements.txt +++ b/YouTubeMDBot/requirements.txt @@ -4,6 +4,5 @@ google-api-python-client musicbrainzngs ujson youtube_dl -pyacoustid python-telegram-bot -psycopg2 +psycopg2-binary diff --git a/YouTubeMDBot/tests/downloader.py b/YouTubeMDBot/tests/downloader.py index cb53f74..72b4b8f 100755 --- a/YouTubeMDBot/tests/downloader.py +++ b/YouTubeMDBot/tests/downloader.py @@ -2,6 +2,7 @@ import unittest from time import sleep +from YouTubeMDBot.downloader import MultipleYouTubeDownloader from YouTubeMDBot.downloader import YouTubeDownloader @@ -11,14 +12,25 @@ class DownloadTest(unittest.TestCase): _lock = threading.Lock() def test_multithread_download(self): - yt1 = YouTubeDownloader(url="https://www.youtube.com/watch?v=Inm-N5rLUSI") - yt2 = YouTubeDownloader(url="https://www.youtube.com/watch?v=-_ZwpOdXXcA") - yt3 = YouTubeDownloader(url="https://www.youtube.com/watch?v=WOGWZD5iT10") - yt4 = YouTubeDownloader(url="https://www.youtube.com/watch?v=9HfoNUjw5u8") - t1 = threading.Thread(target=self.write_to_file, args=(yt1, "v1.m4a",)) - t2 = threading.Thread(target=self.write_to_file, args=(yt2, "v2.m4a",)) - t3 = threading.Thread(target=self.write_to_file, args=(yt3, "v3.m4a",)) - t4 = threading.Thread(target=self.write_to_file, args=(yt4, "v4.m4a",)) + yt1 = YouTubeDownloader( + url="https://www.youtube.com/watch?v=Inm-N5rLUSI") + yt2 = YouTubeDownloader( + url="https://www.youtube.com/watch?v=-_ZwpOdXXcA") + yt3 = YouTubeDownloader( + url="https://www.youtube.com/watch?v=WOGWZD5iT10") + yt4 = YouTubeDownloader( + url="https://www.youtube.com/watch?v=9HfoNUjw5u8") + + ytdl = MultipleYouTubeDownloader() + ft1 = ytdl.download_async(yt1) + ft2 = ytdl.download_async(yt2) + ft3 = ytdl.download_async(yt3) + ft4 = ytdl.download_async(yt4) + + t1 = threading.Thread(target=self.write_to_file, args=(ft1, "v1.m4a",)) + t2 = threading.Thread(target=self.write_to_file, args=(ft2, "v2.m4a",)) + t3 = threading.Thread(target=self.write_to_file, args=(ft3, "v3.m4a",)) + t4 = threading.Thread(target=self.write_to_file, args=(ft4, "v4.m4a",)) self._max = 4 @@ -30,12 +42,14 @@ def test_multithread_download(self): while self._elements < self._max: sleep(1) + del ytdl + def barrier(self): with self._lock: self._elements += 1 - def write_to_file(self, yt: YouTubeDownloader, name: str): - _, data = yt.download() + def write_to_file(self, future, name: str): + _, data = future.get() print(name + " downloaded") with open(name, "wb") as f: f.write(data) diff --git a/YouTubeMDBot/tests/identifier.py b/YouTubeMDBot/tests/identifier.py index 747f4a0..bad5457 100755 --- a/YouTubeMDBot/tests/identifier.py +++ b/YouTubeMDBot/tests/identifier.py @@ -1,11 +1,12 @@ import threading import unittest +from io import BytesIO from pprint import pprint from time import sleep from time import time from typing import Tuple -from io import BytesIO +from YouTubeMDBot.downloader import MultipleYouTubeDownloader from YouTubeMDBot.downloader import YouTubeDownloader from YouTubeMDBot.metadata import YouTubeMetadataIdentifier @@ -22,7 +23,8 @@ def test_identification(self): audio, data = downloader.download() with open("hello.m4a", "wb") as song: song.write(data) - identifier = YouTubeMetadataIdentifier(audio=data, downloader=downloader) + identifier = YouTubeMetadataIdentifier(audio=data, + downloader=downloader) valid = identifier.identify_audio() assert valid @@ -36,19 +38,34 @@ def test_identification(self): cover.write(identifier.cover) def test_multiple_download_identification(self): - yt1 = YouTubeDownloader(url="https://www.youtube.com/watch?v=Inm-N5rLUSI") - yt2 = YouTubeDownloader(url="https://www.youtube.com/watch?v=-_ZwpOdXXcA") - yt3 = YouTubeDownloader(url="https://www.youtube.com/watch?v=WOGWZD5iT10") - yt4 = YouTubeDownloader(url="https://www.youtube.com/watch?v=GfKV9KaNJXc") - yt5 = YouTubeDownloader(url="https://www.youtube.com/watch?v=DiItGE3eAyQ") - yt6 = YouTubeDownloader(url="https://www.youtube.com/watch?v=GuZzuQvv7uc") - - t1 = threading.Thread(target=self.find_metadata, args=(yt1,)) - t2 = threading.Thread(target=self.find_metadata, args=(yt2,)) - t3 = threading.Thread(target=self.find_metadata, args=(yt3,)) - t4 = threading.Thread(target=self.find_metadata, args=(yt4,)) - t5 = threading.Thread(target=self.find_metadata, args=(yt5,)) - t6 = threading.Thread(target=self.find_metadata, args=(yt6,)) + yt1 = YouTubeDownloader( + url="https://www.youtube.com/watch?v=Inm-N5rLUSI") + yt2 = YouTubeDownloader( + url="https://www.youtube.com/watch?v=-_ZwpOdXXcA") + yt3 = YouTubeDownloader( + url="https://www.youtube.com/watch?v=WOGWZD5iT10") + yt4 = YouTubeDownloader( + url="https://www.youtube.com/watch?v=GfKV9KaNJXc") + yt5 = YouTubeDownloader( + url="https://www.youtube.com/watch?v=DiItGE3eAyQ") + yt6 = YouTubeDownloader( + url="https://www.youtube.com/watch?v=GuZzuQvv7uc") + + ytdl = MultipleYouTubeDownloader() + + f1 = ytdl.download_async(yt1) + f2 = ytdl.download_async(yt2) + f3 = ytdl.download_async(yt3) + f4 = ytdl.download_async(yt4) + f5 = ytdl.download_async(yt5) + f6 = ytdl.download_async(yt6) + + t1 = threading.Thread(target=self.find_metadata, args=(f1, yt1,)) + t2 = threading.Thread(target=self.find_metadata, args=(f2, yt2,)) + t3 = threading.Thread(target=self.find_metadata, args=(f3, yt3,)) + t4 = threading.Thread(target=self.find_metadata, args=(f4, yt4,)) + t5 = threading.Thread(target=self.find_metadata, args=(f5, yt5,)) + t6 = threading.Thread(target=self.find_metadata, args=(f6, yt6,)) self.max = 6 @@ -62,9 +79,10 @@ def test_multiple_download_identification(self): while self.threads < self.max: sleep(1) - # pprint(self.song_info) pprint("Finished") + del ytdl + def barrier(self): with self.lock: self.threads += 1 @@ -73,33 +91,37 @@ def getThreads(self): with self.lock: return self.threads - def find_metadata(self, downloader: YouTubeDownloader) -> Tuple[BytesIO, bytes]: + def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: st_dl_t = time() - io, data = downloader.download() + io, data = future.get() f_dl_t = time() - print("Downloaded {} - elapsed time: {:.1f}s".format(downloader.get_url(), - f_dl_t - st_dl_t)) - identifier = YouTubeMetadataIdentifier(audio=data, downloader=downloader) + print("Downloaded {} - elapsed time: {:.1f}s" + .format(downloader.get_url(), f_dl_t - st_dl_t)) + identifier = \ + YouTubeMetadataIdentifier(audio=data, downloader=downloader) valid = identifier.identify_audio() assert valid - self.song_info[downloader.get_url()] = { + song_info = {downloader.get_url(): { "title": identifier.title, "artist": identifier.artist, "cover": identifier.cover - } + }} if not identifier.youtube_data: - self.song_info[downloader.get_url()]["score"] = identifier.score - self.song_info[downloader.get_url()]["record_id"] = \ - "https://musicbrainz.org/recording/{0}".format(identifier.recording_id) - self.song_info[downloader.get_url()]["release_id"] = \ - "https://musicbrainz.org/release/{0}".format(identifier.release_id) - self.song_info[downloader.get_url()]["album"] = identifier.album + song_info[downloader.get_url()]["score"] = identifier.score + song_info[downloader.get_url()]["record_id"] = \ + "https://musicbrainz.org/recording/{0}".format( + identifier.recording_id) + song_info[downloader.get_url()]["release_id"] = \ + "https://musicbrainz.org/release/{0}".format( + identifier.release_id) + song_info[downloader.get_url()]["album"] = identifier.album else: - self.song_info[downloader.get_url()]["duration"] = identifier.duration - self.song_info[downloader.get_url()]["id"] = identifier.youtube_id - self.song_info[downloader.get_url()]["youtube_data"] = True + song_info[downloader.get_url()][ + "duration"] = identifier.duration + song_info[downloader.get_url()]["id"] = identifier.youtube_id + song_info[downloader.get_url()]["youtube_data"] = True self.barrier() - return io, data + return io, data, song_info if __name__ == '__main__': diff --git a/YouTubeMDBot/tests/multiprocess_tests.py b/YouTubeMDBot/tests/multiprocess_tests.py index 22c491c..b4c7176 100644 --- a/YouTubeMDBot/tests/multiprocess_tests.py +++ b/YouTubeMDBot/tests/multiprocess_tests.py @@ -14,19 +14,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os -import threading -from typing import Optional, Any, Callable +from .. import ThreadPoolBase -from .. import MultiprocessBase - -class MPTest(MultiprocessBase): - def get_connection(self) -> Optional[Any]: - return super().get_connection() - - def free_connection(self, connection): - super().free_connection(connection) +class MPTest(ThreadPoolBase): + pass def main(): @@ -53,13 +46,17 @@ def pinfo(x): for i in range(20): # print(f"Giving new function {i}") f = pinfo - test.new(f, i) + test.execute(f, i) - while not test.waiting_processes.empty(): - print(" ", end="\r") - print(f"Threads: {threading.active_count() - 2}", end="\r") - sleep(0.1) + # while not test.waiting_processes.empty(): + # print(" ", end="\r") + # print(f"Threads: {threading.active_count() - 2}", end="\r") + # sleep(0.1) # del test - test.finished = True + # test.finished = True + del test print(f"Main finished: {os.getpid()}") return + + +main() diff --git a/YouTubeMDBot/tests/tagger.py b/YouTubeMDBot/tests/tagger.py index 7d8ed02..1e5fdbe 100755 --- a/YouTubeMDBot/tests/tagger.py +++ b/YouTubeMDBot/tests/tagger.py @@ -1,34 +1,33 @@ import unittest -import mutagen - -from typing import Tuple from io import BytesIO +from typing import Tuple + +import mutagen -from YouTubeMDBot.tests.identifier import IdentifierTest -from YouTubeMDBot.downloader import YouTubeDownloader from YouTubeMDBot.metadata import AudioMetadata +from YouTubeMDBot.tests.identifier import IdentifierTest from YouTubeMDBot.utils import youtube_utils class TaggerTest(IdentifierTest): - def find_metadata(self, downloader: YouTubeDownloader) -> Tuple[BytesIO, bytes]: - io, data = super().find_metadata(downloader) + def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: + io, data, song_info = super().find_metadata(future, downloader) tagger = AudioMetadata(io) url = downloader.get_url() - tagger.set_title(super().song_info[url]["title"]) - tagger.set_artist(super().song_info[url]["artist"]) - tagger.set_cover(super().song_info[url]["cover"]) + tagger.set_title(song_info[url]["title"]) + tagger.set_artist(song_info[url]["artist"]) + tagger.set_cover(song_info[url]["cover"]) extra = ["YouTube URL: " + url] - if not super().song_info[url].get("youtube_data"): - tagger.set_album(super().song_info[url]["album"]) - extra.append("MusicBrainz Record ID: " + super().song_info[url][ + if not song_info[url].get("youtube_data"): + tagger.set_album(song_info[url]["album"]) + extra.append("MusicBrainz Record ID: " + song_info[url][ "record_id"]) - extra.append("MusicBrainz Release ID: " + super().song_info[url][ + extra.append("MusicBrainz Release ID: " + song_info[url][ "release_id"]) tagger.set_extras(extra) else: - tagger.set_extras(["YouTube ID: {}".format(super().song_info[url]["id"])]) + tagger.set_extras(["YouTube ID: {}".format(song_info[url]["id"])]) yid = youtube_utils.get_yt_video_id(url) rs = tagger.save() rs.seek(0) @@ -37,7 +36,7 @@ def find_metadata(self, downloader: YouTubeDownloader) -> Tuple[BytesIO, bytes]: with open(yid + ".m4a", "wb") as f: f.write(rs.read()) rs.seek(0) - return rs, rs.read() + return rs, rs.read(), song_info if __name__ == '__main__':