diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 7e1fb62..e7f5c82 100755
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -19,7 +19,7 @@ cache:
before_script:
- python -V # Print out python version for debugging
- apt update
- - apt install -y libchromaprint-tools ffmpeg
+ - apt install -y libchromaprint-tools ffmpeg libpq-dev
- git clone https://github.com/beetbox/pyacoustid.git && cd pyacoustid
- python setup.py install
- cd .. && rm -rf pyacoustid/
diff --git a/Design/Database/psql_model.sql b/Design/Database/psql_model.sql
index 7fffb89..83c3361 100644
--- a/Design/Database/psql_model.sql
+++ b/Design/Database/psql_model.sql
@@ -158,7 +158,7 @@ CREATE INDEX youtubemd.user_preferences_ix ON youtubemd.Preferences ("user_id");
CREATE INDEX youtubemd.video_metadata_ix ON youtubemd.Video_Has_Metadata ("id", "metadata_id");
CREATE INDEX youtubemd.history_ix ON youtubemd.History ("id", "file_id", "user_id", "metadata_id");
--- Trigger that updated the different stats
+-- Trigger that updates different stats
CREATE FUNCTION youtubemd.process_stats() RETURNS trigger AS
$$
DECLARE
diff --git a/YouTubeMDBot/__init__.py b/YouTubeMDBot/__init__.py
index 4f6844f..25f0ea6 100755
--- a/YouTubeMDBot/__init__.py
+++ b/YouTubeMDBot/__init__.py
@@ -38,6 +38,7 @@
from .commands import StartHandler
+from .utils import CQueue
from .utils import get_yt_video_id
from .downloader import YouTubeDownloader
diff --git a/YouTubeMDBot/database/__init__.py b/YouTubeMDBot/database/__init__.py
index 1ab0139..8a858f1 100644
--- a/YouTubeMDBot/database/__init__.py
+++ b/YouTubeMDBot/database/__init__.py
@@ -13,5 +13,3 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-from ..database.psql import PostgreSQL
-from ..database.query import Query
diff --git a/YouTubeMDBot/database/psql.py b/YouTubeMDBot/database/psql.py
index 8c86398..9e0d4d2 100644
--- a/YouTubeMDBot/database/psql.py
+++ b/YouTubeMDBot/database/psql.py
@@ -13,94 +13,150 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+import psycopg2
+
+from abc import ABC
+from abc import abstractmethod
+
from typing import Any
from typing import Optional
-from psycopg2 import pool
-from psycopg2.pool import PoolError
+from threading import Lock
+from threading import Thread
+from threading import Condition
-from . import Query
+from .. import CQueue
from .. import DB_HOST
from .. import DB_NAME
from .. import DB_PASSWORD
from .. import DB_PORT
from .. import DB_USER
-from .. import MAX_PROCESS
-
-
-class PostgreSQL(MultiprocessBase):
- def __new__(cls, **kwargs):
- if cls.__instance is None:
- connection = pool.ThreadedConnectionPool(minconn=1,
- maxconn=MAX_PROCESS,
- user=DB_USER,
- password=DB_PASSWORD,
- dbname=DB_NAME,
- host=DB_HOST,
- port=DB_PORT)
- return super().__new__(cls, connection=connection)
- return super().__new__(cls)
-
- def free_connection(self, connection):
- super().free_connection(connection)
- self.connection.putconn(connection)
-
- def get_connection(self) -> Optional[Any]:
- super().get_connection()
- try:
- return self.connection.getconn()
- except PoolError:
- return None
-
- def execute(self, query: Query):
- super().new(self._execute, query)
-
- def fetchone(self, query: Query):
- super().new(self._fetchone, query)
-
- def fetchall(self, query: Query):
- super().new(self._fetchall, query)
-
- def fetchiter(self, query: Query):
- super().new(self._fetchiter, query)
-
- @staticmethod
- def _execute(self, query: Query, connection):
- with connection.cursor() as cursor:
- cursor.execute(query.query)
- query._result = cursor.rowcount
- query.is_completed = True
-
- @staticmethod
- def _fetchone(query: Query, connection):
- with connection.cursor() as cursor:
- cursor.execute(query.query)
- query._result = cursor.fetchone()
- query.is_completed = True
-
- @staticmethod
- def _fetchall(query: Query, connection):
- with connection.cursor() as cursor:
- cursor.execute(query.query)
- query._result = cursor.fetchall()
- query.is_completed = True
-
- @staticmethod
- def _fetchiter(query: Query, connection):
- with connection.cursor() as cursor:
- cursor.execute(query.query)
-
- def generate():
- while True:
- items = cursor.fetchmany()
- if not items:
- break
- for item in items:
- yield item
-
- query._result = generate()
- query.is_completed = True
+
+
+class Query:
+ def __init__(self, statement: str, values: tuple = None):
+ self.statement = statement
+ self.values = values
+
+
+class PostgreSQLBase(ABC):
+ __instance = None
+
+ def __new__(cls,
+ min_ops: int = 100,
+ **kwargs):
+ if PostgreSQLBase.__instance is None:
+ cls.__instance = object.__new__(cls)
+ cls.__instance.connection = psycopg2.connect(user=DB_USER,
+ password=DB_PASSWORD,
+ host=DB_HOST,
+ port=DB_PORT,
+ dbname=DB_NAME)
+ cls.__instance.min_ops = min_ops
+ cls.__instance._iuthread = Thread(target=cls.__iuhandler,
+ name="iuthread")
+ cls.__instance._qthread = Thread(name="qthread")
+ cls.__instance.lock = Lock()
+ cls.__instance.__close = False
+ cls.__instance.pending_ops = CQueue()
+ cls.__instance.waiting_ops = CQueue()
+ cls.__instance.updating_database = False
+ cls.__instance.iucond = Condition()
+ cls.__instance.qcond = Condition()
+ cls.__instance._iuthread.start()
+ for key, value in kwargs.items():
+ setattr(cls.__instance, key, value)
+ return cls.__instance
+
+ @property
+ def close(self) -> bool:
+ with self.lock:
+ return self.__close
+
+ @close.setter
+ def close(self, value: bool):
+ with self.lock:
+ self.__close = value
+
+ @property
+ def updating_database(self) -> bool:
+ with self.lock:
+ return self.__updating_database
+
+ @updating_database.setter
+ def updating_database(self, value: bool):
+ with self.lock:
+ self.__updating_database = value
+
+ def __iuhandler(self):
+ while not self.close:
+ with self.iucond:
+ self.iucond.wait_for(
+ lambda: self.pending_ops.qsize() >= self.min_ops or
+ self.close
+ )
+ self.updating_database = True
+ with self.connection.cursor() as cursor:
+ for query in self.pending_ops:
+ cursor.execute(query.statement, query.values)
+ self.connection.commit()
+ self.updating_database = False
+ self.qcond.notify_all()
+
+ def __qhandler(self):
+ while not self.close:
+ with self.qcond:
+ self.qcond.wait_for(
+ lambda: not self.waiting_ops.empty() and
+ not self.updating_database or self.close
+ )
+ for query in self.waiting_ops:
+ self.pending_ops.put(query)
+ self.iucond.notify_all()
+
+ def insert(self, query: str, args=()):
+ if not self.close:
+ insert_query = Query(query, args)
+ self.waiting_ops.put(insert_query)
+ self.qcond.notify_all()
+
+ def update(self, query: str, args=()):
+ if not self.close:
+ update_query = Query(query, args)
+ self.waiting_ops.put(update_query)
+ self.qcond.notify_all()
+
+ def fetchone(self, query: str, args=()):
+ if not self.close:
+ with self.connection.cursor() as cursor:
+ cursor.execute(query, args)
+ return cursor.fetchone()
+
+ def fetchmany(self, query: str, rows: int, args=()):
+ if not self.close:
+ with self.connection.cursor() as cursor:
+ cursor.execute(query, args)
+ return cursor.fetchmany(rows)
+
+ def fetchall(self, query: str, args=()):
+ if not self.close:
+ with self.connection.cursor() as cursor:
+ cursor.execute(query, args)
+ return cursor.fetchall()
def __del__(self):
- super().__del__()
- self.connection.closeall()
+ self.close = True
+ if not self.waiting_ops.empty():
+ self.qcond.notify_all()
+ self._qthread.join()
+ if not self.pending_ops.empty():
+ self.iucond.notify_all()
+ self._iuthread.join()
+ self.connection.close()
+
+ del self.waiting_ops
+ del self.pending_ops
+
+
+class YouTubeDB(PostgreSQLBase):
+ pass
diff --git a/YouTubeMDBot/requirements.txt b/YouTubeMDBot/requirements.txt
index a0fb6bf..7e8f024 100755
--- a/YouTubeMDBot/requirements.txt
+++ b/YouTubeMDBot/requirements.txt
@@ -5,4 +5,4 @@ musicbrainzngs
ujson
youtube_dl
python-telegram-bot
-psycopg2-binary
+psycopg2
diff --git a/YouTubeMDBot/utils/__init__.py b/YouTubeMDBot/utils/__init__.py
index 9fe1f3e..1a5b972 100755
--- a/YouTubeMDBot/utils/__init__.py
+++ b/YouTubeMDBot/utils/__init__.py
@@ -13,4 +13,5 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+from ..utils.queue import CQueue
from ..utils.youtube_utils import get_yt_video_id
diff --git a/YouTubeMDBot/database/query.py b/YouTubeMDBot/utils/queue/__init__.py
similarity index 64%
rename from YouTubeMDBot/database/query.py
rename to YouTubeMDBot/utils/queue/__init__.py
index 8b89323..fd86e2b 100644
--- a/YouTubeMDBot/database/query.py
+++ b/YouTubeMDBot/utils/queue/__init__.py
@@ -1,5 +1,5 @@
# YouTubeMDBot
-# Copyright (C) 2019 - Javinator9889
+# Copyright (C) 2020 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -13,17 +13,4 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-import threading
-
-
-class Query:
- def __init__(self, query: str):
- self.query = query
- self.is_completed = False
- self._result = None
- self._condition = threading.Condition()
-
- def result(self):
- with self._condition:
- self._condition.wait_for(lambda: self.is_completed)
- return self._result
+from ..queue.cqueue import CQueue
diff --git a/YouTubeMDBot/utils/queue/cqueue.py b/YouTubeMDBot/utils/queue/cqueue.py
new file mode 100644
index 0000000..2fca298
--- /dev/null
+++ b/YouTubeMDBot/utils/queue/cqueue.py
@@ -0,0 +1,61 @@
+# YouTubeMDBot
+# Copyright (C) 2020 - Javinator9889
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+import multiprocessing
+
+from threading import Lock
+
+from typing import Any
+from typing import Optional
+
+from multiprocessing import Queue
+
+
+class CQueue(Queue):
+ def __init__(self, maxsize=0):
+ super().__init__(maxsize=maxsize, ctx=multiprocessing.get_context())
+ self._lock = Lock()
+ self.size = 0
+
+ @property
+ def size(self) -> int:
+ with self._lock:
+ return self.__size
+
+ @size.setter
+ def size(self, value):
+ with self._lock:
+ self.__size = value
+
+ def put(self,
+ obj: Any,
+ block: bool = ...,
+ timeout: Optional[float] = ...) -> None:
+ self.size += 1
+ super().put(obj, block, timeout)
+
+ def get(self, block: bool = ..., timeout: Optional[float] = ...) -> Any:
+ self.size -= 1
+ super().get(block, timeout)
+
+ def qsize(self) -> int:
+ return self.size
+
+ def empty(self) -> bool:
+ return not self.qsize()
+
+ def __iter__(self):
+ while not self.empty():
+ yield self.get()