Commit 245095e7 authored by Javinator9889's avatar Javinator9889 🎼

Merge branch 'milestone_2' into 'development'

Milestone #2

See merge request !4
parents 8c9fab79 285525d3
Pipeline #76 passed with stage
in 11 minutes and 54 seconds
......@@ -102,3 +102,6 @@ venv.bak/
# mypy
.mypy_cache/
# keys folder
keys/
......@@ -18,6 +18,9 @@ cache:
before_script:
- python -V # Print out python version for debugging
- apt update
- apt install -y libchromaprint-tools --install-recommends
- pip install -r YouTubeMDBot/requirements.txt
test:pylint:
script:
......@@ -26,5 +29,4 @@ test:pylint:
test:
script:
- pip install -r YouTubeMDBot/requirements.txt
- python -m unittest $(pwd)/YouTubeMDBot/tests/*.py
......@@ -13,11 +13,3 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .bot import PROGRAM_ARGS
from .bot import main
from .logging_utils import LoggingHandler
from .logging_utils import setup_logging
from .decorators import send_action
from .decorators import restricted
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..api.youtube_api import YouTubeAPI
from ..api.youtube_api import YouTubeVideoData
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from isodate import parse_duration
from ..constants import YOUTUBE
from ..errors import EmptyBodyError
class YouTubeVideoData(object):
def __init__(self, data: dict, ignore_errors: bool = False):
if not data.get("items"):
raise EmptyBodyError("The data object has no items")
self.id: str = ""
self.title: str = ""
self.thumbnail: str = ""
self.artist: str = ""
self.duration: float = 0.0
self.views: int = 0
self.likes: int = 0
self.dislikes: int = 0
if len(data.get("items")) >= 1:
content = data.get("items")[0]
snippet = content.get("snippet")
details = content.get("contentDetails")
statistics = content.get("statistics")
if not snippet and not ignore_errors:
raise EmptyBodyError("No information available to requested video")
elif not snippet and ignore_errors:
snippet_available = False
else:
snippet_available = True
if not details and not ignore_errors:
raise EmptyBodyError("No video details available")
elif not details and ignore_errors:
details_available = False
else:
details_available = True
if not statistics and not ignore_errors:
raise EmptyBodyError("No statistics available")
elif not statistics and ignore_errors:
statistics_available = False
else:
statistics_available = True
c_id = content.get("id", "")
self.id = c_id.get("videoId", "") if isinstance(c_id, dict) else c_id
if snippet_available:
self.title = snippet["title"]
try:
self.thumbnail = snippet["thumbnails"]["maxres"]["url"]
except KeyError:
try:
self.thumbnail = snippet["thumbnails"]["high"]["url"]
except KeyError:
try:
self.thumbnail = snippet["thumbnails"]["medium"]["url"]
except KeyError:
self.thumbnail = snippet["thumbnails"]["default"]["url"]
self.artist = snippet["channelTitle"]
if details_available:
self.duration = parse_duration(details["duration"]).total_seconds()
if statistics_available:
self.views = int(statistics["viewCount"])
self.likes = int(statistics["likeCount"])
self.dislikes = int(statistics["dislikeCount"])
class YouTubeAPI(object):
def __init__(self):
from googleapiclient.discovery import build
self.__youtube = build(serviceName=YOUTUBE["api"]["name"],
version=YOUTUBE["api"]["version"],
developerKey=YOUTUBE["key"])
def search(self, term: str):
return self.__youtube.search().list(
q=term,
type="video",
part="id,snippet",
maxResults=1
).execute()
@staticmethod
def video_details(video_id: str) -> YouTubeVideoData:
try:
import ujson as json
except ImportError:
import json
from urllib.request import urlopen
api_url = YOUTUBE["endpoint"].format(video_id, YOUTUBE["key"])
data = urlopen(url=api_url)
return YouTubeVideoData(data=json.loads(data.read()))
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..audio.ffmpeg import FFmpegOpener
from ..audio.ffmpeg import ffmpeg_available
from ..audio.fpcalc import FPCalc
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from io import BytesIO
from subprocess import PIPE
from subprocess import Popen
def ffmpeg_available() -> bool:
try:
proc = Popen(["ffmpeg", "-version"],
stdout=PIPE,
stderr=PIPE)
except OSError:
return False
else:
proc.wait()
return proc.returncode == 0
class FFmpegOpener(object):
def __init__(self, data: bytes):
io = BytesIO(data)
self.__ffmpeg_proc = Popen(["ffmpeg", "-i", "-", "-f", "s16le", "-"],
stdout=PIPE, stderr=PIPE, stdin=io)
self.__out = None
self.__err = None
def open(self) -> int:
self.__out, self.__err = self.__ffmpeg_proc.communicate()
return self.__ffmpeg_proc.returncode
def get_output(self) -> bytes:
return self.__out
def get_extra(self) -> bytes:
return self.__err
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from subprocess import PIPE
from subprocess import Popen
from ..constants import FPCALC
def is_fpcalc_available() -> bool:
try:
proc = Popen(["fpcalc", "-v"], stdout=PIPE, stderr=PIPE)
except OSError:
return False
else:
proc.wait()
class FPCalc(object):
def __init__(self, audio: bytes):
fpcalc = Popen(FPCALC, stdout=PIPE, stdin=PIPE)
out, _ = fpcalc.communicate(audio)
res = out.decode("utf-8")
duration_pattern = "[^=]\\d+\\n"
fingerprint_pattern = "[^=]*$"
duration = re.search(duration_pattern, res)
fingerprint = re.search(fingerprint_pattern, res)
self.__duration: int = int(duration.group(0))
self.__fp: str = str(fingerprint.group(0))
def duration(self) -> int:
return self.__duration
def fingerprint(self) -> str:
return self.__fp
......@@ -13,7 +13,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .. import LoggingHandler
class StartHandler(object):
......@@ -21,4 +20,5 @@ class StartHandler(object):
self._user_data = {}
def start(self, bot, update):
self._user_data[]
pass
# TODO
......@@ -13,4 +13,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..constants.app_constants import ydl_cli_options
from ..constants.app_constants import ACOUSTID_KEY
from ..constants.app_constants import FPCALC
from ..constants.app_constants import YDL_CLI_OPTIONS
from ..constants.app_constants import YOUTUBE
......@@ -13,5 +13,24 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
ydl_cli_options = ["youtube-dl", "--format", "bestaudio[ext=m4a]", "--quiet", "--output",
import os
# YouTube DL options
YDL_CLI_OPTIONS = ["youtube-dl", "--format", "bestaudio[ext=m4a]", "--quiet", "--output",
"-"]
# FPCalc command
FPCALC = ["fpcalc", "-"]
# API keys
ACOUSTID_KEY = os.environ["ACOUSTID_KEY"]
YOUTUBE = {
"key": os.environ["YOUTUBE_KEY"],
"api": {
"name": "youtube",
"version": "v3"
},
"endpoint":
"https://www.googleapis.com/youtube/v3/videos?"
"part=id,snippet,contentDetails,statistics&id={0}&key={1}"
}
......@@ -48,7 +48,6 @@ def restricted(func):
def wrapped(update, context, *args, **kwargs):
user_id = update.effective_user.id
if user_id not in PROGRAM_ARGS["admin"]:
logging.warning("Unauthorized access denied for {}.".format(user_id))
return
return func(update, context, *args, **kwargs)
return wrapped
......@@ -16,13 +16,13 @@
from io import BytesIO
from typing import Tuple
from ..constants.app_constants import ydl_cli_options
from ..constants.app_constants import YDL_CLI_OPTIONS
class YouTubeDownloader(object):
def __init__(self, url: str):
self.__url: str = url
self.__options: list = ydl_cli_options.copy()
self.__options: list = YDL_CLI_OPTIONS.copy()
self.__options.append(self.__url)
def download(self) -> Tuple[BytesIO, bytes]:
......@@ -37,7 +37,7 @@ class YouTubeDownloader(object):
return BytesIO(stdout), stdout
else:
raise RuntimeError("youtube-dl downloader exception - more info: " +
str(stderr))
str(stderr.decode("utf-8")))
def get_url(self) -> str:
return self.__url
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class EmptyBodyError(Exception):
pass
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class InvalidCredentialsError(Exception):
pass
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class NoMatchError(Exception):
"""Raises an error when there is no match available"""
pass
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..errors.EmptyBodyError import EmptyBodyError
from ..errors.NoMatchError import NoMatchError
......@@ -14,7 +14,83 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import acoustid
import musicbrainzngs
try:
import ujson as json
except ImportError:
import json
from ..audio import FPCalc
from ..api import YouTubeAPI
from ..utils import youtube_utils
from ..constants import ACOUSTID_KEY
from ..downloader import YouTubeDownloader
class MetadataIdentifier(object):
def __init__(self, filename: str = None, audio: str = None):
def __init__(self, audio: bytes, downloader: YouTubeDownloader = None):
self.audio = audio
self.result: json = None
self.artist: str = ""
self.title: str = ""
self.release_id: str = ""
self.recording_id: str = ""
self.score: float = 0.0
self.cover: bytes = bytes(0)
self.duration: int = 0
self.youtube_data: bool = False
self.youtube_id: str = ""
self._downloader = downloader
@staticmethod
def _is_valid_result(data: json) -> bool:
if "results" not in data:
return False
elif data["status"] != "ok":
return False
elif len(data["results"]) == 0:
return False
else:
if "recordings" not in data["results"][0]:
return False
else:
return True
def identify_audio(self) -> json:
fingerprint = FPCalc(self.audio)
data: json = acoustid.lookup(apikey=ACOUSTID_KEY,
fingerprint=fingerprint.fingerprint(),
duration=fingerprint.duration(),
meta="recordings releaseids")
self.result = data
if self._is_valid_result(data):
for result in data["results"]:
if "recordings" not in result:
break
self.score = result["score"]
for recording in result["recordings"]:
if recording.get("artists"):
names = [artist["name"] for artist in recording["artists"]]
self.artist = "; ".join(names)
else:
self.artist = "Unknown"
self.title = recording["title"]
self.release_id = recording["releases"][0]["id"]
self.recording_id = recording["id"]
self.duration = recording["duration"]
self.cover = musicbrainzngs.get_image_front(self.release_id)
break
break
elif self._downloader:
from urllib.request import urlopen
video_id = youtube_utils.get_yt_video_id(self._downloader.get_url())
video_data = YouTubeAPI.video_details(video_id)
self.title = video_data.title
self.artist = video_data.artist
self.duration = video_data.duration
self.cover = urlopen(video_data.thumbnail).read()
self.youtube_id = video_data.id
self.youtube_data = True
return data
......@@ -13,3 +13,4 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..metadata.MetadataIdentifier import MetadataIdentifier
isodate
google-api-python-client
musicbrainzngs
ujson
youtube_dl
pyacoustid
python-telegram-bot
import threading
import unittest
from pprint import pprint
from time import sleep
from time import time
from YouTubeMDBot.downloader import YouTubeDownloader
from YouTubeMDBot.metadata import MetadataIdentifier
class IdentifierTest(unittest.TestCase):
lock = threading.Lock()
threads = 0
max = 0
song_info = {}
def test_identification(self):
url = "https://www.youtube.com/watch?v=YQHsXMglC9A"
downloader = YouTubeDownloader(url=url)
audio, data = downloader.download()
with open("hello.m4a", "wb") as song:
song.write(data)
identifier = MetadataIdentifier(audio=data)
results = identifier.identify_audio()
print("{0} by {1} - score: {2} / 1\n"
"\thttps://musicbrainz.org/recording/{3}\n"
"\thttps://musicbrainz.org/release/{4}\n\n"
.format(identifier.title, identifier.artist,
identifier.score,
identifier.recording_id, identifier.release_id))
with open("cover.jpg", "wb") as cover:
cover.write(identifier.cover)
pprint(results)
def test_multiple_download_identification(self):
yt1 = YouTubeDownloader(url="https://www.youtube.com/watch?v=Inm-N5rLUSI")
yt2 = YouTubeDownloader(url="https://www.youtube.com/watch?v=-_ZwpOdXXcA")
yt3 = YouTubeDownloader(url="https://www.youtube.com/watch?v=WOGWZD5iT10")
yt4 = YouTubeDownloader(url="https://www.youtube.com/watch?v=GfKV9KaNJXc")
t1 = threading.Thread(target=self.find_metadata, args=(yt1,))
t2 = threading.Thread(target=self.find_metadata, args=(yt2,))
t3 = threading.Thread(target=self.find_metadata, args=(yt3,))
t4 = threading.Thread(target=self.find_metadata, args=(yt4,))
self.max = 4
t1.start()
t2.start()
t3.start()
t4.start()
while self.threads < self.max:
sleep(1)
pprint(self.song_info)
def barrier(self):
with self.lock:
self.threads += 1
def getThreads(self):
with self.lock:
return self.threads
def find_metadata(self, downloader: YouTubeDownloader):
st_dl_t = time()
_, data = downloader.download()
f_dl_t = time()
print("Downloaded {} - elapsed time: {:.1f}s".format(downloader.get_url(),
f_dl_t - st_dl_t))
identifier = MetadataIdentifier(audio=data, downloader=downloader)
identifier.identify_audio()
self.song_info[downloader.get_url()] = {
"title": identifier.title,
"artist": identifier.artist
}
if not identifier.youtube_data:
self.song_info[downloader.get_url()]["score"] = identifier.score
self.song_info[downloader.get_url()]["record_id"] = \
"https://musicbrainz.org/recording/{0}".format(identifier.recording_id)
self.song_info[downloader.get_url()]["release_id"] = \
"https://musicbrainz.org/release/{0}".format(identifier.release_id)
else:
self.song_info[downloader.get_url()]["duration"] = identifier.duration
self.song_info[downloader.get_url()]["id"] = identifier.youtube_id
self.song_info[downloader.get_url()]["youtube_data"] = True
self.barrier()
if __name__ == '__main__':
unittest.main()
import unittest
from YouTubeMDBot.api import YouTubeAPI
from YouTubeMDBot.api import YouTubeVideoData
class TestSearch(unittest.TestCase):
def test_search(self):
s = YouTubeAPI()
search: dict = s.search(term="test")
data = YouTubeVideoData(data=search, ignore_errors=True)
print("Title: {0}\n"
"Artist: {1}\n"
"Thumbnail: {2}\n"
"Duration: {3}\n"
"Views: {4}\n"
"Likes: {5}\n"
"Dislikes: {6}\n"
"Id: {7}".format(data.title, data.artist, data.thumbnail,
data.duration, data.views, data.likes,
data.dislikes, data.id))
if __name__ == '__main__':
unittest.main()
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..utils import youtube_utils
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
def get_yt_video_id(url: str) -> str:
# initial version: http://stackoverflow.com/a/7936523/617185 \
# by Mikhail Kashkin(http://stackoverflow.com/users/85739/mikhail-kashkin)
"""Returns Video_ID extracting from the given url of Youtube
Examples of URLs:
Valid:
'http://youtu.be/_lOT2p_FCvA',
'www.youtube.com/watch?v=_lOT2p_FCvA&feature=feedu',
'http://www.youtube.com/embed/_lOT2p_FCvA',
'http://www.youtube.com/v/_lOT2p_FCvA?version=3&amp;hl=en_US',
'https://www.youtube.com/watch?v=rTHlyTphWP0&index=6&list=PLjeDyYvG6-40qawYNR4juzvSOg-ezZ2a6',