Skip to content

Commit

Permalink
Multiprocess base class finished - completely functional
Browse files Browse the repository at this point in the history
  • Loading branch information
Javinator9889 committed Dec 13, 2019
1 parent 2e2fb16 commit 993b6fe
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 21 deletions.
1 change: 1 addition & 0 deletions .idea/dictionaries/javinator9889.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions YouTubeMDBot/__init__.py
Expand Up @@ -32,6 +32,7 @@
from .downloader import YouTubeDownloader

from .errors import EmptyBodyError
from .errors import FinishedException

from .logging_utils import LoggingHandler
from .logging_utils import setup_logging
Expand Down
3 changes: 3 additions & 0 deletions YouTubeMDBot/__main__.py
Expand Up @@ -13,3 +13,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .tests.multiprocess_tests import main

main()
22 changes: 22 additions & 0 deletions YouTubeMDBot/errors/FinishedException.py
@@ -0,0 +1,22 @@
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


class FinishedException(Exception):
"""
Raises an exception when the process has finished working.
"""
pass
1 change: 1 addition & 0 deletions YouTubeMDBot/errors/__init__.py
Expand Up @@ -14,3 +14,4 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..errors.EmptyBodyError import EmptyBodyError
from ..errors.FinishedException import FinishedException
65 changes: 44 additions & 21 deletions YouTubeMDBot/multiprocess/abcprocess.py
Expand Up @@ -23,73 +23,96 @@

from queue import Queue

from threading import Condition
from threading import Lock

from multiprocessing import Process
from threading import Thread
from threading import Condition

from .. import MAX_PROCESS
from .. import FinishedException


class MultiprocessBase(ABC):
__instance = None
inst = None

def __new__(cls, maxsize: int = 0, **kwargs):
def __new__(cls,
maxsize: int = 0,
max_processes: int = MAX_PROCESS,
**kwargs):
if MultiprocessBase.__instance is None:
cls.__instance = object.__new__(cls)
cls.__instance.waiting_processes = Queue(maxsize)
cls.__instance.running_processes = 0
cls.__instance.lock = Lock()
cls.__instance.finished = False
cls.__instance.__finished = False
cls.__instance.max_processes = max_processes
cls.__instance.__condition = Condition()
cls.__instance.queue_consumer = \
Process(target=cls.__consumer)
Thread(target=cls.__instance.__consumer)
cls.__instance.queue_consumer.start()
for key, value in kwargs.items():
setattr(cls.__instance, key, value)
return cls.__instance

def __process_ready(self) -> bool:
return self.running_processes < self.max_processes and not \
self.waiting_processes.empty() or self.finished

def __consumer(self):
condition = Condition()
while not self.finished:
with condition:
condition.wait_for(
lambda: self.running_processes <= MAX_PROCESS and not
self.waiting_processes.empty())
process = self.waiting_processes.get()
self.__spawn(process)
with self.__condition:
self.__condition.wait_for(self.__process_ready)
if not self.finished:
process = self.waiting_processes.get()
self.__spawn(process)
return

def __spawn(self, process: Dict[str, Any]):
connection = self.get_connection()
child_process = Process(target=self.__run,
args=(process, connection,))
child_process = Thread(target=self.__run,
args=(process, connection,))
child_process.start()

@abstractmethod
def get_connection(self) -> Optional[Any]:
with self.lock:
if self.running_processes <= MAX_PROCESS:
if self.running_processes <= self.max_processes:
self.running_processes += 1
return None

@abstractmethod
def free_connection(self, connection):
with self.lock:
self.running_processes -= 1
with self.__condition:
self.__condition.notify_all()

def __run(self, *args) -> Optional[Any]:
fn = args[0]["fn"]
fn_args = args[0]["args"]
result = fn(*fn_args, args[1])
result = fn(*fn_args, args[1]) if args[1] is not None else fn(*fn_args)
self.free_connection(args[1])
return result

def new(self, fn: Callable, *args):
self.waiting_processes.put({"fn": fn, "args": args})
if not self.finished:
self.waiting_processes.put({"fn": fn, "args": args})
with self.__condition:
self.__condition.notify_all()
else:
raise FinishedException("The process has finished")

@property
def finished(self) -> bool:
with self.lock:
return self.__finished

@finished.setter
def finished(self, value: bool):
with self.lock:
self.__finished = value

def __del__(self):
if not self.finished:
self.finished = True
while not self.waiting_processes.empty():
continue
self.queue_consumer.terminate()
self.finished = True
65 changes: 65 additions & 0 deletions YouTubeMDBot/tests/multiprocess_tests.py
@@ -0,0 +1,65 @@
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import threading

from typing import Optional, Any, Callable

from .. import MultiprocessBase


class MPTest(MultiprocessBase):
def get_connection(self) -> Optional[Any]:
return super().get_connection()

def free_connection(self, connection):
super().free_connection(connection)


def main():
from time import time
from time import sleep
from random import random

test = MPTest(max_processes=4)
startt = time()
print(f"Test created - start time: {startt:.3f}s")

def pinfo(x):
print(f"Process #{x} - executing at {(time() - startt):.3f}s")
t = (random() * 10) + 1
print(f"Process #{x} waiting {t:.3f}s")
st = time()
sleep(t)
print(f"Process #{x} wakes-up after {(time() - st):.3f}s and finishes")
print(f"Thread {os.getpid()} finished!")
return

print(f"Main PID: {os.getpid()}")

for i in range(20):
# print(f"Giving new function {i}")
f = pinfo
test.new(f, i)

while not test.waiting_processes.empty():
print(" ", end="\r")
print(f"Threads: {threading.active_count() - 2}", end="\r")
sleep(0.1)
# del test
test.finished = True
print(f"Main finished: {os.getpid()}")
return

0 comments on commit 993b6fe

Please sign in to comment.