Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Cleaned-up code
  • Loading branch information
Javinator9889 committed Dec 3, 2019
1 parent 2c9f028 commit 2e2fb16
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 119 deletions.
120 changes: 10 additions & 110 deletions YouTubeMDBot/database/psql.py
Expand Up @@ -31,7 +31,7 @@

class PostgreSQL(MultiprocessBase):
def __new__(cls, **kwargs):
if MultiprocessBase.__instance is None:
if cls.__instance is None:
connection = pool.ThreadedConnectionPool(minconn=1,
maxconn=MAX_PROCESS,
user=DB_USER,
Expand All @@ -56,6 +56,15 @@ def get_connection(self) -> Optional[Any]:
def execute(self, query: Query):
super().new(self._execute, query)

def fetchone(self, query: Query):
super().new(self._fetchone, query)

def fetchall(self, query: Query):
super().new(self._fetchall, query)

def fetchiter(self, query: Query):
super().new(self._fetchiter, query)

@staticmethod
def _execute(self, query: Query, connection):
with connection.cursor() as cursor:
Expand Down Expand Up @@ -96,112 +105,3 @@ def generate():
def __del__(self):
super().__del__()
self.connection.closeall()

# class PostgresSQL:
# __instance = None
#
# def __new__(cls, **kwargs):
# if PostgresSQL.__instance is None:
# PostgresSQL.__instance = object.__new__(cls)
# PostgresSQL.__instance.__connection = \
# pool.ThreadedConnectionPool(minconn=1,
# maxconn=MAX_PROCESS,
# user=DB_USER,
# password=DB_PASSWORD,
# dbname=DB_NAME,
# host=DB_HOST,
# port=DB_PORT)
# PostgresSQL.__instance.__waiting_processes = Queue()
# PostgresSQL.__instance.__running_processes = 0
# PostgresSQL.__instance.__lock = Lock()
# PostgresSQL.__instance.__queue_consumer = \
# Process(target=cls.__consumer,
# args=(cls, cls.__running_processes,))
# PostgresSQL.__instance.__queue_consumer.start()
# for key, value in kwargs.items():
# setattr(PostgresSQL.__instance, key, value)
# return PostgresSQL.__instance
#
# def __consumer(self, process_queue: Queue):
# condition = Condition()
# while self.__connection.closed == 0:
# with condition:
# condition.wait_for(
# lambda: self.__running_processes <= MAX_PROCESS)
# pending_process = process_queue.get()
# pending_process["fn"](pending_process["query"])
#
# def __get_connection(self) -> Optional[Any]:
# with self.__lock:
# if self.__running_processes <= MAX_PROCESS:
# self.__running_processes += 1
# try:
# return self.__connection.getconn()
# except PoolError:
# return None
# return None
#
# def __free_connection(self, connection):
# self.__connection.putconn(connection)
# with self.__lock:
# self.__running_processes -= 1
#
# def execute(self, query: Query):
# connection = self.__get_connection()
# if connection is not None:
# with connection.cursor() as cursor:
# cursor.execute(query.query)
# query._result = cursor.rowcount
# query.is_completed = True
# self.__free_connection(connection)
# else:
# self.__waiting_processes.put({"query": query, "fn": self.execute})
#
# def fetchone(self, query: Query):
# connection = self.__get_connection()
# if connection is not None:
# with connection.cursor() as cursor:
# cursor.execute(query.query)
# query._result = cursor.fetchone()
# query.is_completed = True
# self.__free_connection(connection)
# else:
# self.__waiting_processes.put({"query": query, "fn": self.fetchone})
#
# def fetchall(self, query: Query):
# connection = self.__get_connection()
# if connection is not None:
# with connection.cursor() as cursor:
# cursor.execute(query.query)
# query._result = cursor.fetchall()
# query.is_completed = True
# self.__free_connection(connection)
# else:
# self.__waiting_processes.put({"query": query, "fn": self.fetchall})
#
# def fetchiter(self, query: Query):
# connection = self.__get_connection()
# if connection is not None:
# with connection.cursor() as cursor:
# cursor.execute(query.query)
#
# def generate():
# while True:
# items = cursor.fetchmany()
# if not items:
# break
# for item in items:
# yield item
#
# query._result = generate()
# query.is_completed = True
# self.__free_connection(connection)
# else:
# self.__waiting_processes.put({"query": query, "fn": self.fetchiter})
#
# def __del__(self):
# if self.__connection.closed == 0:
# while not self.__waiting_processes.empty():
# continue
# self.__queue_consumer.terminate()
# self.__instance.__connection.closeall()
18 changes: 9 additions & 9 deletions YouTubeMDBot/multiprocess/abcprocess.py
Expand Up @@ -37,17 +37,17 @@ class MultiprocessBase(ABC):

def __new__(cls, maxsize: int = 0, **kwargs):
if MultiprocessBase.__instance is None:
MultiprocessBase.__instance = object.__new__(cls)
MultiprocessBase.__instance.waiting_processes = Queue(maxsize)
MultiprocessBase.__instance.running_processes = 0
MultiprocessBase.__instance.lock = Lock()
MultiprocessBase.__instance.finished = False
MultiprocessBase.__instance.queue_consumer = \
cls.__instance = object.__new__(cls)
cls.__instance.waiting_processes = Queue(maxsize)
cls.__instance.running_processes = 0
cls.__instance.lock = Lock()
cls.__instance.finished = False
cls.__instance.queue_consumer = \
Process(target=cls.__consumer)
MultiprocessBase.__instance.queue_consumer.start()
cls.__instance.queue_consumer.start()
for key, value in kwargs.items():
setattr(MultiprocessBase.__instance, key, value)
return MultiprocessBase.__instance
setattr(cls.__instance, key, value)
return cls.__instance

def __consumer(self):
condition = Condition()
Expand Down

0 comments on commit 2e2fb16

Please sign in to comment.