From b234aee561734ca74f28b7f2e53f59f37dd61507 Mon Sep 17 00:00:00 2001 From: Javinator9889 Date: Mon, 1 Jun 2020 10:01:37 +0200 Subject: [PATCH] Updated tests - removed old ones and created the structure for the new ones --- YouTubeMDBot/.idea/YouTubeMDBot.iml | 6 +- YouTubeMDBot/__main__.py | 6 +- YouTubeMDBot/downloader/youtube_downloader.py | 2 +- YouTubeMDBot/tests/converter.py | 35 ----- YouTubeMDBot/tests/database.py | 41 ------ YouTubeMDBot/tests/download_test.py | 71 ++++++++++ YouTubeMDBot/tests/downloader.py | 59 --------- YouTubeMDBot/tests/identifier.py | 124 ------------------ YouTubeMDBot/tests/m4adownloader.py | 29 ---- YouTubeMDBot/tests/multiprocess_tests.py | 62 --------- YouTubeMDBot/tests/song_search.py | 26 ---- YouTubeMDBot/tests/tagger.py | 45 ------- 12 files changed, 80 insertions(+), 426 deletions(-) delete mode 100644 YouTubeMDBot/tests/converter.py delete mode 100644 YouTubeMDBot/tests/database.py create mode 100644 YouTubeMDBot/tests/download_test.py delete mode 100755 YouTubeMDBot/tests/downloader.py delete mode 100755 YouTubeMDBot/tests/identifier.py delete mode 100644 YouTubeMDBot/tests/m4adownloader.py delete mode 100644 YouTubeMDBot/tests/multiprocess_tests.py delete mode 100755 YouTubeMDBot/tests/song_search.py delete mode 100755 YouTubeMDBot/tests/tagger.py diff --git a/YouTubeMDBot/.idea/YouTubeMDBot.iml b/YouTubeMDBot/.idea/YouTubeMDBot.iml index 3d4ed49..508fbf6 100755 --- a/YouTubeMDBot/.idea/YouTubeMDBot.iml +++ b/YouTubeMDBot/.idea/YouTubeMDBot.iml @@ -1,7 +1,11 @@ - + + + + + diff --git a/YouTubeMDBot/__main__.py b/YouTubeMDBot/__main__.py index 739a9d4..a159692 100755 --- a/YouTubeMDBot/__main__.py +++ b/YouTubeMDBot/__main__.py @@ -13,6 +13,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from .tests.multiprocess_tests import main - -main() +if __name__ == '__main__': + # TODO + pass diff --git a/YouTubeMDBot/downloader/youtube_downloader.py b/YouTubeMDBot/downloader/youtube_downloader.py index 8afaa62..72e9523 100755 --- a/YouTubeMDBot/downloader/youtube_downloader.py +++ b/YouTubeMDBot/downloader/youtube_downloader.py @@ -86,7 +86,7 @@ def download(self, yt_obj: YouTubeDownloader) -> Tuple[BytesIO, bytes]: def download_async(self, yt_obj: YouTubeDownloader, - callback: Callable[[Any], Any] = None, + callback: Callable[[tuple], Any] = None, error_callback: Callable[[Any], Any] = None): return super().execute(yt_obj.download, callback=callback, diff --git a/YouTubeMDBot/tests/converter.py b/YouTubeMDBot/tests/converter.py deleted file mode 100644 index 0cf6e43..0000000 --- a/YouTubeMDBot/tests/converter.py +++ /dev/null @@ -1,35 +0,0 @@ -import unittest -import mutagen - -from io import BytesIO -from typing import Tuple - -# from YouTubeMDBot.tests.tagger import TaggerTest -from YouTubeMDBot.audio import FFmpegMP3 -from YouTubeMDBot.audio import FFmpegOGG - -"""class MyTestCase(TaggerTest): - def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: - print(f"Running test: find_metadata in {__file__}") - io, data, song_info = super().find_metadata(future, downloader) - io.seek(0) - mp3 = FFmpegMP3(data=data, bitrate="96k") # downrate - ogg = FFmpegOGG(data=data, bitrate="256k") # uprate - - mp3.convert() - ogg.convert() - - mp3_container = BytesIO(mp3.get_output()) - ogg_container = BytesIO(ogg.get_output()) - - print(mp3.get_extra().decode("utf-8")) - print(ogg.get_extra().decode("utf-8")) - - print(mutagen.File(mp3_container).pprint()) - print(mutagen.File(ogg_container).pprint()) - - return io, data, song_info - - -if __name__ == '__main__': - unittest.main()""" diff --git a/YouTubeMDBot/tests/database.py b/YouTubeMDBot/tests/database.py deleted file mode 100644 index e7b12ab..0000000 --- a/YouTubeMDBot/tests/database.py +++ /dev/null @@ -1,41 +0,0 @@ -import unittest -import time - -from YouTubeMDBot.database import * - - -class DatabaseTesting(unittest.TestCase): - def test_creation(self): - db_item = PostgreSQLItem() - db = Initializer(db_item) - db.init() - - user_db = UserDB(db_item) - print(hex(id(db))) - print(hex(id(user_db))) - - user_db.register_new_user(12334, "test", "test", "en") - user_db.register_new_user(12335, "test", "test", "en") - user_db.register_new_user(12336, "test", "test", "en") - user_db.register_new_user(12337, "test", "test", "en") - user_db.register_new_user(12338, "test", "test", "en") - user_db.register_new_user(12339, "test", "test", "en") - user_db.register_new_user(12330, "test", "test", "en") - user_db.register_new_user(12331, "test", "test", "en") - user_db.register_new_user(12332, "test", "test", "en") - user_db.register_new_user(12333, "test", "test", "en") - user_db.register_new_user(12344, "test", "test", "en") - - # time.sleep(1) - - # for uid in (12334, 12335, 12336, 12337, 12338, 12339, 12330, 12331, - # 12332, 12333, 12344): - # print(user_db.get_user_information(uid)) - - db_item.stop() - - # del db_item - - -if __name__ == '__main__': - unittest.main() diff --git a/YouTubeMDBot/tests/download_test.py b/YouTubeMDBot/tests/download_test.py new file mode 100644 index 0000000..6c16f7d --- /dev/null +++ b/YouTubeMDBot/tests/download_test.py @@ -0,0 +1,71 @@ +import unittest +import logging + +from YouTubeMDBot.downloader import YouTubeDownloader +from YouTubeMDBot.downloader import M4AYouTubeDownloader +from YouTubeMDBot.downloader import MultipleYouTubeDownloader + +from YouTubeMDBot.utils import CBarrier + +log = logging.basicConfig() + + +class DownloadTest(unittest.TestCase): + lock = Lock() + + @property + def finished(self): + with self.lock: + return self.__finished + + @finished.setter + def finished(self, value): + with self.lock: + self.__finished = value + + def test_single_download(self): + downloader = YouTubeDownloader( + url="https://www.youtube.com/watch?v=Inm-N5rLUSI" + ) + self._test_download(downloader) + + def test_single_download_to_m4a(self): + downloader = M4AYouTubeDownloader( + url="https://www.youtube.com/watch?v=Inm-N5rLUSI" + ) + self._test_download(downloader) + + def test_multiple_downloader(self): + downloader = MultipleYouTubeDownloader() + self.finished = 0 + urls = { + "https://www.youtube.com/watch?v=Inm-N5rLUSI", + "https://www.youtube.com/watch?v=-_ZwpOdXXcA", + "https://www.youtube.com/watch?v=WOGWZD5iT10", + "https://www.youtube.com/watch?v=GfKV9KaNJXc", + "https://www.youtube.com/watch?v=DiItGE3eAyQ", + "https://www.youtube.com/watch?v=GuZzuQvv7uc" + } + for url in urls: + yt_downloader = YouTubeDownloader(url) + downloader.download_async(yt_downloader, + callback=self._download_finished_callback, + error_callback=self._download_failed_callback) + while self.finished != 6: + sleep(1) + + def _test_download(self, downloader: YouTubeDownloader): + io, data = downloader.download() + self.assertEqual(io.read(), data) + + def _download_finished_callback(self, data): + print("Video download finished") + print(type(data)) + self.finished += 1 + + def _download_failed_callback(self, err): + print(f"Captured error: {err}") + + +if __name__ == '__main__': + unittest.main() diff --git a/YouTubeMDBot/tests/downloader.py b/YouTubeMDBot/tests/downloader.py deleted file mode 100755 index c49c308..0000000 --- a/YouTubeMDBot/tests/downloader.py +++ /dev/null @@ -1,59 +0,0 @@ -import threading -import unittest -from threading import Barrier - -from YouTubeMDBot.downloader import MultipleYouTubeDownloader -from YouTubeMDBot.downloader import YouTubeDownloader - - -class DownloadTest(unittest.TestCase): - _elements = 0 - _lock = threading.Lock() - _barrier = Barrier(parties=5) - - def test_multithread_download(self): - print(f"Running test: test_multithread_download in {__file__}") - yt1 = YouTubeDownloader( - url="https://www.youtube.com/watch?v=Inm-N5rLUSI") - yt2 = YouTubeDownloader( - url="https://www.youtube.com/watch?v=-_ZwpOdXXcA") - yt3 = YouTubeDownloader( - url="https://www.youtube.com/watch?v=WOGWZD5iT10") - yt4 = YouTubeDownloader( - url="https://www.youtube.com/watch?v=9HfoNUjw5u8") - - ytdl = MultipleYouTubeDownloader() - ft1 = ytdl.download_async(yt1, error_callback=handle_error) - ft2 = ytdl.download_async(yt2, error_callback=handle_error) - ft3 = ytdl.download_async(yt3, error_callback=handle_error) - ft4 = ytdl.download_async(yt4, error_callback=handle_error) - - t1 = threading.Thread(target=self.write_to_file, args=(ft1, "v1.m4a",)) - t2 = threading.Thread(target=self.write_to_file, args=(ft2, "v2.m4a",)) - t3 = threading.Thread(target=self.write_to_file, args=(ft3, "v3.m4a",)) - t4 = threading.Thread(target=self.write_to_file, args=(ft4, "v4.m4a",)) - - t1.start() - t2.start() - t3.start() - t4.start() - - self._barrier.wait() - - ytdl.close() - - def write_to_file(self, future, name: str): - _, data = future.get() - print(name + " downloaded") - with open(name, "wb") as f: - f.write(data) - self._barrier.wait() - - -def handle_error(exception): - print("Unexpected exception: " + str(exception)) - raise exception() - - -if __name__ == '__main__': - unittest.main() diff --git a/YouTubeMDBot/tests/identifier.py b/YouTubeMDBot/tests/identifier.py deleted file mode 100755 index 02fa13e..0000000 --- a/YouTubeMDBot/tests/identifier.py +++ /dev/null @@ -1,124 +0,0 @@ -import threading -import unittest -from io import BytesIO -from pprint import pprint -from time import time -from typing import Tuple -from threading import Barrier - -from YouTubeMDBot.downloader import MultipleYouTubeDownloader -from YouTubeMDBot.downloader import YouTubeDownloader -from YouTubeMDBot.metadata import YouTubeMetadataIdentifier - - -class IdentifierTest(unittest.TestCase): - lock = threading.Lock() - song_info = {} - barrier = Barrier(parties=7) - - def test_identification(self): - print(f"Running test: test_identification in {__file__}") - url = "https://www.youtube.com/watch?v=YQHsXMglC9A" - downloader = YouTubeDownloader(url=url) - audio, data = downloader.download() - with open("hello.m4a", "wb") as song: - song.write(data) - identifier = YouTubeMetadataIdentifier(audio=data, - downloader=downloader) - - valid = identifier.identify_audio() - assert valid - print("{0} by {1} - score: {2} / 1\n" - "\thttps://musicbrainz.org/recording/{3}\n" - "\thttps://musicbrainz.org/release/{4}\n\n" - .format(identifier.title, identifier.artist, - identifier.score, - identifier.recording_id, identifier.release_id)) - with open("cover.jpg", "wb") as cover: - cover.write(identifier.cover) - - def test_multiple_download_identification(self): - print(f"Running test: test_multiple_download_identification in" - f" {__file__}") - yt1 = YouTubeDownloader( - url="https://www.youtube.com/watch?v=Inm-N5rLUSI") - yt2 = YouTubeDownloader( - url="https://www.youtube.com/watch?v=-_ZwpOdXXcA") - yt3 = YouTubeDownloader( - url="https://www.youtube.com/watch?v=WOGWZD5iT10") - yt4 = YouTubeDownloader( - url="https://www.youtube.com/watch?v=GfKV9KaNJXc") - yt5 = YouTubeDownloader( - url="https://www.youtube.com/watch?v=DiItGE3eAyQ") - yt6 = YouTubeDownloader( - url="https://www.youtube.com/watch?v=GuZzuQvv7uc") - - ytdl = MultipleYouTubeDownloader() - - f1 = ytdl.download_async(yt1, error_callback=handle_error) - f2 = ytdl.download_async(yt2, error_callback=handle_error) - f3 = ytdl.download_async(yt3, error_callback=handle_error) - f4 = ytdl.download_async(yt4, error_callback=handle_error) - f5 = ytdl.download_async(yt5, error_callback=handle_error) - f6 = ytdl.download_async(yt6, error_callback=handle_error) - - t1 = threading.Thread(target=self.find_metadata, args=(f1, yt1,)) - t2 = threading.Thread(target=self.find_metadata, args=(f2, yt2,)) - t3 = threading.Thread(target=self.find_metadata, args=(f3, yt3,)) - t4 = threading.Thread(target=self.find_metadata, args=(f4, yt4,)) - t5 = threading.Thread(target=self.find_metadata, args=(f5, yt5,)) - t6 = threading.Thread(target=self.find_metadata, args=(f6, yt6,)) - - self.max = 6 - - t1.start() - t2.start() - t3.start() - t4.start() - t5.start() - t6.start() - - self.barrier.wait() - pprint("Finished") - ytdl.close() - - def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: - st_dl_t = time() - io, data = future.get() - f_dl_t = time() - print("Downloaded {} - elapsed time: {:.1f}s" - .format(downloader.url, f_dl_t - st_dl_t)) - identifier = \ - YouTubeMetadataIdentifier(audio=data, downloader=downloader) - valid = identifier.identify_audio() - assert valid - song_info = {downloader.url: { - "title": identifier.title, - "artist": identifier.artist, - "cover": identifier.cover - }} - if not identifier.youtube_data: - song_info[downloader.url]["score"] = identifier.score - song_info[downloader.url]["record_id"] = \ - "https://musicbrainz.org/recording/{0}".format( - identifier.recording_id) - song_info[downloader.url]["release_id"] = \ - "https://musicbrainz.org/release/{0}".format( - identifier.release_id) - song_info[downloader.url]["album"] = identifier.album - else: - song_info[downloader.url][ - "duration"] = identifier.duration - song_info[downloader.url]["id"] = identifier.youtube_id - song_info[downloader.url]["youtube_data"] = True - self.barrier.wait() - return io, data, song_info - - -def handle_error(exception): - raise RuntimeError("Catch exception while running a thread: " - + str(exception)) - - -if __name__ == '__main__': - unittest.main() diff --git a/YouTubeMDBot/tests/m4adownloader.py b/YouTubeMDBot/tests/m4adownloader.py deleted file mode 100644 index 11aee9d..0000000 --- a/YouTubeMDBot/tests/m4adownloader.py +++ /dev/null @@ -1,29 +0,0 @@ -import unittest -import mutagen - -from YouTubeMDBot.downloader import M4AYouTubeDownloader -from YouTubeMDBot.audio import FFmpegM4A - - -class MyTestCase(unittest.TestCase): - def test_download(self): - print(f"Running test: test_download in {__file__}") - dl = M4AYouTubeDownloader( - url="https://www.youtube.com/watch?v=s6VaeFCxta8", - bitrate="128k") - io, data = dl.download() - with open("outex.m4a", "wb") as of: - of.write(data) - print(mutagen.File(io).pprint()) - io.seek(0) - return io, data - - def test_normalization(self): - print(f"Running test: test_normalization in {__file__}") - io, data = self.test_download() - ctr = FFmpegM4A(data, "filename") - assert ctr.get_volume() == 0.0 - - -if __name__ == '__main__': - unittest.main() diff --git a/YouTubeMDBot/tests/multiprocess_tests.py b/YouTubeMDBot/tests/multiprocess_tests.py deleted file mode 100644 index b4c7176..0000000 --- a/YouTubeMDBot/tests/multiprocess_tests.py +++ /dev/null @@ -1,62 +0,0 @@ -# YouTubeMDBot -# Copyright (C) 2019 - Javinator9889 -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -import os - -from .. import ThreadPoolBase - - -class MPTest(ThreadPoolBase): - pass - - -def main(): - from time import time - from time import sleep - from random import random - - test = MPTest(max_processes=4) - startt = time() - print(f"Test created - start time: {startt:.3f}s") - - def pinfo(x): - print(f"Process #{x} - executing at {(time() - startt):.3f}s") - t = (random() * 10) + 1 - print(f"Process #{x} waiting {t:.3f}s") - st = time() - sleep(t) - print(f"Process #{x} wakes-up after {(time() - st):.3f}s and finishes") - print(f"Thread {os.getpid()} finished!") - return - - print(f"Main PID: {os.getpid()}") - - for i in range(20): - # print(f"Giving new function {i}") - f = pinfo - test.execute(f, i) - - # while not test.waiting_processes.empty(): - # print(" ", end="\r") - # print(f"Threads: {threading.active_count() - 2}", end="\r") - # sleep(0.1) - # del test - # test.finished = True - del test - print(f"Main finished: {os.getpid()}") - return - - -main() diff --git a/YouTubeMDBot/tests/song_search.py b/YouTubeMDBot/tests/song_search.py deleted file mode 100755 index 492be3d..0000000 --- a/YouTubeMDBot/tests/song_search.py +++ /dev/null @@ -1,26 +0,0 @@ -import unittest - -from YouTubeMDBot.api import YouTubeAPI -from YouTubeMDBot.api import YouTubeVideoData - - -class TestSearch(unittest.TestCase): - def test_search(self): - print(f"Running test: test_search in {__file__}") - s = YouTubeAPI() - search: dict = s.search(term="test") - data = YouTubeVideoData(data=search, ignore_errors=True) - print("Title: {0}\n" - "Artist: {1}\n" - "Thumbnail: {2}\n" - "Duration: {3}\n" - "Views: {4}\n" - "Likes: {5}\n" - "Dislikes: {6}\n" - "Id: {7}".format(data.title, data.artist, data.thumbnail, - data.duration, data.views, data.likes, - data.dislikes, data.id)) - - -if __name__ == '__main__': - unittest.main() diff --git a/YouTubeMDBot/tests/tagger.py b/YouTubeMDBot/tests/tagger.py deleted file mode 100755 index ba97606..0000000 --- a/YouTubeMDBot/tests/tagger.py +++ /dev/null @@ -1,45 +0,0 @@ -import unittest -from io import BytesIO -from typing import Tuple - -import mutagen - -from YouTubeMDBot.metadata import AudioMetadata -from YouTubeMDBot.tests.identifier import IdentifierTest -from YouTubeMDBot.utils import youtube_utils - - -"""class TaggerTest(IdentifierTest): - - def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]: - print(f"Running test: find_metadata in {__file__}") - io, data, song_info = super().find_metadata(future, downloader) - tagger = AudioMetadata(io) - url = downloader.url - - tagger.set_title(song_info[url]["title"]) - tagger.set_artist(song_info[url]["artist"]) - tagger.set_cover(song_info[url]["cover"]) - extra = ["YouTube URL: " + url] - if not song_info[url].get("youtube_data"): - tagger.set_album(song_info[url]["album"]) - extra.append("MusicBrainz Record ID: " + song_info[url][ - "record_id"]) - extra.append("MusicBrainz Release ID: " + song_info[url][ - "release_id"]) - tagger.set_extras(extra) - else: - tagger.set_extras(["YouTube ID: {}".format(song_info[url]["id"])]) - yid = youtube_utils.get_yt_video_id(url) - rs = tagger.save() - rs.seek(0) - print(mutagen.File(rs).pprint()) - rs.seek(0) - with open(yid + ".m4a", "wb") as f: - f.write(rs.read()) - rs.seek(0) - return rs, rs.read(), song_info - - -if __name__ == '__main__': - unittest.main()"""