From 2e2fb169c44b0ebe6e05646b6bddeb3be189dd1c Mon Sep 17 00:00:00 2001 From: Javinator9889 Date: Tue, 3 Dec 2019 13:18:46 +0100 Subject: [PATCH] Cleaned-up code --- YouTubeMDBot/database/psql.py | 120 ++---------------------- YouTubeMDBot/multiprocess/abcprocess.py | 18 ++-- 2 files changed, 19 insertions(+), 119 deletions(-) diff --git a/YouTubeMDBot/database/psql.py b/YouTubeMDBot/database/psql.py index ad8e14d..10a427d 100644 --- a/YouTubeMDBot/database/psql.py +++ b/YouTubeMDBot/database/psql.py @@ -31,7 +31,7 @@ class PostgreSQL(MultiprocessBase): def __new__(cls, **kwargs): - if MultiprocessBase.__instance is None: + if cls.__instance is None: connection = pool.ThreadedConnectionPool(minconn=1, maxconn=MAX_PROCESS, user=DB_USER, @@ -56,6 +56,15 @@ def get_connection(self) -> Optional[Any]: def execute(self, query: Query): super().new(self._execute, query) + def fetchone(self, query: Query): + super().new(self._fetchone, query) + + def fetchall(self, query: Query): + super().new(self._fetchall, query) + + def fetchiter(self, query: Query): + super().new(self._fetchiter, query) + @staticmethod def _execute(self, query: Query, connection): with connection.cursor() as cursor: @@ -96,112 +105,3 @@ def generate(): def __del__(self): super().__del__() self.connection.closeall() - -# class PostgresSQL: -# __instance = None -# -# def __new__(cls, **kwargs): -# if PostgresSQL.__instance is None: -# PostgresSQL.__instance = object.__new__(cls) -# PostgresSQL.__instance.__connection = \ -# pool.ThreadedConnectionPool(minconn=1, -# maxconn=MAX_PROCESS, -# user=DB_USER, -# password=DB_PASSWORD, -# dbname=DB_NAME, -# host=DB_HOST, -# port=DB_PORT) -# PostgresSQL.__instance.__waiting_processes = Queue() -# PostgresSQL.__instance.__running_processes = 0 -# PostgresSQL.__instance.__lock = Lock() -# PostgresSQL.__instance.__queue_consumer = \ -# Process(target=cls.__consumer, -# args=(cls, cls.__running_processes,)) -# PostgresSQL.__instance.__queue_consumer.start() -# for key, value in kwargs.items(): -# setattr(PostgresSQL.__instance, key, value) -# return PostgresSQL.__instance -# -# def __consumer(self, process_queue: Queue): -# condition = Condition() -# while self.__connection.closed == 0: -# with condition: -# condition.wait_for( -# lambda: self.__running_processes <= MAX_PROCESS) -# pending_process = process_queue.get() -# pending_process["fn"](pending_process["query"]) -# -# def __get_connection(self) -> Optional[Any]: -# with self.__lock: -# if self.__running_processes <= MAX_PROCESS: -# self.__running_processes += 1 -# try: -# return self.__connection.getconn() -# except PoolError: -# return None -# return None -# -# def __free_connection(self, connection): -# self.__connection.putconn(connection) -# with self.__lock: -# self.__running_processes -= 1 -# -# def execute(self, query: Query): -# connection = self.__get_connection() -# if connection is not None: -# with connection.cursor() as cursor: -# cursor.execute(query.query) -# query._result = cursor.rowcount -# query.is_completed = True -# self.__free_connection(connection) -# else: -# self.__waiting_processes.put({"query": query, "fn": self.execute}) -# -# def fetchone(self, query: Query): -# connection = self.__get_connection() -# if connection is not None: -# with connection.cursor() as cursor: -# cursor.execute(query.query) -# query._result = cursor.fetchone() -# query.is_completed = True -# self.__free_connection(connection) -# else: -# self.__waiting_processes.put({"query": query, "fn": self.fetchone}) -# -# def fetchall(self, query: Query): -# connection = self.__get_connection() -# if connection is not None: -# with connection.cursor() as cursor: -# cursor.execute(query.query) -# query._result = cursor.fetchall() -# query.is_completed = True -# self.__free_connection(connection) -# else: -# self.__waiting_processes.put({"query": query, "fn": self.fetchall}) -# -# def fetchiter(self, query: Query): -# connection = self.__get_connection() -# if connection is not None: -# with connection.cursor() as cursor: -# cursor.execute(query.query) -# -# def generate(): -# while True: -# items = cursor.fetchmany() -# if not items: -# break -# for item in items: -# yield item -# -# query._result = generate() -# query.is_completed = True -# self.__free_connection(connection) -# else: -# self.__waiting_processes.put({"query": query, "fn": self.fetchiter}) -# -# def __del__(self): -# if self.__connection.closed == 0: -# while not self.__waiting_processes.empty(): -# continue -# self.__queue_consumer.terminate() -# self.__instance.__connection.closeall() diff --git a/YouTubeMDBot/multiprocess/abcprocess.py b/YouTubeMDBot/multiprocess/abcprocess.py index 7670a3e..9923561 100644 --- a/YouTubeMDBot/multiprocess/abcprocess.py +++ b/YouTubeMDBot/multiprocess/abcprocess.py @@ -37,17 +37,17 @@ class MultiprocessBase(ABC): def __new__(cls, maxsize: int = 0, **kwargs): if MultiprocessBase.__instance is None: - MultiprocessBase.__instance = object.__new__(cls) - MultiprocessBase.__instance.waiting_processes = Queue(maxsize) - MultiprocessBase.__instance.running_processes = 0 - MultiprocessBase.__instance.lock = Lock() - MultiprocessBase.__instance.finished = False - MultiprocessBase.__instance.queue_consumer = \ + cls.__instance = object.__new__(cls) + cls.__instance.waiting_processes = Queue(maxsize) + cls.__instance.running_processes = 0 + cls.__instance.lock = Lock() + cls.__instance.finished = False + cls.__instance.queue_consumer = \ Process(target=cls.__consumer) - MultiprocessBase.__instance.queue_consumer.start() + cls.__instance.queue_consumer.start() for key, value in kwargs.items(): - setattr(MultiprocessBase.__instance, key, value) - return MultiprocessBase.__instance + setattr(cls.__instance, key, value) + return cls.__instance def __consumer(self): condition = Condition()