/
identifier.py
executable file
·133 lines (112 loc) · 4.77 KB
/
identifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import threading
import unittest
from io import BytesIO
from pprint import pprint
from time import sleep
from time import time
from typing import Tuple
from YouTubeMDBot.downloader import MultipleYouTubeDownloader
from YouTubeMDBot.downloader import YouTubeDownloader
from YouTubeMDBot.metadata import YouTubeMetadataIdentifier
class IdentifierTest(unittest.TestCase):
lock = threading.Lock()
threads = 0
max = 0
song_info = {}
def test_identification(self):
url = "https://www.youtube.com/watch?v=YQHsXMglC9A"
downloader = YouTubeDownloader(url=url)
audio, data = downloader.download()
with open("hello.m4a", "wb") as song:
song.write(data)
identifier = YouTubeMetadataIdentifier(audio=data,
downloader=downloader)
valid = identifier.identify_audio()
assert valid
print("{0} by {1} - score: {2} / 1\n"
"\thttps://musicbrainz.org/recording/{3}\n"
"\thttps://musicbrainz.org/release/{4}\n\n"
.format(identifier.title, identifier.artist,
identifier.score,
identifier.recording_id, identifier.release_id))
with open("cover.jpg", "wb") as cover:
cover.write(identifier.cover)
def test_multiple_download_identification(self):
yt1 = YouTubeDownloader(
url="https://www.youtube.com/watch?v=Inm-N5rLUSI")
yt2 = YouTubeDownloader(
url="https://www.youtube.com/watch?v=-_ZwpOdXXcA")
yt3 = YouTubeDownloader(
url="https://www.youtube.com/watch?v=WOGWZD5iT10")
yt4 = YouTubeDownloader(
url="https://www.youtube.com/watch?v=GfKV9KaNJXc")
yt5 = YouTubeDownloader(
url="https://www.youtube.com/watch?v=DiItGE3eAyQ")
yt6 = YouTubeDownloader(
url="https://www.youtube.com/watch?v=GuZzuQvv7uc")
ytdl = MultipleYouTubeDownloader()
f1 = ytdl.download_async(yt1, error_callback=handle_error)
f2 = ytdl.download_async(yt2, error_callback=handle_error)
f3 = ytdl.download_async(yt3, error_callback=handle_error)
f4 = ytdl.download_async(yt4, error_callback=handle_error)
f5 = ytdl.download_async(yt5, error_callback=handle_error)
f6 = ytdl.download_async(yt6, error_callback=handle_error)
t1 = threading.Thread(target=self.find_metadata, args=(f1, yt1,))
t2 = threading.Thread(target=self.find_metadata, args=(f2, yt2,))
t3 = threading.Thread(target=self.find_metadata, args=(f3, yt3,))
t4 = threading.Thread(target=self.find_metadata, args=(f4, yt4,))
t5 = threading.Thread(target=self.find_metadata, args=(f5, yt5,))
t6 = threading.Thread(target=self.find_metadata, args=(f6, yt6,))
self.max = 6
t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
t6.start()
while self.threads < self.max:
sleep(1)
pprint("Finished")
del ytdl
def barrier(self):
with self.lock:
self.threads += 1
def getThreads(self):
with self.lock:
return self.threads
def find_metadata(self, future, downloader) -> Tuple[BytesIO, bytes, dict]:
st_dl_t = time()
io, data = future.get()
f_dl_t = time()
print("Downloaded {} - elapsed time: {:.1f}s"
.format(downloader.get_url(), f_dl_t - st_dl_t))
identifier = \
YouTubeMetadataIdentifier(audio=data, downloader=downloader)
valid = identifier.identify_audio()
assert valid
song_info = {downloader.get_url(): {
"title": identifier.title,
"artist": identifier.artist,
"cover": identifier.cover
}}
if not identifier.youtube_data:
song_info[downloader.get_url()]["score"] = identifier.score
song_info[downloader.get_url()]["record_id"] = \
"https://musicbrainz.org/recording/{0}".format(
identifier.recording_id)
song_info[downloader.get_url()]["release_id"] = \
"https://musicbrainz.org/release/{0}".format(
identifier.release_id)
song_info[downloader.get_url()]["album"] = identifier.album
else:
song_info[downloader.get_url()][
"duration"] = identifier.duration
song_info[downloader.get_url()]["id"] = identifier.youtube_id
song_info[downloader.get_url()]["youtube_data"] = True
self.barrier()
return io, data, song_info
def handle_error(exception):
raise RuntimeError("Catch exception while running a thread: "
+ str(exception))
if __name__ == '__main__':
unittest.main()