Skip to content

Commit

Permalink
Updated Multiprocess library
Browse files Browse the repository at this point in the history
In this project there is no need of use another process to handle some data. That is why it is better to use a ThreadPool for managing multiple connections

TODO: the PostgreSQL class
  • Loading branch information
Javinator9889 committed Feb 12, 2020
1 parent 993b6fe commit ad0cbac
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 178 deletions.
5 changes: 4 additions & 1 deletion .gitlab-ci.yml
Expand Up @@ -18,8 +18,11 @@ cache:

before_script:
- python -V # Print out python version for debugging
- apt update && apt upgrade -y
- apt update
- apt install -y libchromaprint-tools ffmpeg --install-recommends
- git clone https://github.com/beetbox/pyacoustid.git && cd pyacoustid
- python setup.py install
- cd .. && rm -rf pyacoustid/
- pip install -r YouTubeMDBot/requirements.txt

test:pylint:
Expand Down
2 changes: 1 addition & 1 deletion .idea/YouTubeMDBot.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 4 additions & 13 deletions YouTubeMDBot/__init__.py
Expand Up @@ -15,32 +15,23 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .api import YouTubeAPI
from .api import YouTubeVideoData

from .audio import FPCalc
from .audio import FFmpegOGG
from .audio import FFmpegMP3
from .audio import FFmpegOGG
from .audio import FFmpegOpener
from .audio import FPCalc
from .audio import ffmpeg_available

from .commands import StartHandler

from .constants import *

from .decorators import restricted
from .decorators import send_action

from .downloader import MultipleYouTubeDownloader
from .downloader import YouTubeDownloader

from .errors import EmptyBodyError
from .errors import FinishedException

from .logging_utils import LoggingHandler
from .logging_utils import setup_logging

from .metadata import AudioMetadata
from .metadata import MetadataIdentifier
from .metadata import YouTubeMetadataIdentifier

from .multiprocess import ThreadPoolBase
from .utils import get_yt_video_id

from .multiprocess import MultiprocessBase
6 changes: 3 additions & 3 deletions YouTubeMDBot/constants/app_constants.py
Expand Up @@ -47,8 +47,8 @@
MAX_PROCESS = cpu_count()

# Database constants
DB_NAME = os.environ["DATABASE_NAME"]
DB_USER = os.environ["DATABASE_USER"]
DB_PASSWORD = os.environ["DATABASE_PASSWORD"]
DB_NAME = True # os.environ["DATABASE_NAME"]
DB_USER = True # os.environ["DATABASE_USER"]
DB_PASSWORD = True # os.environ["DATABASE_PASSWORD"]
DB_HOST = "127.0.0.1"
DB_PORT = 5432
11 changes: 5 additions & 6 deletions YouTubeMDBot/database/psql.py
Expand Up @@ -20,13 +20,12 @@
from psycopg2.pool import PoolError

from . import Query
from .. import MAX_PROCESS
from .. import DB_USER
from .. import DB_PORT
from .. import DB_PASSWORD
from .. import DB_NAME
from .. import DB_HOST
from .. import MultiprocessBase
from .. import DB_NAME
from .. import DB_PASSWORD
from .. import DB_PORT
from .. import DB_USER
from .. import MAX_PROCESS


class PostgreSQL(MultiprocessBase):
Expand Down
1 change: 1 addition & 0 deletions YouTubeMDBot/downloader/__init__.py
Expand Up @@ -13,4 +13,5 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..downloader.youtube_downloader import MultipleYouTubeDownloader
from ..downloader.youtube_downloader import YouTubeDownloader
26 changes: 25 additions & 1 deletion YouTubeMDBot/downloader/youtube_downloader.py
Expand Up @@ -14,18 +14,23 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from io import BytesIO
from typing import Any
from typing import Callable
from typing import Tuple

from .. import ThreadPoolBase
from ..constants.app_constants import YDL_CLI_OPTIONS


class YouTubeDownloader:
"""
Download a YouTube video directly into memory.
"""

def __init__(self, url: str):
"""
Creates the YouTubeDownloader object. Call "download" for obtaining the video.
Creates the YouTubeDownloader object. Call "download" for obtaining
the video.
:param url: the video URL.
"""
self.__url: str = url
Expand Down Expand Up @@ -56,3 +61,22 @@ def get_url(self) -> str:
:return: str with the URL.
"""
return self.__url


class MultipleYouTubeDownloader(ThreadPoolBase):
def __new__(cls,
max_processes: int = 4,
name: str = "YouTubeDownloader",
**kwargs):
return super().__new__(cls, max_processes, name, **kwargs)

def download(self, yt_obj: YouTubeDownloader) -> Tuple[BytesIO, bytes]:
return super().wait_execute(yt_obj.download)

def download_async(self,
yt_obj: YouTubeDownloader,
callback: Callable[[Any], Any] = None,
error_callback: Callable[[Any], Any] = None):
return super().execute(yt_obj.download,
callback=callback,
err_callback=error_callback)
2 changes: 1 addition & 1 deletion YouTubeMDBot/multiprocess/__init__.py
Expand Up @@ -13,4 +13,4 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..multiprocess.abcprocess import MultiprocessBase
from ..multiprocess.abcprocess import ThreadPoolBase
108 changes: 34 additions & 74 deletions YouTubeMDBot/multiprocess/abcprocess.py
Expand Up @@ -14,105 +14,65 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from abc import ABC
from abc import abstractmethod

from multiprocessing.pool import ThreadPool
from threading import Lock
from typing import Any
from typing import Dict
from typing import Optional
from typing import Callable

from queue import Queue

from threading import Lock
from threading import Thread
from threading import Condition

from .. import MAX_PROCESS
from .. import FinishedException
from .. import MAX_PROCESS


class MultiprocessBase(ABC):
class ThreadPoolBase(ABC):
__instance = None

def __new__(cls,
maxsize: int = 0,
max_processes: int = MAX_PROCESS,
name: str = "ThreadBase",
**kwargs):
if MultiprocessBase.__instance is None:
if ThreadPoolBase.__instance is None:
cls.__instance = object.__new__(cls)
cls.__instance.waiting_processes = Queue(maxsize)
cls.__instance.running_processes = 0
cls.__instance.lock = Lock()
cls.__instance.__pool = ThreadPool(processes=max_processes)
cls.__instance.__lock = Lock()
cls.__instance.__finished = False
cls.__instance.max_processes = max_processes
cls.__instance.__condition = Condition()
cls.__instance.queue_consumer = \
Thread(target=cls.__instance.__consumer)
cls.__instance.queue_consumer.start()
cls.__instance.name = name
for key, value in kwargs.items():
setattr(cls.__instance, key, value)
return cls.__instance

def __process_ready(self) -> bool:
return self.running_processes < self.max_processes and not \
self.waiting_processes.empty() or self.finished

def __consumer(self):
while not self.finished:
with self.__condition:
self.__condition.wait_for(self.__process_ready)
if not self.finished:
process = self.waiting_processes.get()
self.__spawn(process)
return

def __spawn(self, process: Dict[str, Any]):
connection = self.get_connection()
child_process = Thread(target=self.__run,
args=(process, connection,))
child_process.start()

@abstractmethod
def get_connection(self) -> Optional[Any]:
with self.lock:
if self.running_processes <= self.max_processes:
self.running_processes += 1
return None

@abstractmethod
def free_connection(self, connection):
with self.lock:
self.running_processes -= 1
with self.__condition:
self.__condition.notify_all()

def __run(self, *args) -> Optional[Any]:
fn = args[0]["fn"]
fn_args = args[0]["args"]
result = fn(*fn_args, args[1]) if args[1] is not None else fn(*fn_args)
self.free_connection(args[1])
return result

def new(self, fn: Callable, *args):
if not self.finished:
self.waiting_processes.put({"fn": fn, "args": args})
with self.__condition:
self.__condition.notify_all()
else:
raise FinishedException("The process has finished")

@property
def finished(self) -> bool:
with self.lock:
with self.__lock:
return self.__finished

@finished.setter
def finished(self, value: bool):
with self.lock:
with self.__lock:
self.__finished = value

def __del__(self):
if not self.finished:
while not self.waiting_processes.empty():
continue
self.__pool.close()
self.__pool.join()
self.finished = True

def wait_execute(self, func: Callable, *args, **kwargs) -> Any:
if not self.finished:
return self.__pool.apply(func=func, args=args, kwds=kwargs)
else:
raise FinishedException(f"The thread pool {self.name} has finished")

def execute(self,
func: Callable,
args=(),
kwds={},
callback: Callable[[Any], Any] = None,
err_callback: Callable[[Any], Any] = None):
if not self.finished:
return self.__pool.apply_async(func=func,
args=args,
kwds=kwds,
callback=callback,
error_callback=err_callback)
else:
raise FinishedException(f"The thread pool {self.name} has finished")
3 changes: 1 addition & 2 deletions YouTubeMDBot/requirements.txt
Expand Up @@ -4,6 +4,5 @@ google-api-python-client
musicbrainzngs
ujson
youtube_dl
pyacoustid
python-telegram-bot
psycopg2
psycopg2-binary
34 changes: 24 additions & 10 deletions YouTubeMDBot/tests/downloader.py
Expand Up @@ -2,6 +2,7 @@
import unittest
from time import sleep

from YouTubeMDBot.downloader import MultipleYouTubeDownloader
from YouTubeMDBot.downloader import YouTubeDownloader


Expand All @@ -11,14 +12,25 @@ class DownloadTest(unittest.TestCase):
_lock = threading.Lock()

def test_multithread_download(self):
yt1 = YouTubeDownloader(url="https://www.youtube.com/watch?v=Inm-N5rLUSI")
yt2 = YouTubeDownloader(url="https://www.youtube.com/watch?v=-_ZwpOdXXcA")
yt3 = YouTubeDownloader(url="https://www.youtube.com/watch?v=WOGWZD5iT10")
yt4 = YouTubeDownloader(url="https://www.youtube.com/watch?v=9HfoNUjw5u8")
t1 = threading.Thread(target=self.write_to_file, args=(yt1, "v1.m4a",))
t2 = threading.Thread(target=self.write_to_file, args=(yt2, "v2.m4a",))
t3 = threading.Thread(target=self.write_to_file, args=(yt3, "v3.m4a",))
t4 = threading.Thread(target=self.write_to_file, args=(yt4, "v4.m4a",))
yt1 = YouTubeDownloader(
url="https://www.youtube.com/watch?v=Inm-N5rLUSI")
yt2 = YouTubeDownloader(
url="https://www.youtube.com/watch?v=-_ZwpOdXXcA")
yt3 = YouTubeDownloader(
url="https://www.youtube.com/watch?v=WOGWZD5iT10")
yt4 = YouTubeDownloader(
url="https://www.youtube.com/watch?v=9HfoNUjw5u8")

ytdl = MultipleYouTubeDownloader()
ft1 = ytdl.download_async(yt1)
ft2 = ytdl.download_async(yt2)
ft3 = ytdl.download_async(yt3)
ft4 = ytdl.download_async(yt4)

t1 = threading.Thread(target=self.write_to_file, args=(ft1, "v1.m4a",))
t2 = threading.Thread(target=self.write_to_file, args=(ft2, "v2.m4a",))
t3 = threading.Thread(target=self.write_to_file, args=(ft3, "v3.m4a",))
t4 = threading.Thread(target=self.write_to_file, args=(ft4, "v4.m4a",))

self._max = 4

Expand All @@ -30,12 +42,14 @@ def test_multithread_download(self):
while self._elements < self._max:
sleep(1)

del ytdl

def barrier(self):
with self._lock:
self._elements += 1

def write_to_file(self, yt: YouTubeDownloader, name: str):
_, data = yt.download()
def write_to_file(self, future, name: str):
_, data = future.get()
print(name + " downloaded")
with open(name, "wb") as f:
f.write(data)
Expand Down

0 comments on commit ad0cbac

Please sign in to comment.