From 831d2a58b0f26a3c36d2bb9f9dcf21300c5ac78d Mon Sep 17 00:00:00 2001 From: Javinator9889 Date: Sun, 17 May 2020 13:36:04 +0200 Subject: [PATCH] Updated tests and close method Trying to do close operations inside the "__del__" method is dangerous as some items may be garbage collected, so this can lead into deadlocks and race conditions, even class not being properly deleted. This way, the deletion method was moved to "close" or "stop" method and "__del__" tries to call them safely wrapping inside a try/catch block. This behaviour may be removed in a future, making necessary to call the proper close method raising an exception if it was not called --- YouTubeMDBot/audio/ffmpeg.py | 6 +-- YouTubeMDBot/database/psql.py | 31 ++++++----- YouTubeMDBot/downloader/youtube_downloader.py | 6 --- YouTubeMDBot/multiprocess/abcprocess.py | 51 +++++++++++-------- YouTubeMDBot/tests/converter.py | 7 ++- YouTubeMDBot/tests/database.py | 18 +++---- YouTubeMDBot/tests/downloader.py | 2 +- YouTubeMDBot/tests/identifier.py | 5 +- YouTubeMDBot/tests/tagger.py | 5 +- 9 files changed, 65 insertions(+), 66 deletions(-) diff --git a/YouTubeMDBot/audio/ffmpeg.py b/YouTubeMDBot/audio/ffmpeg.py index 43a0b51..89d47e9 100755 --- a/YouTubeMDBot/audio/ffmpeg.py +++ b/YouTubeMDBot/audio/ffmpeg.py @@ -19,9 +19,9 @@ from subprocess import Popen from typing import List -from ..constants import FFMPEG_CONVERTER -from ..constants import FFMPEG_OPENER -from ..constants import FFMPEG_VOLUME +from .. import FFMPEG_CONVERTER +from .. import FFMPEG_OPENER +from .. import FFMPEG_VOLUME def ffmpeg_available() -> bool: diff --git a/YouTubeMDBot/database/psql.py b/YouTubeMDBot/database/psql.py index b669eda..80a3d45 100644 --- a/YouTubeMDBot/database/psql.py +++ b/YouTubeMDBot/database/psql.py @@ -26,7 +26,6 @@ import os import psycopg2 -from .. import CQueue from .. import DB_HOST from .. import DB_NAME from .. import DB_PASSWORD @@ -44,7 +43,7 @@ def __init__(self, self.statement = statement self.values = values self.returning_id = returning_id - self.return_value: Future = Future() + self.return_value = Future() class PostgreSQLItem: @@ -59,8 +58,6 @@ def __new__(cls, return cls.__instance def __init__(self, min_ops: int = 100, **kwargs): - print("init called") - print(f"Must init?: {self.must_initialize}") if self.must_initialize: self.connection = psycopg2.connect(user=DB_USER, password=DB_PASSWORD, @@ -69,7 +66,7 @@ def __init__(self, min_ops: int = 100, **kwargs): dbname=DB_NAME) self.min_ops = min_ops self.lock = Lock() - self.__close = False + self.close = False self.pending_ops = deque() self.waiting_ops = deque() self.updating_database = False @@ -114,9 +111,8 @@ def __iuhandler(self): self.iucond.wait_for( lambda: len(self.pending_ops) >= self.min_ops or self.close ) - print( - f"pending_ops {len(self.pending_ops)} >= min_ops {self.min_ops}") - print(hex(id(self.pending_ops))) + print(f"[iuhandler] - condition is true: pending ops: " + f"{len(self.pending_ops)} or closed: {self.close}") self.updating_database = True with self.connection.cursor() as cursor: while len(self.pending_ops) > 0: @@ -130,6 +126,7 @@ def __iuhandler(self): self.updating_database = False with self.qcond: self.qcond.notify_all() + print("iuhandler exited") def __qhandler(self): while not self.close: @@ -138,17 +135,17 @@ def __qhandler(self): lambda: len(self.waiting_ops) > 0 and not self.updating_database or self.close ) - print("qhandler - new item inserted") - print(hex(id(self.waiting_ops))) - print(hex(id(self.pending_ops))) + print(f"[qhandler] - condition is true: waiting ops: " + f"{len(self.waiting_ops)} and not {self.updating_database} " + f"or closed: {self.close}") while len(self.waiting_ops) > 0: query = self.waiting_ops.pop() if query is None: continue - print(f"inserting item: {query}") self.pending_ops.append(query) with self.iucond: self.iucond.notify_all() + print("qhandler exited") def insert(self, query: str, args=(), returning_id: bool = False) -> Query: if not self.close: @@ -195,7 +192,7 @@ def callproc(self, proc: str, args=()) -> list: cursor.callproc(proc, args) return cursor.fetchall() - def __del__(self): + def stop(self): print("deleting class") self.close = True print(f"is there any waiting operation? {len(self.waiting_ops) > 0}") @@ -211,9 +208,11 @@ def __del__(self): print("closing db connection") self.connection.close() - print("removing queues") - del self.waiting_ops - del self.pending_ops + def __del__(self): + try: + self.stop() + except Exception as e: + print(e) class PostgreSQLBase(ABC): diff --git a/YouTubeMDBot/downloader/youtube_downloader.py b/YouTubeMDBot/downloader/youtube_downloader.py index 8e0b050..de67799 100755 --- a/YouTubeMDBot/downloader/youtube_downloader.py +++ b/YouTubeMDBot/downloader/youtube_downloader.py @@ -88,12 +88,6 @@ def download(self) -> Tuple[BytesIO, bytes]: class MultipleYouTubeDownloader(ThreadPoolBase): - def __new__(cls, - max_processes: int = 4, - name: str = "YouTubeDownloader", - **kwargs): - return super().__new__(cls, max_processes, name, **kwargs) - def download(self, yt_obj: YouTubeDownloader) -> Tuple[BytesIO, bytes]: return super().wait_execute(yt_obj.download) diff --git a/YouTubeMDBot/multiprocess/abcprocess.py b/YouTubeMDBot/multiprocess/abcprocess.py index cf5a602..6fac15e 100644 --- a/YouTubeMDBot/multiprocess/abcprocess.py +++ b/YouTubeMDBot/multiprocess/abcprocess.py @@ -26,20 +26,26 @@ class ThreadPoolBase(ABC): __instance = None - def __new__(cls, - max_processes: int = MAX_PROCESS, - name: str = "ThreadBase", - **kwargs): + def __new__(cls, **kwargs): if ThreadPoolBase.__instance is None: cls.__instance = object.__new__(cls) - cls.__instance.__pool = ThreadPool(processes=max_processes) - cls.__instance.__lock = Lock() - cls.__instance.__finished = False - cls.__instance.name = name - for key, value in kwargs.items(): - setattr(cls.__instance, key, value) + cls.__instance.must_initialize = True return cls.__instance + def __init__(self, + max_processes: int = MAX_PROCESS, + name: str = "ThreadBase", + **kwargs): + if self.must_initialize: + self.__pool = ThreadPool(processes=max_processes) + self.__lock = Lock() + self.finished = False + self.name = name + self.must_initialize = False + + for key, value in kwargs.items(): + setattr(self, key, value) + @property def finished(self) -> bool: with self.__lock: @@ -50,12 +56,6 @@ def finished(self, value: bool): with self.__lock: self.__finished = value - def __del__(self): - if not self.finished: - self.__pool.close() - self.__pool.join() - self.finished = True - def wait_execute(self, func: Callable, *args, **kwargs) -> Any: if not self.finished: return self.__pool.apply(func=func, args=args, kwds=kwargs) @@ -64,15 +64,26 @@ def wait_execute(self, func: Callable, *args, **kwargs) -> Any: def execute(self, func: Callable, - args=(), - kwds={}, callback: Callable[[Any], Any] = None, - err_callback: Callable[[Any], Any] = None): + err_callback: Callable[[Any], Any] = None, + *args, **kwargs): if not self.finished: return self.__pool.apply_async(func=func, args=args, - kwds=kwds, + kwds=kwargs, callback=callback, error_callback=err_callback) else: raise FinishedException(f"The thread pool {self.name} has finished") + + def close(self): + if not self.finished: + self.__pool.close() + self.__pool.join() + self.finished = True + + def __del__(self): + try: + self.close() + except Exception as e: + print(e) diff --git a/YouTubeMDBot/tests/converter.py b/YouTubeMDBot/tests/converter.py index b076c65..0cf6e43 100644 --- a/YouTubeMDBot/tests/converter.py +++ b/YouTubeMDBot/tests/converter.py @@ -4,12 +4,11 @@ from io import BytesIO from typing import Tuple -from YouTubeMDBot.tests.tagger import TaggerTest +# from YouTubeMDBot.tests.tagger import TaggerTest from YouTubeMDBot.audio import FFmpegMP3 from YouTubeMDBot.audio import FFmpegOGG - -class MyTestCase(TaggerTest): +"""class MyTestCase(TaggerTest): def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: print(f"Running test: find_metadata in {__file__}") io, data, song_info = super().find_metadata(future, downloader) @@ -33,4 +32,4 @@ def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: if __name__ == '__main__': - unittest.main() + unittest.main()""" diff --git a/YouTubeMDBot/tests/database.py b/YouTubeMDBot/tests/database.py index 137d192..e7b12ab 100644 --- a/YouTubeMDBot/tests/database.py +++ b/YouTubeMDBot/tests/database.py @@ -6,14 +6,10 @@ class DatabaseTesting(unittest.TestCase): def test_creation(self): - db_item = PostgreSQLItem(min_ops=3) + db_item = PostgreSQLItem() db = Initializer(db_item) db.init() - db_item2 = PostgreSQLItem() - db_item3 = PostgreSQLItem() - db_item4 = PostgreSQLItem() - user_db = UserDB(db_item) print(hex(id(db))) print(hex(id(user_db))) @@ -30,13 +26,15 @@ def test_creation(self): user_db.register_new_user(12333, "test", "test", "en") user_db.register_new_user(12344, "test", "test", "en") - time.sleep(1) + # time.sleep(1) + + # for uid in (12334, 12335, 12336, 12337, 12338, 12339, 12330, 12331, + # 12332, 12333, 12344): + # print(user_db.get_user_information(uid)) - for uid in (12334, 12335, 12336, 12337, 12338, 12339, 12330, 12331, - 12332, 12333, 12344): - print(user_db.get_user_information(uid)) + db_item.stop() - del db_item + # del db_item if __name__ == '__main__': diff --git a/YouTubeMDBot/tests/downloader.py b/YouTubeMDBot/tests/downloader.py index d7abc63..c49c308 100755 --- a/YouTubeMDBot/tests/downloader.py +++ b/YouTubeMDBot/tests/downloader.py @@ -40,7 +40,7 @@ def test_multithread_download(self): self._barrier.wait() - del ytdl + ytdl.close() def write_to_file(self, future, name: str): _, data = future.get() diff --git a/YouTubeMDBot/tests/identifier.py b/YouTubeMDBot/tests/identifier.py index f3b1471..7d084bd 100755 --- a/YouTubeMDBot/tests/identifier.py +++ b/YouTubeMDBot/tests/identifier.py @@ -2,7 +2,6 @@ import unittest from io import BytesIO from pprint import pprint -from time import sleep from time import time from typing import Tuple from threading import Barrier @@ -80,10 +79,8 @@ def test_multiple_download_identification(self): t6.start() self.barrier.wait() - pprint("Finished") - - del ytdl + ytdl.close() def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: st_dl_t = time() diff --git a/YouTubeMDBot/tests/tagger.py b/YouTubeMDBot/tests/tagger.py index 2070d8d..d4352cf 100755 --- a/YouTubeMDBot/tests/tagger.py +++ b/YouTubeMDBot/tests/tagger.py @@ -9,7 +9,8 @@ from YouTubeMDBot.utils import youtube_utils -class TaggerTest(IdentifierTest): +"""class TaggerTest(IdentifierTest): + def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: print(f"Running test: find_metadata in {__file__}") io, data, song_info = super().find_metadata(future, downloader) @@ -41,4 +42,4 @@ def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: if __name__ == '__main__': - unittest.main() + unittest.main()"""