Skip to content

Commit

Permalink
PostgreSQL class wrapper ("PostgreSQLBase") and sizeable queue
Browse files Browse the repository at this point in the history
Included the necessary methods for managing the connection with the database. In addition, a subclass of `multithreading.queue` was created in order to know the amount of elements inside that queue
  • Loading branch information
Javinator9889 committed Feb 15, 2020
1 parent 00c7471 commit 446847a
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 100 deletions.
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Expand Up @@ -19,7 +19,7 @@ cache:
before_script:
- python -V # Print out python version for debugging
- apt update
- apt install -y libchromaprint-tools ffmpeg
- apt install -y libchromaprint-tools ffmpeg libpq-dev
- git clone https://github.com/beetbox/pyacoustid.git && cd pyacoustid
- python setup.py install
- cd .. && rm -rf pyacoustid/
Expand Down
2 changes: 1 addition & 1 deletion Design/Database/psql_model.sql
Expand Up @@ -158,7 +158,7 @@ CREATE INDEX youtubemd.user_preferences_ix ON youtubemd.Preferences ("user_id");
CREATE INDEX youtubemd.video_metadata_ix ON youtubemd.Video_Has_Metadata ("id", "metadata_id");
CREATE INDEX youtubemd.history_ix ON youtubemd.History ("id", "file_id", "user_id", "metadata_id");

-- Trigger that updated the different stats
-- Trigger that updates different stats
CREATE FUNCTION youtubemd.process_stats() RETURNS trigger AS
$$
DECLARE
Expand Down
1 change: 1 addition & 0 deletions YouTubeMDBot/__init__.py
Expand Up @@ -38,6 +38,7 @@

from .commands import StartHandler

from .utils import CQueue
from .utils import get_yt_video_id

from .downloader import YouTubeDownloader
Expand Down
2 changes: 0 additions & 2 deletions YouTubeMDBot/database/__init__.py
Expand Up @@ -13,5 +13,3 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..database.psql import PostgreSQL
from ..database.query import Query
216 changes: 136 additions & 80 deletions YouTubeMDBot/database/psql.py
Expand Up @@ -13,94 +13,150 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import psycopg2

from abc import ABC
from abc import abstractmethod

from typing import Any
from typing import Optional

from psycopg2 import pool
from psycopg2.pool import PoolError
from threading import Lock
from threading import Thread
from threading import Condition

from . import Query
from .. import CQueue
from .. import DB_HOST
from .. import DB_NAME
from .. import DB_PASSWORD
from .. import DB_PORT
from .. import DB_USER
from .. import MAX_PROCESS


class PostgreSQL(MultiprocessBase):
def __new__(cls, **kwargs):
if cls.__instance is None:
connection = pool.ThreadedConnectionPool(minconn=1,
maxconn=MAX_PROCESS,
user=DB_USER,
password=DB_PASSWORD,
dbname=DB_NAME,
host=DB_HOST,
port=DB_PORT)
return super().__new__(cls, connection=connection)
return super().__new__(cls)

def free_connection(self, connection):
super().free_connection(connection)
self.connection.putconn(connection)

def get_connection(self) -> Optional[Any]:
super().get_connection()
try:
return self.connection.getconn()
except PoolError:
return None

def execute(self, query: Query):
super().new(self._execute, query)

def fetchone(self, query: Query):
super().new(self._fetchone, query)

def fetchall(self, query: Query):
super().new(self._fetchall, query)

def fetchiter(self, query: Query):
super().new(self._fetchiter, query)

@staticmethod
def _execute(self, query: Query, connection):
with connection.cursor() as cursor:
cursor.execute(query.query)
query._result = cursor.rowcount
query.is_completed = True

@staticmethod
def _fetchone(query: Query, connection):
with connection.cursor() as cursor:
cursor.execute(query.query)
query._result = cursor.fetchone()
query.is_completed = True

@staticmethod
def _fetchall(query: Query, connection):
with connection.cursor() as cursor:
cursor.execute(query.query)
query._result = cursor.fetchall()
query.is_completed = True

@staticmethod
def _fetchiter(query: Query, connection):
with connection.cursor() as cursor:
cursor.execute(query.query)

def generate():
while True:
items = cursor.fetchmany()
if not items:
break
for item in items:
yield item

query._result = generate()
query.is_completed = True


class Query:
def __init__(self, statement: str, values: tuple = None):
self.statement = statement
self.values = values


class PostgreSQLBase(ABC):
__instance = None

def __new__(cls,
min_ops: int = 100,
**kwargs):
if PostgreSQLBase.__instance is None:
cls.__instance = object.__new__(cls)
cls.__instance.connection = psycopg2.connect(user=DB_USER,
password=DB_PASSWORD,
host=DB_HOST,
port=DB_PORT,
dbname=DB_NAME)
cls.__instance.min_ops = min_ops
cls.__instance._iuthread = Thread(target=cls.__iuhandler,
name="iuthread")
cls.__instance._qthread = Thread(name="qthread")
cls.__instance.lock = Lock()
cls.__instance.__close = False
cls.__instance.pending_ops = CQueue()
cls.__instance.waiting_ops = CQueue()
cls.__instance.updating_database = False
cls.__instance.iucond = Condition()
cls.__instance.qcond = Condition()
cls.__instance._iuthread.start()
for key, value in kwargs.items():
setattr(cls.__instance, key, value)
return cls.__instance

@property
def close(self) -> bool:
with self.lock:
return self.__close

@close.setter
def close(self, value: bool):
with self.lock:
self.__close = value

@property
def updating_database(self) -> bool:
with self.lock:
return self.__updating_database

@updating_database.setter
def updating_database(self, value: bool):
with self.lock:
self.__updating_database = value

def __iuhandler(self):
while not self.close:
with self.iucond:
self.iucond.wait_for(
lambda: self.pending_ops.qsize() >= self.min_ops or
self.close
)
self.updating_database = True
with self.connection.cursor() as cursor:
for query in self.pending_ops:
cursor.execute(query.statement, query.values)
self.connection.commit()
self.updating_database = False
self.qcond.notify_all()

def __qhandler(self):
while not self.close:
with self.qcond:
self.qcond.wait_for(
lambda: not self.waiting_ops.empty() and
not self.updating_database or self.close
)
for query in self.waiting_ops:
self.pending_ops.put(query)
self.iucond.notify_all()

def insert(self, query: str, args=()):
if not self.close:
insert_query = Query(query, args)
self.waiting_ops.put(insert_query)
self.qcond.notify_all()

def update(self, query: str, args=()):
if not self.close:
update_query = Query(query, args)
self.waiting_ops.put(update_query)
self.qcond.notify_all()

def fetchone(self, query: str, args=()):
if not self.close:
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchone()

def fetchmany(self, query: str, rows: int, args=()):
if not self.close:
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchmany(rows)

def fetchall(self, query: str, args=()):
if not self.close:
with self.connection.cursor() as cursor:
cursor.execute(query, args)
return cursor.fetchall()

def __del__(self):
super().__del__()
self.connection.closeall()
self.close = True
if not self.waiting_ops.empty():
self.qcond.notify_all()
self._qthread.join()
if not self.pending_ops.empty():
self.iucond.notify_all()
self._iuthread.join()
self.connection.close()

del self.waiting_ops
del self.pending_ops


class YouTubeDB(PostgreSQLBase):
pass
2 changes: 1 addition & 1 deletion YouTubeMDBot/requirements.txt
Expand Up @@ -5,4 +5,4 @@ musicbrainzngs
ujson
youtube_dl
python-telegram-bot
psycopg2-binary
psycopg2
1 change: 1 addition & 0 deletions YouTubeMDBot/utils/__init__.py
Expand Up @@ -13,4 +13,5 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..utils.queue import CQueue
from ..utils.youtube_utils import get_yt_video_id
@@ -1,5 +1,5 @@
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
# Copyright (C) 2020 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -13,17 +13,4 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading


class Query:
def __init__(self, query: str):
self.query = query
self.is_completed = False
self._result = None
self._condition = threading.Condition()

def result(self):
with self._condition:
self._condition.wait_for(lambda: self.is_completed)
return self._result
from ..queue.cqueue import CQueue
61 changes: 61 additions & 0 deletions YouTubeMDBot/utils/queue/cqueue.py
@@ -0,0 +1,61 @@
# YouTubeMDBot
# Copyright (C) 2020 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import multiprocessing

from threading import Lock

from typing import Any
from typing import Optional

from multiprocessing import Queue


class CQueue(Queue):
def __init__(self, maxsize=0):
super().__init__(maxsize=maxsize, ctx=multiprocessing.get_context())
self._lock = Lock()
self.size = 0

@property
def size(self) -> int:
with self._lock:
return self.__size

@size.setter
def size(self, value):
with self._lock:
self.__size = value

def put(self,
obj: Any,
block: bool = ...,
timeout: Optional[float] = ...) -> None:
self.size += 1
super().put(obj, block, timeout)

def get(self, block: bool = ..., timeout: Optional[float] = ...) -> Any:
self.size -= 1
super().get(block, timeout)

def qsize(self) -> int:
return self.size

def empty(self) -> bool:
return not self.qsize()

def __iter__(self):
while not self.empty():
yield self.get()

0 comments on commit 446847a

Please sign in to comment.