From 446847adfcc3ed82a745062b76a9fef60bf2f1ee Mon Sep 17 00:00:00 2001 From: Javinator9889 Date: Sat, 15 Feb 2020 13:19:28 +0100 Subject: [PATCH] PostgreSQL class wrapper ("PostgreSQLBase") and sizeable queue Included the necessary methods for managing the connection with the database. In addition, a subclass of `multithreading.queue` was created in order to know the amount of elements inside that queue --- .gitlab-ci.yml | 2 +- Design/Database/psql_model.sql | 2 +- YouTubeMDBot/__init__.py | 1 + YouTubeMDBot/database/__init__.py | 2 - YouTubeMDBot/database/psql.py | 216 +++++++++++------- YouTubeMDBot/requirements.txt | 2 +- YouTubeMDBot/utils/__init__.py | 1 + .../query.py => utils/queue/__init__.py} | 17 +- YouTubeMDBot/utils/queue/cqueue.py | 61 +++++ 9 files changed, 204 insertions(+), 100 deletions(-) rename YouTubeMDBot/{database/query.py => utils/queue/__init__.py} (64%) create mode 100644 YouTubeMDBot/utils/queue/cqueue.py diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7e1fb62..e7f5c82 100755 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -19,7 +19,7 @@ cache: before_script: - python -V # Print out python version for debugging - apt update - - apt install -y libchromaprint-tools ffmpeg + - apt install -y libchromaprint-tools ffmpeg libpq-dev - git clone https://github.com/beetbox/pyacoustid.git && cd pyacoustid - python setup.py install - cd .. && rm -rf pyacoustid/ diff --git a/Design/Database/psql_model.sql b/Design/Database/psql_model.sql index 7fffb89..83c3361 100644 --- a/Design/Database/psql_model.sql +++ b/Design/Database/psql_model.sql @@ -158,7 +158,7 @@ CREATE INDEX youtubemd.user_preferences_ix ON youtubemd.Preferences ("user_id"); CREATE INDEX youtubemd.video_metadata_ix ON youtubemd.Video_Has_Metadata ("id", "metadata_id"); CREATE INDEX youtubemd.history_ix ON youtubemd.History ("id", "file_id", "user_id", "metadata_id"); --- Trigger that updated the different stats +-- Trigger that updates different stats CREATE FUNCTION youtubemd.process_stats() RETURNS trigger AS $$ DECLARE diff --git a/YouTubeMDBot/__init__.py b/YouTubeMDBot/__init__.py index 4f6844f..25f0ea6 100755 --- a/YouTubeMDBot/__init__.py +++ b/YouTubeMDBot/__init__.py @@ -38,6 +38,7 @@ from .commands import StartHandler +from .utils import CQueue from .utils import get_yt_video_id from .downloader import YouTubeDownloader diff --git a/YouTubeMDBot/database/__init__.py b/YouTubeMDBot/database/__init__.py index 1ab0139..8a858f1 100644 --- a/YouTubeMDBot/database/__init__.py +++ b/YouTubeMDBot/database/__init__.py @@ -13,5 +13,3 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from ..database.psql import PostgreSQL -from ..database.query import Query diff --git a/YouTubeMDBot/database/psql.py b/YouTubeMDBot/database/psql.py index 8c86398..9e0d4d2 100644 --- a/YouTubeMDBot/database/psql.py +++ b/YouTubeMDBot/database/psql.py @@ -13,94 +13,150 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import psycopg2 + +from abc import ABC +from abc import abstractmethod + from typing import Any from typing import Optional -from psycopg2 import pool -from psycopg2.pool import PoolError +from threading import Lock +from threading import Thread +from threading import Condition -from . import Query +from .. import CQueue from .. import DB_HOST from .. import DB_NAME from .. import DB_PASSWORD from .. import DB_PORT from .. import DB_USER -from .. import MAX_PROCESS - - -class PostgreSQL(MultiprocessBase): - def __new__(cls, **kwargs): - if cls.__instance is None: - connection = pool.ThreadedConnectionPool(minconn=1, - maxconn=MAX_PROCESS, - user=DB_USER, - password=DB_PASSWORD, - dbname=DB_NAME, - host=DB_HOST, - port=DB_PORT) - return super().__new__(cls, connection=connection) - return super().__new__(cls) - - def free_connection(self, connection): - super().free_connection(connection) - self.connection.putconn(connection) - - def get_connection(self) -> Optional[Any]: - super().get_connection() - try: - return self.connection.getconn() - except PoolError: - return None - - def execute(self, query: Query): - super().new(self._execute, query) - - def fetchone(self, query: Query): - super().new(self._fetchone, query) - - def fetchall(self, query: Query): - super().new(self._fetchall, query) - - def fetchiter(self, query: Query): - super().new(self._fetchiter, query) - - @staticmethod - def _execute(self, query: Query, connection): - with connection.cursor() as cursor: - cursor.execute(query.query) - query._result = cursor.rowcount - query.is_completed = True - - @staticmethod - def _fetchone(query: Query, connection): - with connection.cursor() as cursor: - cursor.execute(query.query) - query._result = cursor.fetchone() - query.is_completed = True - - @staticmethod - def _fetchall(query: Query, connection): - with connection.cursor() as cursor: - cursor.execute(query.query) - query._result = cursor.fetchall() - query.is_completed = True - - @staticmethod - def _fetchiter(query: Query, connection): - with connection.cursor() as cursor: - cursor.execute(query.query) - - def generate(): - while True: - items = cursor.fetchmany() - if not items: - break - for item in items: - yield item - - query._result = generate() - query.is_completed = True + + +class Query: + def __init__(self, statement: str, values: tuple = None): + self.statement = statement + self.values = values + + +class PostgreSQLBase(ABC): + __instance = None + + def __new__(cls, + min_ops: int = 100, + **kwargs): + if PostgreSQLBase.__instance is None: + cls.__instance = object.__new__(cls) + cls.__instance.connection = psycopg2.connect(user=DB_USER, + password=DB_PASSWORD, + host=DB_HOST, + port=DB_PORT, + dbname=DB_NAME) + cls.__instance.min_ops = min_ops + cls.__instance._iuthread = Thread(target=cls.__iuhandler, + name="iuthread") + cls.__instance._qthread = Thread(name="qthread") + cls.__instance.lock = Lock() + cls.__instance.__close = False + cls.__instance.pending_ops = CQueue() + cls.__instance.waiting_ops = CQueue() + cls.__instance.updating_database = False + cls.__instance.iucond = Condition() + cls.__instance.qcond = Condition() + cls.__instance._iuthread.start() + for key, value in kwargs.items(): + setattr(cls.__instance, key, value) + return cls.__instance + + @property + def close(self) -> bool: + with self.lock: + return self.__close + + @close.setter + def close(self, value: bool): + with self.lock: + self.__close = value + + @property + def updating_database(self) -> bool: + with self.lock: + return self.__updating_database + + @updating_database.setter + def updating_database(self, value: bool): + with self.lock: + self.__updating_database = value + + def __iuhandler(self): + while not self.close: + with self.iucond: + self.iucond.wait_for( + lambda: self.pending_ops.qsize() >= self.min_ops or + self.close + ) + self.updating_database = True + with self.connection.cursor() as cursor: + for query in self.pending_ops: + cursor.execute(query.statement, query.values) + self.connection.commit() + self.updating_database = False + self.qcond.notify_all() + + def __qhandler(self): + while not self.close: + with self.qcond: + self.qcond.wait_for( + lambda: not self.waiting_ops.empty() and + not self.updating_database or self.close + ) + for query in self.waiting_ops: + self.pending_ops.put(query) + self.iucond.notify_all() + + def insert(self, query: str, args=()): + if not self.close: + insert_query = Query(query, args) + self.waiting_ops.put(insert_query) + self.qcond.notify_all() + + def update(self, query: str, args=()): + if not self.close: + update_query = Query(query, args) + self.waiting_ops.put(update_query) + self.qcond.notify_all() + + def fetchone(self, query: str, args=()): + if not self.close: + with self.connection.cursor() as cursor: + cursor.execute(query, args) + return cursor.fetchone() + + def fetchmany(self, query: str, rows: int, args=()): + if not self.close: + with self.connection.cursor() as cursor: + cursor.execute(query, args) + return cursor.fetchmany(rows) + + def fetchall(self, query: str, args=()): + if not self.close: + with self.connection.cursor() as cursor: + cursor.execute(query, args) + return cursor.fetchall() def __del__(self): - super().__del__() - self.connection.closeall() + self.close = True + if not self.waiting_ops.empty(): + self.qcond.notify_all() + self._qthread.join() + if not self.pending_ops.empty(): + self.iucond.notify_all() + self._iuthread.join() + self.connection.close() + + del self.waiting_ops + del self.pending_ops + + +class YouTubeDB(PostgreSQLBase): + pass diff --git a/YouTubeMDBot/requirements.txt b/YouTubeMDBot/requirements.txt index a0fb6bf..7e8f024 100755 --- a/YouTubeMDBot/requirements.txt +++ b/YouTubeMDBot/requirements.txt @@ -5,4 +5,4 @@ musicbrainzngs ujson youtube_dl python-telegram-bot -psycopg2-binary +psycopg2 diff --git a/YouTubeMDBot/utils/__init__.py b/YouTubeMDBot/utils/__init__.py index 9fe1f3e..1a5b972 100755 --- a/YouTubeMDBot/utils/__init__.py +++ b/YouTubeMDBot/utils/__init__.py @@ -13,4 +13,5 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from ..utils.queue import CQueue from ..utils.youtube_utils import get_yt_video_id diff --git a/YouTubeMDBot/database/query.py b/YouTubeMDBot/utils/queue/__init__.py similarity index 64% rename from YouTubeMDBot/database/query.py rename to YouTubeMDBot/utils/queue/__init__.py index 8b89323..fd86e2b 100644 --- a/YouTubeMDBot/database/query.py +++ b/YouTubeMDBot/utils/queue/__init__.py @@ -1,5 +1,5 @@ # YouTubeMDBot -# Copyright (C) 2019 - Javinator9889 +# Copyright (C) 2020 - Javinator9889 # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -13,17 +13,4 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import threading - - -class Query: - def __init__(self, query: str): - self.query = query - self.is_completed = False - self._result = None - self._condition = threading.Condition() - - def result(self): - with self._condition: - self._condition.wait_for(lambda: self.is_completed) - return self._result +from ..queue.cqueue import CQueue diff --git a/YouTubeMDBot/utils/queue/cqueue.py b/YouTubeMDBot/utils/queue/cqueue.py new file mode 100644 index 0000000..2fca298 --- /dev/null +++ b/YouTubeMDBot/utils/queue/cqueue.py @@ -0,0 +1,61 @@ +# YouTubeMDBot +# Copyright (C) 2020 - Javinator9889 +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import multiprocessing + +from threading import Lock + +from typing import Any +from typing import Optional + +from multiprocessing import Queue + + +class CQueue(Queue): + def __init__(self, maxsize=0): + super().__init__(maxsize=maxsize, ctx=multiprocessing.get_context()) + self._lock = Lock() + self.size = 0 + + @property + def size(self) -> int: + with self._lock: + return self.__size + + @size.setter + def size(self, value): + with self._lock: + self.__size = value + + def put(self, + obj: Any, + block: bool = ..., + timeout: Optional[float] = ...) -> None: + self.size += 1 + super().put(obj, block, timeout) + + def get(self, block: bool = ..., timeout: Optional[float] = ...) -> Any: + self.size -= 1 + super().get(block, timeout) + + def qsize(self) -> int: + return self.size + + def empty(self) -> bool: + return not self.qsize() + + def __iter__(self): + while not self.empty(): + yield self.get()