Updated tests and close method
Performing close operations inside the "__del__" method is dangerous: by the time it runs, some attributes may already have been garbage collected, which can lead to deadlocks, race conditions, or even the instance never being cleaned up properly. The teardown logic has therefore been moved to an explicit "close" or "stop" method, and "__del__" now only tries to call it, safely wrapped in a try/except block. This fallback may be removed in the future, making it mandatory to call the proper close method and raising an exception if it was not called.
Javinator9889 committed May 17, 2020
1 parent 1973ad0 commit 831d2a5
Showing 9 changed files with 65 additions and 66 deletions.
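For context, a minimal sketch of the pattern this commit applies, using a hypothetical Resource class rather than code from the repository: teardown lives in an explicit close(), and __del__ only calls it defensively.

class Resource:
    def __init__(self):
        self.closed = False

    def close(self):
        # Real teardown runs here, while every attribute still exists.
        if not self.closed:
            self.closed = True

    def __del__(self):
        # During interpreter shutdown other objects may already be gone,
        # so any failure is swallowed rather than propagated.
        try:
            self.close()
        except Exception as e:
            print(e)

Callers are expected to invoke close() themselves; the __del__ hook is only a best-effort safety net.
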
6 changes: 3 additions & 3 deletions YouTubeMDBot/audio/ffmpeg.py
@@ -19,9 +19,9 @@
from subprocess import Popen
from typing import List

from ..constants import FFMPEG_CONVERTER
from ..constants import FFMPEG_OPENER
from ..constants import FFMPEG_VOLUME
from .. import FFMPEG_CONVERTER
from .. import FFMPEG_OPENER
from .. import FFMPEG_VOLUME


def ffmpeg_available() -> bool:
31 changes: 15 additions & 16 deletions YouTubeMDBot/database/psql.py
@@ -26,7 +26,6 @@
import os
import psycopg2

from .. import CQueue
from .. import DB_HOST
from .. import DB_NAME
from .. import DB_PASSWORD
@@ -44,7 +43,7 @@ def __init__(self,
self.statement = statement
self.values = values
self.returning_id = returning_id
self.return_value: Future = Future()
self.return_value = Future()


class PostgreSQLItem:
@@ -59,8 +58,6 @@ def __new__(cls,
return cls.__instance

def __init__(self, min_ops: int = 100, **kwargs):
print("init called")
print(f"Must init?: {self.must_initialize}")
if self.must_initialize:
self.connection = psycopg2.connect(user=DB_USER,
password=DB_PASSWORD,
@@ -69,7 +66,7 @@ def __init__(self, min_ops: int = 100, **kwargs):
dbname=DB_NAME)
self.min_ops = min_ops
self.lock = Lock()
self.__close = False
self.close = False
self.pending_ops = deque()
self.waiting_ops = deque()
self.updating_database = False
@@ -114,9 +111,8 @@ def __iuhandler(self):
self.iucond.wait_for(
lambda: len(self.pending_ops) >= self.min_ops or self.close
)
print(
f"pending_ops {len(self.pending_ops)} >= min_ops {self.min_ops}")
print(hex(id(self.pending_ops)))
print(f"[iuhandler] - condition is true: pending ops: "
f"{len(self.pending_ops)} or closed: {self.close}")
self.updating_database = True
with self.connection.cursor() as cursor:
while len(self.pending_ops) > 0:
@@ -130,6 +126,7 @@ def __iuhandler(self):
self.updating_database = False
with self.qcond:
self.qcond.notify_all()
print("iuhandler exited")

def __qhandler(self):
while not self.close:
@@ -138,17 +135,17 @@ def __qhandler(self):
lambda: len(self.waiting_ops) > 0 and
not self.updating_database or self.close
)
print("qhandler - new item inserted")
print(hex(id(self.waiting_ops)))
print(hex(id(self.pending_ops)))
print(f"[qhandler] - condition is true: waiting ops: "
f"{len(self.waiting_ops)} and not {self.updating_database} "
f"or closed: {self.close}")
while len(self.waiting_ops) > 0:
query = self.waiting_ops.pop()
if query is None:
continue
print(f"inserting item: {query}")
self.pending_ops.append(query)
with self.iucond:
self.iucond.notify_all()
print("qhandler exited")

def insert(self, query: str, args=(), returning_id: bool = False) -> Query:
if not self.close:
Expand Down Expand Up @@ -195,7 +192,7 @@ def callproc(self, proc: str, args=()) -> list:
cursor.callproc(proc, args)
return cursor.fetchall()

def __del__(self):
def stop(self):
print("deleting class")
self.close = True
print(f"is there any waiting operation? {len(self.waiting_ops) > 0}")
@@ -211,9 +208,11 @@ def __del__(self):
print("closing db connection")
self.connection.close()

print("removing queues")
del self.waiting_ops
del self.pending_ops
def __del__(self):
try:
self.stop()
except Exception as e:
print(e)


class PostgreSQLBase(ABC):
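To make the stop()/handler interaction above easier to follow, here is a condensed, self-contained sketch of the handshake (names mirror the diff, but the bodies are simplified assumptions rather than the real implementation): each handler sleeps on a condition variable whose predicate also checks the close flag, so stop() must both set the flag and notify the condition.

import threading
from collections import deque


class Worker:
    # Condensed model of PostgreSQLItem's __iuhandler/stop() handshake.
    def __init__(self, min_ops: int = 100):
        self.min_ops = min_ops
        self.close = False
        self.pending_ops = deque()
        self.iucond = threading.Condition()

    def handler(self):
        while not self.close:
            with self.iucond:
                # Wakes up when enough work is queued *or* close was requested.
                self.iucond.wait_for(
                    lambda: len(self.pending_ops) >= self.min_ops or self.close)
            while self.pending_ops:
                self.pending_ops.pop()  # flush whatever is left before exiting

    def stop(self):
        self.close = True
        with self.iucond:
            # Without this notification the handler would keep sleeping.
            self.iucond.notify_all()


worker = Worker(min_ops=3)
thread = threading.Thread(target=worker.handler)
thread.start()
worker.stop()
thread.join()  # returns promptly because stop() woke the waiting handler
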
6 changes: 0 additions & 6 deletions YouTubeMDBot/downloader/youtube_downloader.py
@@ -88,12 +88,6 @@ def download(self) -> Tuple[BytesIO, bytes]:


class MultipleYouTubeDownloader(ThreadPoolBase):
def __new__(cls,
max_processes: int = 4,
name: str = "YouTubeDownloader",
**kwargs):
return super().__new__(cls, max_processes, name, **kwargs)

def download(self, yt_obj: YouTubeDownloader) -> Tuple[BytesIO, bytes]:
return super().wait_execute(yt_obj.download)

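With the __new__ override removed, construction now flows through ThreadPoolBase.__init__ (see the next file). A minimal usage sketch in the spirit of the updated tests; the url keyword and the video URL are illustrative assumptions, not taken from this diff.

from YouTubeMDBot.downloader.youtube_downloader import MultipleYouTubeDownloader
from YouTubeMDBot.downloader.youtube_downloader import YouTubeDownloader

ytdl = MultipleYouTubeDownloader()
video = YouTubeDownloader(url="https://www.youtube.com/watch?v=...")  # assumed kwarg, placeholder URL
audio, raw = ytdl.download(video)  # blocks via wait_execute, as shown above
ytdl.close()  # explicit shutdown, as in the updated tests, instead of `del ytdl`
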
51 changes: 31 additions & 20 deletions YouTubeMDBot/multiprocess/abcprocess.py
@@ -26,20 +26,26 @@
class ThreadPoolBase(ABC):
__instance = None

def __new__(cls,
max_processes: int = MAX_PROCESS,
name: str = "ThreadBase",
**kwargs):
def __new__(cls, **kwargs):
if ThreadPoolBase.__instance is None:
cls.__instance = object.__new__(cls)
cls.__instance.__pool = ThreadPool(processes=max_processes)
cls.__instance.__lock = Lock()
cls.__instance.__finished = False
cls.__instance.name = name
for key, value in kwargs.items():
setattr(cls.__instance, key, value)
cls.__instance.must_initialize = True
return cls.__instance

def __init__(self,
max_processes: int = MAX_PROCESS,
name: str = "ThreadBase",
**kwargs):
if self.must_initialize:
self.__pool = ThreadPool(processes=max_processes)
self.__lock = Lock()
self.finished = False
self.name = name
self.must_initialize = False

for key, value in kwargs.items():
setattr(self, key, value)

@property
def finished(self) -> bool:
with self.__lock:
@@ -50,12 +56,6 @@ def finished(self, value: bool):
with self.__lock:
self.__finished = value

def __del__(self):
if not self.finished:
self.__pool.close()
self.__pool.join()
self.finished = True

def wait_execute(self, func: Callable, *args, **kwargs) -> Any:
if not self.finished:
return self.__pool.apply(func=func, args=args, kwds=kwargs)
@@ -64,15 +64,26 @@ def wait_execute(self, func: Callable, *args, **kwargs) -> Any:

def execute(self,
func: Callable,
args=(),
kwds={},
callback: Callable[[Any], Any] = None,
err_callback: Callable[[Any], Any] = None):
err_callback: Callable[[Any], Any] = None,
*args, **kwargs):
if not self.finished:
return self.__pool.apply_async(func=func,
args=args,
kwds=kwds,
kwds=kwargs,
callback=callback,
error_callback=err_callback)
else:
raise FinishedException(f"The thread pool {self.name} has finished")

def close(self):
if not self.finished:
self.__pool.close()
self.__pool.join()
self.finished = True

def __del__(self):
try:
self.close()
except Exception as e:
print(e)
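
One consequence of the reworked execute signature is that positional values placed after func bind to callback and err_callback before they ever reach *args, so per-task data is safest passed as keywords, which are forwarded to apply_async as kwds. A short sketch, assuming ThreadPoolBase has no abstract methods to override, with a hypothetical DemoPool subclass:

from YouTubeMDBot.multiprocess.abcprocess import ThreadPoolBase


class DemoPool(ThreadPoolBase):
    # Hypothetical concrete subclass, only for illustration.
    pass


def task(url: str) -> str:
    return url.upper()  # stand-in for real work


def on_done(result):
    print("finished:", result)


pool = DemoPool(max_processes=2, name="DemoPool")
# Keyword data lands in **kwargs and reaches the task through apply_async's kwds.
result = pool.execute(task, callback=on_done, url="https://youtu.be/placeholder")
result.get()  # AsyncResult returned by ThreadPool.apply_async
pool.close()  # joins the workers deterministically; __del__ is only a safety net
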
7 changes: 3 additions & 4 deletions YouTubeMDBot/tests/converter.py
@@ -4,12 +4,11 @@
from io import BytesIO
from typing import Tuple

from YouTubeMDBot.tests.tagger import TaggerTest
# from YouTubeMDBot.tests.tagger import TaggerTest
from YouTubeMDBot.audio import FFmpegMP3
from YouTubeMDBot.audio import FFmpegOGG


class MyTestCase(TaggerTest):
"""class MyTestCase(TaggerTest):
def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]:
print(f"Running test: find_metadata in {__file__}")
io, data, song_info = super().find_metadata(future, downloader)
@@ -33,4 +32,4 @@ def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]:
if __name__ == '__main__':
unittest.main()
unittest.main()"""
18 changes: 8 additions & 10 deletions YouTubeMDBot/tests/database.py
@@ -6,14 +6,10 @@

class DatabaseTesting(unittest.TestCase):
def test_creation(self):
db_item = PostgreSQLItem(min_ops=3)
db_item = PostgreSQLItem()
db = Initializer(db_item)
db.init()

db_item2 = PostgreSQLItem()
db_item3 = PostgreSQLItem()
db_item4 = PostgreSQLItem()

user_db = UserDB(db_item)
print(hex(id(db)))
print(hex(id(user_db)))
@@ -30,13 +26,15 @@ def test_creation(self):
user_db.register_new_user(12333, "test", "test", "en")
user_db.register_new_user(12344, "test", "test", "en")

time.sleep(1)
# time.sleep(1)

# for uid in (12334, 12335, 12336, 12337, 12338, 12339, 12330, 12331,
# 12332, 12333, 12344):
# print(user_db.get_user_information(uid))

for uid in (12334, 12335, 12336, 12337, 12338, 12339, 12330, 12331,
12332, 12333, 12344):
print(user_db.get_user_information(uid))
db_item.stop()

del db_item
# del db_item


if __name__ == '__main__':
2 changes: 1 addition & 1 deletion YouTubeMDBot/tests/downloader.py
@@ -40,7 +40,7 @@ def test_multithread_download(self):

self._barrier.wait()

del ytdl
ytdl.close()

def write_to_file(self, future, name: str):
_, data = future.get()
5 changes: 1 addition & 4 deletions YouTubeMDBot/tests/identifier.py
@@ -2,7 +2,6 @@
import unittest
from io import BytesIO
from pprint import pprint
from time import sleep
from time import time
from typing import Tuple
from threading import Barrier
@@ -80,10 +79,8 @@ def test_multiple_download_identification(self):
t6.start()

self.barrier.wait()

pprint("Finished")

del ytdl
ytdl.close()

def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]:
st_dl_t = time()
5 changes: 3 additions & 2 deletions YouTubeMDBot/tests/tagger.py
@@ -9,7 +9,8 @@
from YouTubeMDBot.utils import youtube_utils


class TaggerTest(IdentifierTest):
"""class TaggerTest(IdentifierTest):
def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]:
print(f"Running test: find_metadata in {__file__}")
io, data, song_info = super().find_metadata(future, downloader)
Expand Down Expand Up @@ -41,4 +42,4 @@ def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]:
if __name__ == '__main__':
unittest.main()
unittest.main()"""
