Commit a4da41af authored by Javinator9889's avatar Javinator9889 🎼

Metadata for downloaded audio - milestone #3

parent 08aaff2a
Pipeline #78 failed with stage
in 11 minutes and 39 seconds
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
......@@ -32,14 +32,14 @@ def ffmpeg_available() -> bool:
class FFmpegOpener(object):
def __init__(self, data: bytes):
io = BytesIO(data)
self._data = data
self.__ffmpeg_proc = Popen(["ffmpeg", "-i", "-", "-f", "s16le", "-"],
stdout=PIPE, stderr=PIPE, stdin=io)
stdout=PIPE, stderr=PIPE, stdin=PIPE)
self.__out = None
self.__err = None
def open(self) -> int:
self.__out, self.__err = self.__ffmpeg_proc.communicate()
self.__out, self.__err = self.__ffmpeg_proc.communicate(self._data)
return self.__ffmpeg_proc.returncode
def get_output(self) -> bytes:
......
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from mutagen.mp4 import MP4
from mutagen.mp4 import MP4Cover
from io import BytesIO
class AudioMetadata:
def __init__(self, audio: BytesIO):
self._audio = MP4(audio)
self._data = audio
def set_title(self, title: str):
self._audio[u"\xa9nam"] = title
def set_artist(self, artist: str):
self._audio[u"\xa9ART"] = artist
def set_album(self, album: str):
self._audio[u"\xa9alb"] = album
def set_extras(self, extras: list):
self._audio[u"\xa9cmt"] = '; '.join(map(str, extras))
def set_cover(self, cover: bytes):
mp4_cover = MP4Cover(cover, MP4Cover.FORMAT_JPEG)
self._audio[u"covr"] = [mp4_cover]
def save(self) -> BytesIO:
self._data.seek(0)
self._audio.save(self._data)
return self._data
......@@ -38,10 +38,10 @@ class MetadataIdentifier(object):
self.recording_id: str = ""
self.score: float = 0.0
self.cover: bytes = bytes(0)
self.album: str = ""
self.duration: int = 0
self.youtube_data: bool = False
self.youtube_id: str = ""
# self._downloader = downloader
@staticmethod
def _is_valid_result(data: json) -> bool:
......@@ -62,7 +62,7 @@ class MetadataIdentifier(object):
data: json = acoustid.lookup(apikey=ACOUSTID_KEY,
fingerprint=fingerprint.fingerprint(),
duration=fingerprint.duration(),
meta="recordings releaseids")
meta="recordings releaseids releasegroups")
self.result = data
is_valid = self._is_valid_result(data)
if is_valid:
......@@ -77,24 +77,16 @@ class MetadataIdentifier(object):
else:
self.artist = "Unknown"
self.title = recording["title"]
self.release_id = recording["releases"][0]["id"]
if recording.get("releasegroups"):
self.release_id = \
recording["releasegroups"][0]["releases"][0]["id"]
self.album = recording["releasegroups"][0]["title"]
self.cover = musicbrainzngs.get_image_front(self.release_id)
self.recording_id = recording["id"]
self.duration = recording["duration"]
self.cover = musicbrainzngs.get_image_front(self.release_id)
is_valid = True
break
break
# elif self._downloader:
# from urllib.request import urlopen
#
# video_id = youtube_utils.get_yt_video_id(self._downloader.get_url())
# video_data = YouTubeAPI.video_details(video_id)
# self.title = video_data.title
# self.artist = video_data.artist
# self.duration = video_data.duration
# self.cover = urlopen(video_data.thumbnail).read()
# self.youtube_id = video_data.id
# self.youtube_data = True
return is_valid
......
......@@ -15,3 +15,5 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..metadata.MetadataIdentifier import MetadataIdentifier
from ..metadata.MetadataIdentifier import YouTubeMetadataIdentifier
from ..metadata.AudioMetadata import AudioMetadata
......@@ -3,6 +3,8 @@ import unittest
from pprint import pprint
from time import sleep
from time import time
from typing import Tuple
from io import BytesIO
from YouTubeMDBot.downloader import YouTubeDownloader
from YouTubeMDBot.metadata import YouTubeMetadataIdentifier
......@@ -38,23 +40,30 @@ class IdentifierTest(unittest.TestCase):
yt2 = YouTubeDownloader(url="https://www.youtube.com/watch?v=-_ZwpOdXXcA")
yt3 = YouTubeDownloader(url="https://www.youtube.com/watch?v=WOGWZD5iT10")
yt4 = YouTubeDownloader(url="https://www.youtube.com/watch?v=GfKV9KaNJXc")
yt5 = YouTubeDownloader(url="https://www.youtube.com/watch?v=DiItGE3eAyQ")
yt6 = YouTubeDownloader(url="https://www.youtube.com/watch?v=GuZzuQvv7uc")
t1 = threading.Thread(target=self.find_metadata, args=(yt1,))
t2 = threading.Thread(target=self.find_metadata, args=(yt2,))
t3 = threading.Thread(target=self.find_metadata, args=(yt3,))
t4 = threading.Thread(target=self.find_metadata, args=(yt4,))
t5 = threading.Thread(target=self.find_metadata, args=(yt5,))
t6 = threading.Thread(target=self.find_metadata, args=(yt6,))
self.max = 4
self.max = 6
t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
t6.start()
while self.threads < self.max:
sleep(1)
pprint(self.song_info)
# pprint(self.song_info)
pprint("Finished")
def barrier(self):
with self.lock:
......@@ -64,9 +73,9 @@ class IdentifierTest(unittest.TestCase):
with self.lock:
return self.threads
def find_metadata(self, downloader: YouTubeDownloader):
def find_metadata(self, downloader: YouTubeDownloader) -> Tuple[BytesIO, bytes]:
st_dl_t = time()
_, data = downloader.download()
io, data = downloader.download()
f_dl_t = time()
print("Downloaded {} - elapsed time: {:.1f}s".format(downloader.get_url(),
f_dl_t - st_dl_t))
......@@ -75,7 +84,8 @@ class IdentifierTest(unittest.TestCase):
assert valid
self.song_info[downloader.get_url()] = {
"title": identifier.title,
"artist": identifier.artist
"artist": identifier.artist,
"cover": identifier.cover
}
if not identifier.youtube_data:
self.song_info[downloader.get_url()]["score"] = identifier.score
......@@ -83,11 +93,13 @@ class IdentifierTest(unittest.TestCase):
"https://musicbrainz.org/recording/{0}".format(identifier.recording_id)
self.song_info[downloader.get_url()]["release_id"] = \
"https://musicbrainz.org/release/{0}".format(identifier.release_id)
self.song_info[downloader.get_url()]["album"] = identifier.album
else:
self.song_info[downloader.get_url()]["duration"] = identifier.duration
self.song_info[downloader.get_url()]["id"] = identifier.youtube_id
self.song_info[downloader.get_url()]["youtube_data"] = True
self.barrier()
return io, data
if __name__ == '__main__':
......
import unittest
import mutagen
from typing import Tuple
from io import BytesIO
from YouTubeMDBot.tests.identifier import IdentifierTest
from YouTubeMDBot.downloader import YouTubeDownloader
from YouTubeMDBot.metadata import AudioMetadata
from YouTubeMDBot.utils import youtube_utils
class TaggerTest(IdentifierTest):
def find_metadata(self, downloader: YouTubeDownloader) -> Tuple[BytesIO, bytes]:
io, data = super().find_metadata(downloader)
tagger = AudioMetadata(io)
url = downloader.get_url()
tagger.set_title(super().song_info[url]["title"])
tagger.set_artist(super().song_info[url]["artist"])
tagger.set_cover(super().song_info[url]["cover"])
extra = ["YouTube URL: " + url]
if not super().song_info[url].get("youtube_data"):
tagger.set_album(super().song_info[url]["album"])
extra.append("MusicBrainz Record ID: " + super().song_info[url][
"record_id"])
extra.append("MusicBrainz Release ID: " + super().song_info[url][
"release_id"])
tagger.set_extras(extra)
else:
tagger.set_extras(["YouTube ID: {}".format(super().song_info[url]["id"])])
yid = youtube_utils.get_yt_video_id(url)
rs = tagger.save()
rs.seek(0)
print(mutagen.File(rs).pprint())
rs.seek(0)
with open(yid + ".m4a", "wb") as f:
f.write(rs.read())
rs.seek(0)
return rs, rs.read()
if __name__ == '__main__':
unittest.main()
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import signal
class timeout:
def __init__(self, secs: int):
self.__secs = secs
def _raise_timeout(self, signum, frame):
raise TimeoutError("Function timeout! - {0}s".format(self.__secs))
def __enter__(self):
if self.__secs <= 0:
self._raise_timeout(0, 0)
signal.signal(signal.SIGALRM, self._raise_timeout)
signal.alarm(self.__secs)
yield
def __exit__(self, exc_type, exc_val, exc_tb):
signal.signal(signal.SIGALRM, signal.SIG_IGN)
return exc_val is not None
'''@contextmanager
def timeout(secs: int):
def raise_timeout(signum=None, frame=None):
raise TimeoutError("Function timeout! - {0}s".format(secs))
if secs <= 0:
secs = 0
raise_timeout()
signal.signal(signalnum=signal.SIGALRM, handler=raise_timeout)
signal.alarm(secs)
try:
yield
except TimeoutError:
pass
finally:
signal.signal(signalnum=signal.SIGALRM, handler=signal.SIG_IGN)'''
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment