From 2c9f028ed590efb7ad62186560edfb35c12656e6 Mon Sep 17 00:00:00 2001 From: Javinator9889 Date: Tue, 3 Dec 2019 13:07:35 +0100 Subject: [PATCH] Updated multiprocess base class Using an abstract class for encapsulating common methods and attributes that must be shared between all subclasses --- .idea/.gitignore | 3 + .idea/YouTubeMDBot.iml | 11 + .idea/codeStyles/codeStyleConfig.xml | 5 + .idea/dictionaries/javinator9889.xml | 7 + .../inspectionProfiles/profiles_settings.xml | 6 + .idea/misc.xml | 7 + .idea/modules.xml | 8 + .idea/vcs.xml | 6 + YouTubeMDBot/__init__.py | 2 + YouTubeMDBot/database/__init__.py | 2 +- YouTubeMDBot/database/psql.py | 265 +++++++++++------- YouTubeMDBot/multiprocess/__init__.py | 16 ++ YouTubeMDBot/multiprocess/abcprocess.py | 95 +++++++ 13 files changed, 333 insertions(+), 100 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/YouTubeMDBot.iml create mode 100644 .idea/codeStyles/codeStyleConfig.xml create mode 100644 .idea/dictionaries/javinator9889.xml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 YouTubeMDBot/multiprocess/__init__.py create mode 100644 YouTubeMDBot/multiprocess/abcprocess.py diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..0e40fe8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ + +# Default ignored files +/workspace.xml \ No newline at end of file diff --git a/.idea/YouTubeMDBot.iml b/.idea/YouTubeMDBot.iml new file mode 100644 index 0000000..6711606 --- /dev/null +++ b/.idea/YouTubeMDBot.iml @@ -0,0 +1,11 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml new file mode 100644 index 0000000..a55e7a1 --- /dev/null +++ b/.idea/codeStyles/codeStyleConfig.xml @@ -0,0 +1,5 @@ + + + + \ No newline at end of file diff --git 
a/.idea/dictionaries/javinator9889.xml b/.idea/dictionaries/javinator9889.xml new file mode 100644 index 0000000..a59efb2 --- /dev/null +++ b/.idea/dictionaries/javinator9889.xml @@ -0,0 +1,7 @@ + + + + postgre + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..8656114 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..48cc268 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/YouTubeMDBot/__init__.py b/YouTubeMDBot/__init__.py index 986bc7c..00faadd 100755 --- a/YouTubeMDBot/__init__.py +++ b/YouTubeMDBot/__init__.py @@ -41,3 +41,5 @@ from .metadata import YouTubeMetadataIdentifier from .utils import get_yt_video_id + +from .multiprocess import MultiprocessBase diff --git a/YouTubeMDBot/database/__init__.py b/YouTubeMDBot/database/__init__.py index 5e2784b..1ab0139 100644 --- a/YouTubeMDBot/database/__init__.py +++ b/YouTubeMDBot/database/__init__.py @@ -13,5 +13,5 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
from typing import Any
from typing import Optional

from psycopg2 import pool
from psycopg2.pool import PoolError

from . import Query
from .. import MAX_PROCESS
from .. import DB_PORT
from .. import DB_USER
from .. import DB_PASSWORD
from .. import DB_NAME
from .. import DB_HOST
from .. import MultiprocessBase


class PostgreSQL(MultiprocessBase):
    """Singleton PostgreSQL gateway built on :class:`MultiprocessBase`.

    Queries are submitted through :meth:`execute`, :meth:`fetchone`,
    :meth:`fetchall` or :meth:`fetchiter`; the base class queues them and
    dispatches each one to a worker holding a connection taken from a
    ``ThreadedConnectionPool``.  Results are written back onto the
    :class:`Query` object (``_result`` / ``is_completed``).
    """

    def __new__(cls, **kwargs):
        """Create (or return) the unique instance, building the pool once.

        Bug fix: the original tested ``MultiprocessBase.__instance`` from
        inside this class; Python mangles that to ``_PostgreSQL__instance``,
        which never exists, so every instantiation raised AttributeError.
        The attribute is looked up under its real mangled name instead.
        """
        if getattr(MultiprocessBase, "_MultiprocessBase__instance", None) is None:
            connection = pool.ThreadedConnectionPool(minconn=1,
                                                     maxconn=MAX_PROCESS,
                                                     user=DB_USER,
                                                     password=DB_PASSWORD,
                                                     dbname=DB_NAME,
                                                     host=DB_HOST,
                                                     port=DB_PORT)
            # Forward caller attributes together with the pool so the base
            # class stores everything on the singleton.
            return super().__new__(cls, connection=connection, **kwargs)
        return super().__new__(cls)

    def get_connection(self) -> Optional[Any]:
        """Take a pooled connection, or ``None`` when the pool is exhausted."""
        super().get_connection()  # slot accounting lives in the base class
        try:
            return self.connection.getconn()
        except PoolError:
            return None

    def free_connection(self, connection):
        """Release the base-class slot and hand *connection* back to the pool."""
        super().free_connection(connection)
        self.connection.putconn(connection)

    def execute(self, query: Query):
        """Queue *query*; its result will be the affected row count."""
        super().new(self._execute, query)

    def fetchone(self, query: Query):
        """Queue *query*; its result will be a single row.

        Restored public wrapper — the worker ``_fetchone`` existed but was
        unreachable, breaking API parity with the class this replaces.
        """
        super().new(self._fetchone, query)

    def fetchall(self, query: Query):
        """Queue *query*; its result will be the complete row list (restored)."""
        super().new(self._fetchall, query)

    def fetchiter(self, query: Query):
        """Queue *query*; its result will be a lazy row generator (restored)."""
        super().new(self._fetchiter, query)

    @staticmethod
    def _execute(query: Query, connection):
        # Bug fix: the original kept a ``self`` parameter on this
        # @staticmethod, so the dispatched call bound ``query`` to ``self``
        # and died with a missing-argument TypeError.
        with connection.cursor() as cursor:
            cursor.execute(query.query)
            query._result = cursor.rowcount
            query.is_completed = True

    @staticmethod
    def _fetchone(query: Query, connection):
        # Worker: run the query and store the first row.
        with connection.cursor() as cursor:
            cursor.execute(query.query)
            query._result = cursor.fetchone()
            query.is_completed = True

    @staticmethod
    def _fetchall(query: Query, connection):
        # Worker: run the query and store every row at once.
        with connection.cursor() as cursor:
            cursor.execute(query.query)
            query._result = cursor.fetchall()
            query.is_completed = True

    @staticmethod
    def _fetchiter(query: Query, connection):
        # Worker: run the query and expose the rows lazily, batch by batch.
        with connection.cursor() as cursor:
            cursor.execute(query.query)

            def generate():
                while True:
                    items = cursor.fetchmany()
                    if not items:
                        break
                    yield from items

            query._result = generate()
            query.is_completed = True

    def __del__(self):
        # Shut down the base-class consumer first, then close every pooled
        # connection.
        super().__del__()
        self.connection.closeall()
from abc import ABC
from abc import abstractmethod

from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional

from queue import Queue

from threading import Lock
from threading import Thread

from .. import MAX_PROCESS


class MultiprocessBase(ABC):
    """Abstract singleton that funnels work items through a queue.

    Subclasses manage the actual resource by overriding
    :meth:`get_connection` / :meth:`free_connection`; callers submit work
    with :meth:`new`, and a background consumer dispatches each item to a
    worker together with a reserved connection.

    NOTE(review): the original spawned the consumer and the workers with
    ``multiprocessing.Process``.  That cannot work here: the shared state
    (``queue.Queue``, ``threading.Lock``, the running counter and the
    subclass' connection pool) is neither picklable nor visible from a
    forked child, so the consumer would never see queued items.  Threads
    share that state naturally, so threads are used instead.
    """

    # Singleton storage.  Name-mangled to _MultiprocessBase__instance;
    # subclasses must look it up via getattr() under that mangled name.
    __instance = None
    # Unmangled alias kept for backwards compatibility with code that
    # referenced ``inst`` directly.
    inst = None

    def __new__(cls, maxsize: int = 0, **kwargs):
        """Create (or return) the unique instance.

        :param maxsize: upper bound for the waiting queue (0 = unbounded).
        :param kwargs: extra attributes stored verbatim on the instance
                       (e.g. ``connection=<pool>`` from a subclass).
        """
        if MultiprocessBase.__instance is None:
            self = object.__new__(cls)
            self.waiting_processes = Queue(maxsize)
            self.running_processes = 0
            self.lock = Lock()
            self.finished = False
            for key, value in kwargs.items():
                setattr(self, key, value)
            MultiprocessBase.__instance = self
            MultiprocessBase.inst = self
            # Start the consumer only after the attributes above exist.
            # Original bug: ``Process(target=cls.__consumer)`` passed no
            # ``self``, so the target crashed on its very first call.
            self.queue_consumer = Thread(target=self.__consumer, daemon=True)
            self.queue_consumer.start()
        return MultiprocessBase.__instance

    def __consumer(self):
        """Consume queued work until :meth:`__del__` signals shutdown.

        A blocking ``Queue.get`` replaces the original ``Condition.wait_for``
        that nobody ever notified (and which would therefore deadlock as
        soon as its predicate was false).
        """
        while not self.finished:
            process = self.waiting_processes.get()
            if process is None:  # shutdown sentinel queued by __del__
                break
            self.__spawn(process)

    def __spawn(self, process: Dict[str, Any]):
        """Run *process* on a worker thread with a reserved connection."""
        connection = self.get_connection()
        if connection is None:
            # No capacity right now — re-queue for a later attempt (the
            # original handed ``None`` straight to the worker).
            self.waiting_processes.put(process)
            return
        worker = Thread(target=self.__run, args=(process, connection))
        worker.start()

    @abstractmethod
    def get_connection(self) -> Optional[Any]:
        """Reserve a worker slot; subclasses return the real connection.

        Subclasses must call ``super().get_connection()`` first so the
        running-process accounting stays correct, then return their own
        connection object (or ``None`` when none is available).
        """
        with self.lock:
            # ``<`` fixes the original ``<=`` off-by-one that allowed
            # MAX_PROCESS + 1 concurrent workers.
            if self.running_processes < MAX_PROCESS:
                self.running_processes += 1
        return None

    @abstractmethod
    def free_connection(self, connection):
        """Release the slot taken by :meth:`get_connection`."""
        with self.lock:
            self.running_processes -= 1

    def __run(self, process: Dict[str, Any], connection) -> Optional[Any]:
        """Worker body: invoke the stored callable, then free the slot."""
        try:
            # The reserved connection is appended as the last positional arg.
            return process["fn"](*process["args"], connection)
        finally:
            # Always give the connection back, even if ``fn`` raised.
            self.free_connection(connection)

    def new(self, fn: Callable, *args):
        """Queue ``fn(*args, connection)`` for asynchronous execution."""
        self.waiting_processes.put({"fn": fn, "args": args})

    def __del__(self):
        """Stop the consumer gracefully (no busy-wait, no ``terminate``)."""
        if not self.finished:
            self.finished = True
            # Wake the (possibly blocked) consumer so it can exit.
            self.waiting_processes.put(None)