Commit 07ce5352 authored by Javinator9889's avatar Javinator9889 🎼

Possible metadata method - needs to be tested with concurrent video download

parent 919fe624
Pipeline #65 failed with stage
in 1 minute and 18 seconds
......@@ -102,3 +102,6 @@ venv.bak/
# mypy
.mypy_cache/
# keys folder
keys/
......@@ -18,6 +18,7 @@ cache:
before_script:
- python -V # Print out python version for debugging
# Refresh the package index, then install without prompting: a bare
# "apt install" asks for confirmation and aborts in a non-interactive CI job.
- apt-get update -qq
- apt-get install -y libchromaprint-tools
test:pylint:
script:
......
......@@ -13,4 +13,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..audio.audio_utils import AudioUtils
from ..audio.ffmpeg import FFmpegOpener
from ..audio.ffmpeg import ffmpeg_available
from ..audio.fpcalc import FPCalc
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from io import BytesIO
from subprocess import PIPE
from subprocess import Popen
def ffmpeg_available() -> bool:
    """Tell whether the ``ffmpeg`` executable can be launched successfully.

    Runs ``ffmpeg -version`` and inspects its exit status.

    Returns:
        ``True`` when the command starts and exits with status 0, ``False``
        when it cannot be started (``OSError``) or exits with another status.
    """
    try:
        proc = Popen(["ffmpeg", "-version"],
                     stdout=PIPE,
                     stderr=PIPE)
    except OSError:
        return False
    # communicate() drains and closes both pipes. A bare wait() on a child
    # with piped output can block once the OS pipe buffer is full, and it
    # leaves the pipe file objects open.
    proc.communicate()
    return proc.returncode == 0
class FFmpegOpener(object):
    """Pipe in-memory audio through ``ffmpeg`` and collect its raw output.

    The process is started as ``ffmpeg -i - -f s16le -``: it reads the input
    from stdin and writes the converted stream to stdout.
    """

    def __init__(self, data: bytes):
        """Start the ``ffmpeg`` process that will convert ``data``.

        Args:
            data: the encoded audio bytes to feed to ``ffmpeg``.

        Raises:
            OSError: if the ``ffmpeg`` executable cannot be started.
        """
        # Popen needs a real file descriptor for stdin and BytesIO has none,
        # so a pipe is requested here and the bytes are written in open().
        self.__data = data
        self.__ffmpeg_proc = Popen(["ffmpeg", "-i", "-", "-f", "s16le", "-"],
                                   stdout=PIPE, stderr=PIPE, stdin=PIPE)
        self.__out = None
        self.__err = None

    def open(self) -> int:
        """Feed the audio to ``ffmpeg``, wait for it and return its exit code."""
        self.__out, self.__err = self.__ffmpeg_proc.communicate(self.__data)
        return self.__ffmpeg_proc.returncode

    def get_output(self) -> bytes:
        """Return what ``ffmpeg`` wrote to stdout (``None`` before ``open()``)."""
        return self.__out

    def get_extra(self) -> bytes:
        """Return what ``ffmpeg`` wrote to stderr (``None`` before ``open()``)."""
        return self.__err
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from subprocess import PIPE
from subprocess import Popen
from ..constants import FPCALC
def is_fpcalc_available() -> bool:
    """Tell whether the ``fpcalc`` executable can be launched successfully.

    Runs ``fpcalc -v`` and inspects its exit status.

    Returns:
        ``True`` when the command starts and exits with status 0, ``False``
        when it cannot be started (``OSError``) or exits with another status.
    """
    try:
        proc = Popen(["fpcalc", "-v"], stdout=PIPE, stderr=PIPE)
    except OSError:
        return False
    # communicate() drains and closes the pipes (wait() with PIPE may block).
    proc.communicate()
    # The success path previously fell off the end and returned None.
    return proc.returncode == 0
class FPCalc(object):
    """Run ``fpcalc`` over in-memory audio and keep its duration/fingerprint."""

    def __init__(self, audio: bytes):
        """Fingerprint ``audio`` by piping it to the ``FPCALC`` command.

        Args:
            audio: the encoded audio bytes, written to the process stdin.

        Raises:
            OSError: if the command cannot be started.
            RuntimeError: if the output lacks a usable duration/fingerprint.
        """
        fpcalc = Popen(FPCALC, stdout=PIPE, stdin=PIPE)
        out, _ = fpcalc.communicate(audio)
        self.__duration, self.__fp = self._parse_output(out.decode("utf-8"))

    @staticmethod
    def _parse_output(text: str) -> tuple:
        """Extract ``(duration, fingerprint)`` from ``KEY=value`` output lines.

        Parsing line by line (instead of with regular expressions) accepts
        single-digit durations and keeps the trailing newline out of the
        fingerprint.

        Raises:
            RuntimeError: if ``DURATION`` or ``FINGERPRINT`` is missing, or the
                duration is not an integer.
        """
        fields = {}
        for line in text.splitlines():
            key, sep, value = line.partition("=")
            if sep:
                fields[key.strip()] = value.strip()
        try:
            return int(fields["DURATION"]), fields["FINGERPRINT"]
        except (KeyError, ValueError) as err:
            raise RuntimeError("fpcalc returned unexpected output: " +
                               repr(text)) from err

    def duration(self) -> int:
        """Return the duration reported by ``fpcalc``."""
        return self.__duration

    def fingerprint(self) -> str:
        """Return the fingerprint string reported by ``fpcalc``."""
        return self.__fp
......@@ -13,4 +13,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..constants.app_constants import ydl_cli_options
from ..constants.app_constants import ACOUSTID_KEY
from ..constants.app_constants import FPCALC
from ..constants.app_constants import YDL_CLI_OPTIONS
......@@ -13,5 +13,9 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
ydl_cli_options = ["youtube-dl", "--format", "bestaudio[ext=m4a]", "--quiet", "--output",
import os
YDL_CLI_OPTIONS = ["youtube-dl", "--format", "bestaudio[ext=m4a]", "--quiet", "--output",
"-"]
FPCALC = ["fpcalc", "-"]
ACOUSTID_KEY = os.environ["ACOUSTID_KEY"]
......@@ -16,13 +16,13 @@
from io import BytesIO
from typing import Tuple
from ..constants.app_constants import ydl_cli_options
from ..constants.app_constants import YDL_CLI_OPTIONS
class YouTubeDownloader(object):
def __init__(self, url: str):
self.__url: str = url
self.__options: list = ydl_cli_options.copy()
self.__options: list = YDL_CLI_OPTIONS.copy()
self.__options.append(self.__url)
def download(self) -> Tuple[BytesIO, bytes]:
......@@ -37,7 +37,7 @@ class YouTubeDownloader(object):
return BytesIO(stdout), stdout
else:
raise RuntimeError("youtube-dl downloader exception - more info: " +
str(stderr))
str(stderr.decode("utf-8")))
def get_url(self) -> str:
return self.__url
......@@ -13,26 +13,3 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from io import BytesIO
import soundfile
class AudioUtils(object):
    """Read-only accessors over a ``soundfile.SoundFile`` built from a stream."""

    def __init__(self, audio: BytesIO):
        """Wrap ``audio`` in a ``soundfile.SoundFile``."""
        self.__audio = soundfile.SoundFile(audio)

    def get_audio_samplerate(self) -> int:
        """Return the ``samplerate`` attribute of the wrapped sound file."""
        sound = self.__audio
        return sound.samplerate

    def get_audio_channels(self) -> int:
        """Return the ``channels`` attribute of the wrapped sound file."""
        sound = self.__audio
        return sound.channels

    def get_audio_duration(self) -> float:
        """Return ``frames`` divided by the sample rate."""
        total_frames = self.__audio.frames
        rate = self.get_audio_samplerate()
        return total_frames / rate

    def get_audio_name(self) -> str:
        """Return the ``name`` attribute of the wrapped sound file."""
        sound = self.__audio
        return sound.name

    def get_audio_format(self) -> str:
        """Return the ``format`` attribute of the wrapped sound file."""
        sound = self.__audio
        return sound.format
......@@ -13,24 +13,73 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from io import BytesIO
import acoustid
import musicbrainzngs
try:
import ujson as json
except ImportError:
import json
from .. import AudioUtils
from ..audio import FPCalc
from ..constants import ACOUSTID_KEY
class MetadataIdentifier(object):
    """Identify a song from its audio bytes and expose the metadata found.

    The audio is fingerprinted with ``FPCalc``; ``identify_audio`` sends the
    fingerprint to ``acoustid.lookup`` and, when a recording is found, asks
    ``musicbrainzngs`` for the front cover of its first release.
    """

    def __init__(self, audio: bytes):
        """Fingerprint ``audio`` and reset every metadata field.

        Args:
            audio: the encoded audio bytes handed to ``FPCalc``.
        """
        self.__fingerprint = FPCalc(audio)
        self.__result = None  # raw lookup response, set by identify_audio()
        self.__artist: str = ""
        self.__title: str = ""
        self.__release_id: str = ""
        self.__recording_id: str = ""
        self.__score: float = 0.0
        self.__cover: bytes = b""

    def identify_audio(self) -> dict:
        """Look the fingerprint up and store the best match, if any.

        Returns:
            The response returned by ``acoustid.lookup``. The metadata fields
            keep their empty defaults when the response holds no recording.
        """
        data = acoustid.lookup(apikey=ACOUSTID_KEY,
                               fingerprint=self.__fingerprint.fingerprint(),
                               duration=self.__fingerprint.duration(),
                               meta="recordings releaseids")
        self.__result = data
        # Only ask for a cover when a release id was actually found.
        if self._store_best_match(data) and self.__release_id:
            self.__cover = self._fetch_cover(self.__release_id)
        return data

    def _store_best_match(self, data: dict) -> bool:
        """Copy the first result that carries recordings into the fields.

        Missing keys leave the matching field empty instead of raising, so a
        successful lookup without matches is not an error.

        Returns:
            ``True`` when a recording was found and stored, else ``False``.
        """
        if data.get("status") != "ok":
            return False
        for result in data.get("results") or []:
            recordings = result.get("recordings")
            if recordings:
                break
        else:
            return False

        recording = recordings[0]
        artists = recording.get("artists") or []
        releases = recording.get("releases") or []
        self.__score = result.get("score", 0.0)
        self.__title = recording.get("title") or ""
        self.__recording_id = recording.get("id") or ""
        self.__release_id = releases[0].get("id", "") if releases else ""
        self.__artist = "; ".join(artist["name"] for artist in artists)
        return True

    @staticmethod
    def _fetch_cover(release_id: str) -> bytes:
        """Return the front cover of ``release_id`` or ``b""`` if unavailable."""
        try:
            return musicbrainzngs.get_image_front(release_id)
        except musicbrainzngs.ResponseError:
            # Best effort: a release without cover art should not discard the
            # metadata that was already identified.
            return b""

    def get_title(self) -> str:
        """Return the recording title ("" when unknown)."""
        return self.__title

    def get_score(self) -> float:
        """Return the score of the stored result (0.0 when unknown)."""
        return self.__score

    def get_artist(self) -> str:
        """Return the artist names joined with "; " ("" when unknown)."""
        return self.__artist

    def get_recording_id(self) -> str:
        """Return the id of the stored recording ("" when unknown)."""
        return self.__recording_id

    def get_release_id(self) -> str:
        """Return the id of the recording's first release ("" when unknown)."""
        return self.__release_id

    def get_cover(self) -> bytes:
        """Return the front cover image bytes (empty when unavailable)."""
        return self.__cover

    def get_results(self) -> dict:
        """Return the raw lookup response (``None`` before ``identify_audio``)."""
        return self.__result
......@@ -13,3 +13,4 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..metadata.MetadataIdentifier import MetadataIdentifier
SoundFile
musicbrainzngs
ujson
youtube_dl
pyacoustid
python-telegram-bot
import unittest
from pprint import pprint
from YouTubeMDBot.downloader import YouTubeDownloader
from YouTubeMDBot.metadata import MetadataIdentifier
class IdentifierTest(unittest.TestCase):
    """End-to-end run: download a video's audio, identify it, dump results."""

    def test_identification(self):
        """Download, identify, print a summary and save the audio and cover."""
        video_url = "https://www.youtube.com/watch?v=YQHsXMglC9A"
        _, raw_audio = YouTubeDownloader(url=video_url).download()
        with open("hello.m4a", "wb") as audio_file:
            audio_file.write(raw_audio)

        identifier = MetadataIdentifier(audio=raw_audio)
        lookup = identifier.identify_audio()

        summary = ("{0} by {1} - score: {2} / 1\n"
                   "\thttps://musicbrainz.org/recording/{3}\n"
                   "\thttps://musicbrainz.org/release/{4}\n\n")
        print(summary.format(identifier.get_title(),
                             identifier.get_artist(),
                             identifier.get_score(),
                             identifier.get_recording_id(),
                             identifier.get_release_id()))

        with open("cover.jpg", "wb") as cover_file:
            cover_file.write(identifier.get_cover())
        pprint(lookup)


if __name__ == '__main__':
    unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment