Skip to content

Commit

Permalink
Updated multiprocess base class
Browse files Browse the repository at this point in the history
Using an abstract class for encapsulating common methods and attributes that must be shared between all subclasses
  • Loading branch information
Javinator9889 committed Dec 3, 2019
1 parent a8ea3d8 commit 2c9f028
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 100 deletions.
3 changes: 3 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions .idea/YouTubeMDBot.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions .idea/codeStyles/codeStyleConfig.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions .idea/dictionaries/javinator9889.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions YouTubeMDBot/__init__.py
Expand Up @@ -41,3 +41,5 @@
from .metadata import YouTubeMetadataIdentifier

from .utils import get_yt_video_id

from .multiprocess import MultiprocessBase
2 changes: 1 addition & 1 deletion YouTubeMDBot/database/__init__.py
Expand Up @@ -13,5 +13,5 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..database.psql import PostgresSQL
from ..database.psql import PostgreSQL
from ..database.query import Query
265 changes: 166 additions & 99 deletions YouTubeMDBot/database/psql.py
Expand Up @@ -15,12 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import Any
from typing import Optional
from queue import Queue

from psycopg2 import pool
from psycopg2.pool import PoolError
from threading import Condition
from threading import Lock
from multiprocessing import Process

from . import Query
from .. import MAX_PROCESS
Expand All @@ -29,112 +26,182 @@
from .. import DB_PASSWORD
from .. import DB_NAME
from .. import DB_HOST
from .. import MultiprocessBase


class PostgresSQL:
__instance = None

class PostgreSQL(MultiprocessBase):
def __new__(cls, **kwargs):
if PostgresSQL.__instance is None:
PostgresSQL.__instance = object.__new__(cls)
PostgresSQL.__instance.__connection = \
pool.ThreadedConnectionPool(minconn=1,
maxconn=MAX_PROCESS,
user=DB_USER,
password=DB_PASSWORD,
dbname=DB_NAME,
host=DB_HOST,
port=DB_PORT)
PostgresSQL.__instance.__waiting_processes = Queue()
PostgresSQL.__instance.__running_processes = 0
PostgresSQL.__instance.__lock = Lock()
PostgresSQL.__instance.__queue_consumer = \
Process(target=cls.__consumer,
args=(cls, cls.__running_processes,))
PostgresSQL.__instance.__queue_consumer.start()
for key, value in kwargs.items():
setattr(PostgresSQL.__instance, key, value)
return PostgresSQL.__instance
if MultiprocessBase.__instance is None:
connection = pool.ThreadedConnectionPool(minconn=1,
maxconn=MAX_PROCESS,
user=DB_USER,
password=DB_PASSWORD,
dbname=DB_NAME,
host=DB_HOST,
port=DB_PORT)
return super().__new__(cls, connection=connection)
return super().__new__(cls)

def __consumer(self, process_queue: Queue):
condition = Condition()
while self.__connection.closed == 0:
with condition:
condition.wait_for(lambda: self.__running_processes <= MAX_PROCESS)
pending_process = process_queue.get()
pending_process["fn"](pending_process["query"])
def free_connection(self, connection):
super().free_connection(connection)
self.connection.putconn(connection)

def __get_connection(self) -> Optional[Any]:
with self.__lock:
if self.__running_processes <= MAX_PROCESS:
self.__running_processes += 1
try:
return self.__connection.getconn()
except PoolError:
return None
return None

def __free_connection(self, connection):
self.__connection.putconn(connection)
with self.__lock:
self.__running_processes -= 1
def get_connection(self) -> Optional[Any]:
super().get_connection()
try:
return self.connection.getconn()
except PoolError:
return None

def execute(self, query: Query):
connection = self.__get_connection()
if connection is not None:
with connection.cursor() as cursor:
cursor.execute(query.query)
query._result = cursor.rowcount
query.is_completed = True
self.__free_connection(connection)
else:
self.__waiting_processes.put({"query": query, "fn": self.execute})
super().new(self._execute, query)

@staticmethod
def _execute(self, query: Query, connection):
with connection.cursor() as cursor:
cursor.execute(query.query)
query._result = cursor.rowcount
query.is_completed = True

def fetchone(self, query: Query):
connection = self.__get_connection()
if connection is not None:
with connection.cursor() as cursor:
cursor.execute(query.query)
query._result = cursor.fetchone()
query.is_completed = True
self.__free_connection(connection)
else:
self.__waiting_processes.put({"query": query, "fn": self.fetchone})
@staticmethod
def _fetchone(query: Query, connection):
with connection.cursor() as cursor:
cursor.execute(query.query)
query._result = cursor.fetchone()
query.is_completed = True

def fetchall(self, query: Query):
connection = self.__get_connection()
if connection is not None:
with connection.cursor() as cursor:
cursor.execute(query.query)
query._result = cursor.fetchall()
query.is_completed = True
self.__free_connection(connection)
else:
self.__waiting_processes.put({"query": query, "fn": self.fetchall})
@staticmethod
def _fetchall(query: Query, connection):
with connection.cursor() as cursor:
cursor.execute(query.query)
query._result = cursor.fetchall()
query.is_completed = True

def fetchiter(self, query: Query):
connection = self.__get_connection()
if connection is not None:
with connection.cursor() as cursor:
cursor.execute(query.query)
@staticmethod
def _fetchiter(query: Query, connection):
with connection.cursor() as cursor:
cursor.execute(query.query)

def generate():
while True:
items = cursor.fetchmany()
if not items:
break
for item in items:
yield item
def generate():
while True:
items = cursor.fetchmany()
if not items:
break
for item in items:
yield item

query._result = generate()
query.is_completed = True
self.__free_connection(connection)
else:
self.__waiting_processes.put({"query": query, "fn": self.fetchiter})
query._result = generate()
query.is_completed = True

def __del__(self):
if self.__connection.closed == 0:
while not self.__waiting_processes.empty():
continue
self.__queue_consumer.terminate()
self.__instance.__connection.closeall()
super().__del__()
self.connection.closeall()

# class PostgresSQL:
# __instance = None
#
# def __new__(cls, **kwargs):
# if PostgresSQL.__instance is None:
# PostgresSQL.__instance = object.__new__(cls)
# PostgresSQL.__instance.__connection = \
# pool.ThreadedConnectionPool(minconn=1,
# maxconn=MAX_PROCESS,
# user=DB_USER,
# password=DB_PASSWORD,
# dbname=DB_NAME,
# host=DB_HOST,
# port=DB_PORT)
# PostgresSQL.__instance.__waiting_processes = Queue()
# PostgresSQL.__instance.__running_processes = 0
# PostgresSQL.__instance.__lock = Lock()
# PostgresSQL.__instance.__queue_consumer = \
# Process(target=cls.__consumer,
# args=(cls, cls.__running_processes,))
# PostgresSQL.__instance.__queue_consumer.start()
# for key, value in kwargs.items():
# setattr(PostgresSQL.__instance, key, value)
# return PostgresSQL.__instance
#
# def __consumer(self, process_queue: Queue):
# condition = Condition()
# while self.__connection.closed == 0:
# with condition:
# condition.wait_for(
# lambda: self.__running_processes <= MAX_PROCESS)
# pending_process = process_queue.get()
# pending_process["fn"](pending_process["query"])
#
# def __get_connection(self) -> Optional[Any]:
# with self.__lock:
# if self.__running_processes <= MAX_PROCESS:
# self.__running_processes += 1
# try:
# return self.__connection.getconn()
# except PoolError:
# return None
# return None
#
# def __free_connection(self, connection):
# self.__connection.putconn(connection)
# with self.__lock:
# self.__running_processes -= 1
#
# def execute(self, query: Query):
# connection = self.__get_connection()
# if connection is not None:
# with connection.cursor() as cursor:
# cursor.execute(query.query)
# query._result = cursor.rowcount
# query.is_completed = True
# self.__free_connection(connection)
# else:
# self.__waiting_processes.put({"query": query, "fn": self.execute})
#
# def fetchone(self, query: Query):
# connection = self.__get_connection()
# if connection is not None:
# with connection.cursor() as cursor:
# cursor.execute(query.query)
# query._result = cursor.fetchone()
# query.is_completed = True
# self.__free_connection(connection)
# else:
# self.__waiting_processes.put({"query": query, "fn": self.fetchone})
#
# def fetchall(self, query: Query):
# connection = self.__get_connection()
# if connection is not None:
# with connection.cursor() as cursor:
# cursor.execute(query.query)
# query._result = cursor.fetchall()
# query.is_completed = True
# self.__free_connection(connection)
# else:
# self.__waiting_processes.put({"query": query, "fn": self.fetchall})
#
# def fetchiter(self, query: Query):
# connection = self.__get_connection()
# if connection is not None:
# with connection.cursor() as cursor:
# cursor.execute(query.query)
#
# def generate():
# while True:
# items = cursor.fetchmany()
# if not items:
# break
# for item in items:
# yield item
#
# query._result = generate()
# query.is_completed = True
# self.__free_connection(connection)
# else:
# self.__waiting_processes.put({"query": query, "fn": self.fetchiter})
#
# def __del__(self):
# if self.__connection.closed == 0:
# while not self.__waiting_processes.empty():
# continue
# self.__queue_consumer.terminate()
# self.__instance.__connection.closeall()
16 changes: 16 additions & 0 deletions YouTubeMDBot/multiprocess/__init__.py
@@ -0,0 +1,16 @@
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..multiprocess.abcprocess import MultiprocessBase

0 comments on commit 2c9f028

Please sign in to comment.