diff --git a/YouTubeMDBot/audio/ffmpeg.py b/YouTubeMDBot/audio/ffmpeg.py index 43a0b51..89d47e9 100755 --- a/YouTubeMDBot/audio/ffmpeg.py +++ b/YouTubeMDBot/audio/ffmpeg.py @@ -19,9 +19,9 @@ from subprocess import Popen from typing import List -from ..constants import FFMPEG_CONVERTER -from ..constants import FFMPEG_OPENER -from ..constants import FFMPEG_VOLUME +from .. import FFMPEG_CONVERTER +from .. import FFMPEG_OPENER +from .. import FFMPEG_VOLUME def ffmpeg_available() -> bool: diff --git a/YouTubeMDBot/database/psql.py b/YouTubeMDBot/database/psql.py index b669eda..80a3d45 100644 --- a/YouTubeMDBot/database/psql.py +++ b/YouTubeMDBot/database/psql.py @@ -26,7 +26,6 @@ import os import psycopg2 -from .. import CQueue from .. import DB_HOST from .. import DB_NAME from .. import DB_PASSWORD @@ -44,7 +43,7 @@ def __init__(self, self.statement = statement self.values = values self.returning_id = returning_id - self.return_value: Future = Future() + self.return_value = Future() class PostgreSQLItem: @@ -59,8 +58,6 @@ def __new__(cls, return cls.__instance def __init__(self, min_ops: int = 100, **kwargs): - print("init called") - print(f"Must init?: {self.must_initialize}") if self.must_initialize: self.connection = psycopg2.connect(user=DB_USER, password=DB_PASSWORD, @@ -69,7 +66,7 @@ def __init__(self, min_ops: int = 100, **kwargs): dbname=DB_NAME) self.min_ops = min_ops self.lock = Lock() - self.__close = False + self.close = False self.pending_ops = deque() self.waiting_ops = deque() self.updating_database = False @@ -114,9 +111,8 @@ def __iuhandler(self): self.iucond.wait_for( lambda: len(self.pending_ops) >= self.min_ops or self.close ) - print( - f"pending_ops {len(self.pending_ops)} >= min_ops {self.min_ops}") - print(hex(id(self.pending_ops))) + print(f"[iuhandler] - condition is true: pending ops: " + f"{len(self.pending_ops)} or closed: {self.close}") self.updating_database = True with self.connection.cursor() as cursor: while len(self.pending_ops) > 0: @@ -130,6 +126,7 @@ def __iuhandler(self): self.updating_database = False with self.qcond: self.qcond.notify_all() + print("iuhandler exited") def __qhandler(self): while not self.close: @@ -138,17 +135,17 @@ def __qhandler(self): lambda: len(self.waiting_ops) > 0 and not self.updating_database or self.close ) - print("qhandler - new item inserted") - print(hex(id(self.waiting_ops))) - print(hex(id(self.pending_ops))) + print(f"[qhandler] - condition is true: waiting ops: " + f"{len(self.waiting_ops)} and not {self.updating_database} " + f"or closed: {self.close}") while len(self.waiting_ops) > 0: query = self.waiting_ops.pop() if query is None: continue - print(f"inserting item: {query}") self.pending_ops.append(query) with self.iucond: self.iucond.notify_all() + print("qhandler exited") def insert(self, query: str, args=(), returning_id: bool = False) -> Query: if not self.close: @@ -195,7 +192,7 @@ def callproc(self, proc: str, args=()) -> list: cursor.callproc(proc, args) return cursor.fetchall() - def __del__(self): + def stop(self): print("deleting class") self.close = True print(f"is there any waiting operation? {len(self.waiting_ops) > 0}") @@ -211,9 +208,11 @@ def __del__(self): print("closing db connection") self.connection.close() - print("removing queues") - del self.waiting_ops - del self.pending_ops + def __del__(self): + try: + self.stop() + except Exception as e: + print(e) class PostgreSQLBase(ABC): diff --git a/YouTubeMDBot/downloader/youtube_downloader.py b/YouTubeMDBot/downloader/youtube_downloader.py index 8e0b050..de67799 100755 --- a/YouTubeMDBot/downloader/youtube_downloader.py +++ b/YouTubeMDBot/downloader/youtube_downloader.py @@ -88,12 +88,6 @@ def download(self) -> Tuple[BytesIO, bytes]: class MultipleYouTubeDownloader(ThreadPoolBase): - def __new__(cls, - max_processes: int = 4, - name: str = "YouTubeDownloader", - **kwargs): - return super().__new__(cls, max_processes, name, **kwargs) - def download(self, yt_obj: YouTubeDownloader) -> Tuple[BytesIO, bytes]: return super().wait_execute(yt_obj.download) diff --git a/YouTubeMDBot/multiprocess/abcprocess.py b/YouTubeMDBot/multiprocess/abcprocess.py index cf5a602..6fac15e 100644 --- a/YouTubeMDBot/multiprocess/abcprocess.py +++ b/YouTubeMDBot/multiprocess/abcprocess.py @@ -26,20 +26,26 @@ class ThreadPoolBase(ABC): __instance = None - def __new__(cls, - max_processes: int = MAX_PROCESS, - name: str = "ThreadBase", - **kwargs): + def __new__(cls, **kwargs): if ThreadPoolBase.__instance is None: cls.__instance = object.__new__(cls) - cls.__instance.__pool = ThreadPool(processes=max_processes) - cls.__instance.__lock = Lock() - cls.__instance.__finished = False - cls.__instance.name = name - for key, value in kwargs.items(): - setattr(cls.__instance, key, value) + cls.__instance.must_initialize = True return cls.__instance + def __init__(self, + max_processes: int = MAX_PROCESS, + name: str = "ThreadBase", + **kwargs): + if self.must_initialize: + self.__pool = ThreadPool(processes=max_processes) + self.__lock = Lock() + self.finished = False + self.name = name + self.must_initialize = False + + for key, value in kwargs.items(): + setattr(self, key, value) + @property def finished(self) -> bool: with self.__lock: @@ -50,12 +56,6 @@ def finished(self, value: bool): with self.__lock: self.__finished = value - def __del__(self): - if not self.finished: - self.__pool.close() - self.__pool.join() - self.finished = True - def wait_execute(self, func: Callable, *args, **kwargs) -> Any: if not self.finished: return self.__pool.apply(func=func, args=args, kwds=kwargs) @@ -64,15 +64,26 @@ def wait_execute(self, func: Callable, *args, **kwargs) -> Any: def execute(self, func: Callable, - args=(), - kwds={}, callback: Callable[[Any], Any] = None, - err_callback: Callable[[Any], Any] = None): + err_callback: Callable[[Any], Any] = None, + *args, **kwargs): if not self.finished: return self.__pool.apply_async(func=func, args=args, - kwds=kwds, + kwds=kwargs, callback=callback, error_callback=err_callback) else: raise FinishedException(f"The thread pool {self.name} has finished") + + def close(self): + if not self.finished: + self.__pool.close() + self.__pool.join() + self.finished = True + + def __del__(self): + try: + self.close() + except Exception as e: + print(e) diff --git a/YouTubeMDBot/tests/converter.py b/YouTubeMDBot/tests/converter.py index b076c65..0cf6e43 100644 --- a/YouTubeMDBot/tests/converter.py +++ b/YouTubeMDBot/tests/converter.py @@ -4,12 +4,11 @@ from io import BytesIO from typing import Tuple -from YouTubeMDBot.tests.tagger import TaggerTest +# from YouTubeMDBot.tests.tagger import TaggerTest from YouTubeMDBot.audio import FFmpegMP3 from YouTubeMDBot.audio import FFmpegOGG - -class MyTestCase(TaggerTest): +"""class MyTestCase(TaggerTest): def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: print(f"Running test: find_metadata in {__file__}") io, data, song_info = super().find_metadata(future, downloader) @@ -33,4 +32,4 @@ def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: if __name__ == '__main__': - unittest.main() + unittest.main()""" diff --git a/YouTubeMDBot/tests/database.py b/YouTubeMDBot/tests/database.py index 137d192..e7b12ab 100644 --- a/YouTubeMDBot/tests/database.py +++ b/YouTubeMDBot/tests/database.py @@ -6,14 +6,10 @@ class DatabaseTesting(unittest.TestCase): def test_creation(self): - db_item = PostgreSQLItem(min_ops=3) + db_item = PostgreSQLItem() db = Initializer(db_item) db.init() - db_item2 = PostgreSQLItem() - db_item3 = PostgreSQLItem() - db_item4 = PostgreSQLItem() - user_db = UserDB(db_item) print(hex(id(db))) print(hex(id(user_db))) @@ -30,13 +26,15 @@ def test_creation(self): user_db.register_new_user(12333, "test", "test", "en") user_db.register_new_user(12344, "test", "test", "en") - time.sleep(1) + # time.sleep(1) + + # for uid in (12334, 12335, 12336, 12337, 12338, 12339, 12330, 12331, + # 12332, 12333, 12344): + # print(user_db.get_user_information(uid)) - for uid in (12334, 12335, 12336, 12337, 12338, 12339, 12330, 12331, - 12332, 12333, 12344): - print(user_db.get_user_information(uid)) + db_item.stop() - del db_item + # del db_item if __name__ == '__main__': diff --git a/YouTubeMDBot/tests/downloader.py b/YouTubeMDBot/tests/downloader.py index d7abc63..c49c308 100755 --- a/YouTubeMDBot/tests/downloader.py +++ b/YouTubeMDBot/tests/downloader.py @@ -40,7 +40,7 @@ def test_multithread_download(self): self._barrier.wait() - del ytdl + ytdl.close() def write_to_file(self, future, name: str): _, data = future.get() diff --git a/YouTubeMDBot/tests/identifier.py b/YouTubeMDBot/tests/identifier.py index f3b1471..7d084bd 100755 --- a/YouTubeMDBot/tests/identifier.py +++ b/YouTubeMDBot/tests/identifier.py @@ -2,7 +2,6 @@ import unittest from io import BytesIO from pprint import pprint -from time import sleep from time import time from typing import Tuple from threading import Barrier @@ -80,10 +79,8 @@ def test_multiple_download_identification(self): t6.start() self.barrier.wait() - pprint("Finished") - - del ytdl + ytdl.close() def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: st_dl_t = time() diff --git a/YouTubeMDBot/tests/tagger.py b/YouTubeMDBot/tests/tagger.py index 2070d8d..d4352cf 100755 --- a/YouTubeMDBot/tests/tagger.py +++ b/YouTubeMDBot/tests/tagger.py @@ -9,7 +9,8 @@ from YouTubeMDBot.utils import youtube_utils -class TaggerTest(IdentifierTest): +"""class TaggerTest(IdentifierTest): + def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: print(f"Running test: find_metadata in {__file__}") io, data, song_info = super().find_metadata(future, downloader) @@ -41,4 +42,4 @@ def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: if __name__ == '__main__': - unittest.main() + unittest.main()"""