diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..0e40fe8
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+
+# Default ignored files
+/workspace.xml
\ No newline at end of file
diff --git a/.idea/YouTubeMDBot.iml b/.idea/YouTubeMDBot.iml
new file mode 100644
index 0000000..6711606
--- /dev/null
+++ b/.idea/YouTubeMDBot.iml
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml
new file mode 100644
index 0000000..a55e7a1
--- /dev/null
+++ b/.idea/codeStyles/codeStyleConfig.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/dictionaries/javinator9889.xml b/.idea/dictionaries/javinator9889.xml
new file mode 100644
index 0000000..a59efb2
--- /dev/null
+++ b/.idea/dictionaries/javinator9889.xml
@@ -0,0 +1,7 @@
+
+
+
+ postgre
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..8656114
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..48cc268
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/YouTubeMDBot/__init__.py b/YouTubeMDBot/__init__.py
index 986bc7c..00faadd 100755
--- a/YouTubeMDBot/__init__.py
+++ b/YouTubeMDBot/__init__.py
@@ -41,3 +41,5 @@
from .metadata import YouTubeMetadataIdentifier
from .utils import get_yt_video_id
+
+from .multiprocess import MultiprocessBase
diff --git a/YouTubeMDBot/database/__init__.py b/YouTubeMDBot/database/__init__.py
index 5e2784b..1ab0139 100644
--- a/YouTubeMDBot/database/__init__.py
+++ b/YouTubeMDBot/database/__init__.py
@@ -13,5 +13,5 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-from ..database.psql import PostgresSQL
+from ..database.psql import PostgreSQL
from ..database.query import Query
diff --git a/YouTubeMDBot/database/psql.py b/YouTubeMDBot/database/psql.py
index f969b30..ad8e14d 100644
--- a/YouTubeMDBot/database/psql.py
+++ b/YouTubeMDBot/database/psql.py
@@ -15,12 +15,9 @@
# along with this program. If not, see .
from typing import Any
from typing import Optional
-from queue import Queue
+
from psycopg2 import pool
from psycopg2.pool import PoolError
-from threading import Condition
-from threading import Lock
-from multiprocessing import Process
from . import Query
from .. import MAX_PROCESS
@@ -29,112 +26,182 @@
from .. import DB_PASSWORD
from .. import DB_NAME
from .. import DB_HOST
+from .. import MultiprocessBase
-class PostgresSQL:
- __instance = None
-
+class PostgreSQL(MultiprocessBase):
def __new__(cls, **kwargs):
- if PostgresSQL.__instance is None:
- PostgresSQL.__instance = object.__new__(cls)
- PostgresSQL.__instance.__connection = \
- pool.ThreadedConnectionPool(minconn=1,
- maxconn=MAX_PROCESS,
- user=DB_USER,
- password=DB_PASSWORD,
- dbname=DB_NAME,
- host=DB_HOST,
- port=DB_PORT)
- PostgresSQL.__instance.__waiting_processes = Queue()
- PostgresSQL.__instance.__running_processes = 0
- PostgresSQL.__instance.__lock = Lock()
- PostgresSQL.__instance.__queue_consumer = \
- Process(target=cls.__consumer,
- args=(cls, cls.__running_processes,))
- PostgresSQL.__instance.__queue_consumer.start()
- for key, value in kwargs.items():
- setattr(PostgresSQL.__instance, key, value)
- return PostgresSQL.__instance
+ if MultiprocessBase.__instance is None:
+ connection = pool.ThreadedConnectionPool(minconn=1,
+ maxconn=MAX_PROCESS,
+ user=DB_USER,
+ password=DB_PASSWORD,
+ dbname=DB_NAME,
+ host=DB_HOST,
+ port=DB_PORT)
+ return super().__new__(cls, connection=connection)
+ return super().__new__(cls)
- def __consumer(self, process_queue: Queue):
- condition = Condition()
- while self.__connection.closed == 0:
- with condition:
- condition.wait_for(lambda: self.__running_processes <= MAX_PROCESS)
- pending_process = process_queue.get()
- pending_process["fn"](pending_process["query"])
+ def free_connection(self, connection):
+ super().free_connection(connection)
+ self.connection.putconn(connection)
- def __get_connection(self) -> Optional[Any]:
- with self.__lock:
- if self.__running_processes <= MAX_PROCESS:
- self.__running_processes += 1
- try:
- return self.__connection.getconn()
- except PoolError:
- return None
- return None
-
- def __free_connection(self, connection):
- self.__connection.putconn(connection)
- with self.__lock:
- self.__running_processes -= 1
+ def get_connection(self) -> Optional[Any]:
+ super().get_connection()
+ try:
+ return self.connection.getconn()
+ except PoolError:
+ return None
def execute(self, query: Query):
- connection = self.__get_connection()
- if connection is not None:
- with connection.cursor() as cursor:
- cursor.execute(query.query)
- query._result = cursor.rowcount
- query.is_completed = True
- self.__free_connection(connection)
- else:
- self.__waiting_processes.put({"query": query, "fn": self.execute})
+ super().new(self._execute, query)
+
+ @staticmethod
+ def _execute(self, query: Query, connection):
+ with connection.cursor() as cursor:
+ cursor.execute(query.query)
+ query._result = cursor.rowcount
+ query.is_completed = True
- def fetchone(self, query: Query):
- connection = self.__get_connection()
- if connection is not None:
- with connection.cursor() as cursor:
- cursor.execute(query.query)
- query._result = cursor.fetchone()
- query.is_completed = True
- self.__free_connection(connection)
- else:
- self.__waiting_processes.put({"query": query, "fn": self.fetchone})
+ @staticmethod
+ def _fetchone(query: Query, connection):
+ with connection.cursor() as cursor:
+ cursor.execute(query.query)
+ query._result = cursor.fetchone()
+ query.is_completed = True
- def fetchall(self, query: Query):
- connection = self.__get_connection()
- if connection is not None:
- with connection.cursor() as cursor:
- cursor.execute(query.query)
- query._result = cursor.fetchall()
- query.is_completed = True
- self.__free_connection(connection)
- else:
- self.__waiting_processes.put({"query": query, "fn": self.fetchall})
+ @staticmethod
+ def _fetchall(query: Query, connection):
+ with connection.cursor() as cursor:
+ cursor.execute(query.query)
+ query._result = cursor.fetchall()
+ query.is_completed = True
- def fetchiter(self, query: Query):
- connection = self.__get_connection()
- if connection is not None:
- with connection.cursor() as cursor:
- cursor.execute(query.query)
+ @staticmethod
+ def _fetchiter(query: Query, connection):
+ with connection.cursor() as cursor:
+ cursor.execute(query.query)
- def generate():
- while True:
- items = cursor.fetchmany()
- if not items:
- break
- for item in items:
- yield item
+ def generate():
+ while True:
+ items = cursor.fetchmany()
+ if not items:
+ break
+ for item in items:
+ yield item
- query._result = generate()
- query.is_completed = True
- self.__free_connection(connection)
- else:
- self.__waiting_processes.put({"query": query, "fn": self.fetchiter})
+ query._result = generate()
+ query.is_completed = True
def __del__(self):
- if self.__connection.closed == 0:
- while not self.__waiting_processes.empty():
- continue
- self.__queue_consumer.terminate()
- self.__instance.__connection.closeall()
+ super().__del__()
+ self.connection.closeall()
+
+# class PostgresSQL:
+# __instance = None
+#
+# def __new__(cls, **kwargs):
+# if PostgresSQL.__instance is None:
+# PostgresSQL.__instance = object.__new__(cls)
+# PostgresSQL.__instance.__connection = \
+# pool.ThreadedConnectionPool(minconn=1,
+# maxconn=MAX_PROCESS,
+# user=DB_USER,
+# password=DB_PASSWORD,
+# dbname=DB_NAME,
+# host=DB_HOST,
+# port=DB_PORT)
+# PostgresSQL.__instance.__waiting_processes = Queue()
+# PostgresSQL.__instance.__running_processes = 0
+# PostgresSQL.__instance.__lock = Lock()
+# PostgresSQL.__instance.__queue_consumer = \
+# Process(target=cls.__consumer,
+# args=(cls, cls.__running_processes,))
+# PostgresSQL.__instance.__queue_consumer.start()
+# for key, value in kwargs.items():
+# setattr(PostgresSQL.__instance, key, value)
+# return PostgresSQL.__instance
+#
+# def __consumer(self, process_queue: Queue):
+# condition = Condition()
+# while self.__connection.closed == 0:
+# with condition:
+# condition.wait_for(
+# lambda: self.__running_processes <= MAX_PROCESS)
+# pending_process = process_queue.get()
+# pending_process["fn"](pending_process["query"])
+#
+# def __get_connection(self) -> Optional[Any]:
+# with self.__lock:
+# if self.__running_processes <= MAX_PROCESS:
+# self.__running_processes += 1
+# try:
+# return self.__connection.getconn()
+# except PoolError:
+# return None
+# return None
+#
+# def __free_connection(self, connection):
+# self.__connection.putconn(connection)
+# with self.__lock:
+# self.__running_processes -= 1
+#
+# def execute(self, query: Query):
+# connection = self.__get_connection()
+# if connection is not None:
+# with connection.cursor() as cursor:
+# cursor.execute(query.query)
+# query._result = cursor.rowcount
+# query.is_completed = True
+# self.__free_connection(connection)
+# else:
+# self.__waiting_processes.put({"query": query, "fn": self.execute})
+#
+# def fetchone(self, query: Query):
+# connection = self.__get_connection()
+# if connection is not None:
+# with connection.cursor() as cursor:
+# cursor.execute(query.query)
+# query._result = cursor.fetchone()
+# query.is_completed = True
+# self.__free_connection(connection)
+# else:
+# self.__waiting_processes.put({"query": query, "fn": self.fetchone})
+#
+# def fetchall(self, query: Query):
+# connection = self.__get_connection()
+# if connection is not None:
+# with connection.cursor() as cursor:
+# cursor.execute(query.query)
+# query._result = cursor.fetchall()
+# query.is_completed = True
+# self.__free_connection(connection)
+# else:
+# self.__waiting_processes.put({"query": query, "fn": self.fetchall})
+#
+# def fetchiter(self, query: Query):
+# connection = self.__get_connection()
+# if connection is not None:
+# with connection.cursor() as cursor:
+# cursor.execute(query.query)
+#
+# def generate():
+# while True:
+# items = cursor.fetchmany()
+# if not items:
+# break
+# for item in items:
+# yield item
+#
+# query._result = generate()
+# query.is_completed = True
+# self.__free_connection(connection)
+# else:
+# self.__waiting_processes.put({"query": query, "fn": self.fetchiter})
+#
+# def __del__(self):
+# if self.__connection.closed == 0:
+# while not self.__waiting_processes.empty():
+# continue
+# self.__queue_consumer.terminate()
+# self.__instance.__connection.closeall()
diff --git a/YouTubeMDBot/multiprocess/__init__.py b/YouTubeMDBot/multiprocess/__init__.py
new file mode 100644
index 0000000..be565a8
--- /dev/null
+++ b/YouTubeMDBot/multiprocess/__init__.py
@@ -0,0 +1,16 @@
+# YouTubeMDBot
+# Copyright (C) 2019 - Javinator9889
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+from ..multiprocess.abcprocess import MultiprocessBase
diff --git a/YouTubeMDBot/multiprocess/abcprocess.py b/YouTubeMDBot/multiprocess/abcprocess.py
new file mode 100644
index 0000000..7670a3e
--- /dev/null
+++ b/YouTubeMDBot/multiprocess/abcprocess.py
@@ -0,0 +1,95 @@
+# YouTubeMDBot
+# Copyright (C) 2019 - Javinator9889
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+from abc import ABC
+from abc import abstractmethod
+
+from typing import Any
+from typing import Dict
+from typing import Optional
+from typing import Callable
+
+from queue import Queue
+
+from threading import Condition
+from threading import Lock
+
+from multiprocessing import Process
+
+from .. import MAX_PROCESS
+
+
+class MultiprocessBase(ABC):
+ __instance = None
+ inst = None
+
+ def __new__(cls, maxsize: int = 0, **kwargs):
+ if MultiprocessBase.__instance is None:
+ MultiprocessBase.__instance = object.__new__(cls)
+ MultiprocessBase.__instance.waiting_processes = Queue(maxsize)
+ MultiprocessBase.__instance.running_processes = 0
+ MultiprocessBase.__instance.lock = Lock()
+ MultiprocessBase.__instance.finished = False
+ MultiprocessBase.__instance.queue_consumer = \
+ Process(target=cls.__consumer)
+ MultiprocessBase.__instance.queue_consumer.start()
+ for key, value in kwargs.items():
+ setattr(MultiprocessBase.__instance, key, value)
+ return MultiprocessBase.__instance
+
+ def __consumer(self):
+ condition = Condition()
+ while not self.finished:
+ with condition:
+ condition.wait_for(
+ lambda: self.running_processes <= MAX_PROCESS and not
+ self.waiting_processes.empty())
+ process = self.waiting_processes.get()
+ self.__spawn(process)
+
+ def __spawn(self, process: Dict[str, Any]):
+ connection = self.get_connection()
+ child_process = Process(target=self.__run,
+ args=(process, connection,))
+ child_process.start()
+
+ @abstractmethod
+ def get_connection(self) -> Optional[Any]:
+ with self.lock:
+ if self.running_processes <= MAX_PROCESS:
+ self.running_processes += 1
+ return None
+
+ @abstractmethod
+ def free_connection(self, connection):
+ with self.lock:
+ self.running_processes -= 1
+
+ def __run(self, *args) -> Optional[Any]:
+ fn = args[0]["fn"]
+ fn_args = args[0]["args"]
+ result = fn(*fn_args, args[1])
+ self.free_connection(args[1])
+ return result
+
+ def new(self, fn: Callable, *args):
+ self.waiting_processes.put({"fn": fn, "args": args})
+
+ def __del__(self):
+ if not self.finished:
+ self.finished = True
+ while not self.waiting_processes.empty():
+ continue
+ self.queue_consumer.terminate()