diff --git a/beetsplug/mbsubmit.py b/beetsplug/mbsubmit.py index d215e616c1..f9fdcbacbf 100644 --- a/beetsplug/mbsubmit.py +++ b/beetsplug/mbsubmit.py @@ -18,31 +18,134 @@ parseable by the MusicBrainz track parser [1]. Programmatic submitting is not implemented by MusicBrainz yet. +The plugin also allows the user to open the tracks in MusicBrainz Picard [2]. + +Another option this plugin provides is to help with creating a new release +on MusicBrainz by seeding the MusicBrainz release editor [3]. This works in +the following way: + +- Host a small web server that serves a web page. When loaded by the user, + this page will automatically POST data to MusicBrainz as described in [3]. +- The same web server also listens for a callback from MusicBrainz, see + redirect_uri [3] and will try to import an album using the newly created + release. +- jwt tokens with random keys are used to prevent using this web server in + unintended ways. + +This feature is loosely based on how browser integration is implemented in +Picard [4]. + [1] https://wiki.musicbrainz.org/History:How_To_Parse_Track_Listings +[2] https://picard.musicbrainz.org/ +[3] https://musicbrainz.org/doc/Development/Seeding/Release_Editor +[4] https://github.com/metabrainz/picard/blob/master/picard/browser/browser.py """ - +import socket import subprocess +import threading +import time +import uuid +import webbrowser +from collections import defaultdict +from dataclasses import dataclass +from secrets import token_bytes +from typing import Callable, Dict, List, Optional + +import waitress +from flask import Flask, render_template, request +from jwt import InvalidTokenError +from werkzeug.exceptions import BadRequest -from beets import ui +from beets import autotag, ui from beets.autotag import Recommendation +from beets.importer import ImportSession, ImportTask +from beets.library import Item from beets.plugins import BeetsPlugin +from beets.ui import print_ from beets.ui.commands import PromptChoice from beets.util import displayable_path +from beets.util.pipeline import PipelineThread from beetsplug.info import print_data +try: + import jwt +except ImportError: + jwt = None + + +@dataclass +class CreateReleaseTask: + """ + Represents a task for creating a single release on MusicBrainz and its current + status. + """ + + formdata: Dict[str, str] + """ + Form data to be submitted to MB. + """ + + browser_opened: bool = False + """ + True when the user has opened the link for this task in the browser. + """ + + result_release_mbid: Optional[str] = None + """ + Contains the release ID returned by MusicBrainz after the release was created. + """ + + +def join_phrase(i, total): + if i < total - 2: + return ", " + elif i < total - 1: + return " & " + else: + return "" + class MBSubmitPlugin(BeetsPlugin): def __init__(self): super().__init__() + if jwt is None: + self._log.warn( + "Cannot import PyJWT, disabling 'Create release on musicbrainz' " + "functionality" + ) + self.config.add( { "format": "$track. $title - $artist ($length)", "threshold": "medium", "picard_path": "picard", + "create_release_server_hostname": "127.0.0.1", + "create_release_server_port": 0, + "create_release_method": "show_link", + "create_release_await_mbid": True, + "create_release_default_type": None, + "create_release_default_language": None, + "create_release_default_script": None, + "create_release_default_status": None, + "create_release_default_packaging": None, + "create_release_default_edit_note": None, } ) + self.create_release_server_hostname = self.config[ + "create_release_server_hostname" + ].as_str() + self.create_release_server_port = self.config[ + "create_release_server_port" + ].as_number() + self.create_release_method = self.config[ + "create_release_method" + ].as_choice(["open_browser", "show_link"]) + self.create_release_await_mbid = self.config[ + "create_release_await_mbid" + ].as_choice([True, False]) + # Validate and store threshold. self.threshold = self.config["threshold"].as_choice( { @@ -57,14 +160,250 @@ def __init__(self): "before_choose_candidate", self.before_choose_candidate_event ) - def before_choose_candidate_event(self, session, task): + self.flask_app = Flask(__name__, template_folder="mbsubmit/templates") + self.flask_app.add_url_rule( + "/add", "add", view_func=self._create_release_add + ) + self.flask_app.add_url_rule( + "/complete_add", + "complete_add", + view_func=self._create_release_complete_add, + ) + + self._server = None + self._server_port = None + self._jwt_key = token_bytes() + self._jwt_algorithm = "HS256" + + # When the user selects "Create release on musicbrainz", the data that is going + # to get POSTed to MusicBrainz is stored in this dict using a randomly + # generated key. The token in the URL opened by the user contains this key. The + # web server looks up the data in this dictionary using the key, and generates + # the page to be displayed. + self._create_release_tasks = dict() + + def _build_formdata(self, items: List[Item], redirect_uri: Optional[str]): + formdata = dict() + + labels = set() + album_artists = set() + + track_counter = defaultdict(int) + + all_track_names = "" + + for track in items: + if "name" not in formdata and track.album: + formdata["name"] = track.album + if "type" not in formdata and track.albumtype: + formdata["type"] = track.albumtype + if "barcode" not in formdata and track.barcode: + formdata["barcode"] = track.barcode + if "events.0.date.year" not in formdata and track.year: + formdata["events.0.date.year"] = track.year + if "events.0.date.month" not in formdata and track.month: + formdata["events.0.date.month"] = track.month + if "events.0.date.day" not in formdata and track.day: + formdata["events.0.date.day"] = track.day + + if track.label: + labels.add(track.label) + + if track.albumartists: + for artist in track.albumartists: + album_artists.add(artist) + elif track.albumartist: + album_artists.add(track.albumartist) + + if track.disc: + medium_index = track.disc - 1 + else: + medium_index = 0 + + track_index = track_counter[medium_index] + + if f"mediums.{medium_index}.format" not in formdata and track.media: + formdata[f"mediums.{medium_index}.format"] = track.media + + formdata[f"mediums.{medium_index}.track.{track_index}.name"] = ( + track.title + ) + formdata[f"mediums.{medium_index}.track.{track_index}.number"] = ( + track.track + ) + formdata[f"mediums.{medium_index}.track.{track_index}.length"] = ( + int(track.length * 1000) + ) # in milliseconds + + all_track_names += f"{track.title}\n" + + if track.artists: + track_artists = track.artists + elif track.artist: + track_artists = [track.artist] + else: + track_artists = [] + + for i, artist in enumerate(track_artists): + formdata[ + ( + f"mediums.{medium_index}.track.{track_index}." + f"artist_credit.names.{i}.artist.name" + ) + ] = artist + if join_phrase(i, len(track_artists)): + formdata[ + ( + f"mediums.{medium_index}.track.{track_index}." + f"artist_credit.names.{i}.join_phrase" + ) + ] = join_phrase(i, len(track_artists)) + + track_counter[medium_index] += 1 + + for i, label in enumerate(labels): + formdata[f"labels.{i}.name"] = label + + for i, artist in enumerate(album_artists): + formdata[f"artist_credit.names.{i}.artist.name"] = artist + if join_phrase(i, len(album_artists)): + formdata[f"artist_credit.names.{i}.join_phrase"] = join_phrase( + i, len(album_artists) + ) + + if redirect_uri: + formdata["redirect_uri"] = redirect_uri + + for default_field in [ + "type", + "language", + "script", + "status", + "packaging", + "edit_note", + ]: + if ( + default_field not in formdata + and self.config[f"create_release_default_{default_field}"] + ): + formdata[default_field] = self.config[ + f"create_release_default_{default_field}" + ].get() + + return formdata + + def _get_task_from_token(self, token: str) -> CreateReleaseTask: + # Try to get the token from query args, try to decode it, and try to find the + # associated CreateReleaseTask. + try: + payload = jwt.decode( + token, + self._jwt_key, + algorithms=self._jwt_algorithm, + ) + except InvalidTokenError as e: + self._log.error(f"Invalid token: {str(e)}") + raise BadRequest() + + if ( + "task_key" in payload + and payload["task_key"] in self._create_release_tasks + ): + return self._create_release_tasks[payload["task_key"]] + else: + self._log.error("task_key does not exist") + raise BadRequest() + + def _create_release_add(self): + token = request.args.get("token") + if token is None: + self._log.error("Missing token in request") + raise BadRequest() + + task = self._get_task_from_token(token) + task.browser_opened = True + return render_template("create_release_add.html", task=task) + + def _create_release_complete_add(self): + token = request.args.get("token") + release_mbid = request.args.get("release_mbid") + if token is None or release_mbid is None: + self._log.error("Missing token or release_mbid in request") + raise BadRequest() + + task = self._get_task_from_token(token) + task.result_release_mbid = release_mbid + return render_template("create_release_complete_add.html", task=task) + + def _start_server(self) -> bool: + if self._server: + return True + + if (port := self.create_release_server_port) == 0: + # Find a free port for us to use. The OS will select a random available one. + # We can't pass 0 to waitress.create_server directly, this won't work when + # using hostnames instead of IP addresses for + # create_release_server_hostname, waitress will then bind to multiple + # sockets, with different ports for each. + with socket.socket() as s: + s.bind((self.create_release_server_hostname, 0)) + port = s.getsockname()[1] + + try: + self._server = waitress.create_server( + self.flask_app, + host=self.create_release_server_hostname, + port=port, + ) + threading.Thread(target=self._server.run, daemon=True).start() + self._server_port = port + return True + except (PermissionError, ValueError, OSError) as e: + self._log.error( + f"Failed to start internal web server on " + f"{self.create_release_server_hostname}:{port}: {str(e)}" + ) + self._server = None + return False + + def _stop_server(self): + if self._server: + self._server.close() + self._server = None + self._server_port = None + + def _wait_for_condition(self, condition: Callable): + t = threading.current_thread() + while not condition(): + time.sleep(0.5) + # When running in multithreaded mode, wait for either condition to be true + # or until the executing thread wants to abort (such as when the user + # presses CTRL+c). + if isinstance(t, PipelineThread) and t.abort_flag: + raise KeyboardInterrupt() + + # When not running in multithreaded mode, KeyboardInterrupt will get + # propagated to this plugin as usual + + def before_choose_candidate_event( + self, session: ImportSession, task: ImportTask + ): if task.rec <= self.threshold: - return [ + choices = [ PromptChoice("p", "Print tracks", self.print_tracks), PromptChoice("o", "Open files with Picard", self.picard), ] + if jwt is not None and task.is_album: + choices += [ + PromptChoice( + "c", + "Create release on musicbrainz", + self.create_release_on_musicbrainz, + ), + ] + return choices - def picard(self, session, task): + def picard(self, session: ImportSession, task: ImportTask): paths = [] for p in task.paths: paths.append(displayable_path(p)) @@ -75,25 +414,99 @@ def picard(self, session, task): except OSError as exc: self._log.error(f"Could not open picard, got error:\n{exc}") - def print_tracks(self, session, task): - for i in sorted(task.items, key=lambda i: i.track): + def print_tracks(self, session: ImportSession, task: ImportTask): + self._print_tracks(task.items) + + def _print_tracks(self, items: List[Item]): + for i in sorted(items, key=lambda i: i.track): print_data(None, i, self.config["format"].as_str()) + def create_release_on_musicbrainz( + self, session: ImportSession, task: ImportTask + ): + return self._create_release_on_musicbrainz(task.items) + + def _create_release_on_musicbrainz(self, items: List[Item]): + if not self._start_server(): + return + task_key = str(uuid.uuid4()) + token = jwt.encode( + {"task_key": task_key}, self._jwt_key, algorithm=self._jwt_algorithm + ) + + url = ( + f"http://{self.create_release_server_hostname}:" + f"{self._server_port}/add?token={token}" + ) + redirect_uri = ( + f"http://{self.create_release_server_hostname}:" + f"{self._server_port}/complete_add?token={token}" + ) + + self._log.debug( + f"New create release task with task_key {task_key}, serving at {url}" + ) + + self._create_release_tasks[task_key] = CreateReleaseTask( + formdata=self._build_formdata( + items=items, + redirect_uri=( + redirect_uri if self.create_release_await_mbid else None + ), + ), + ) + + if self.create_release_method == "open_browser": + webbrowser.open(url) + elif self.create_release_method == "show_link": + print_(f"Open the following URL in your browser: {url}") + else: + return + + self._wait_for_condition( + lambda: self._create_release_tasks[task_key].browser_opened + ) + + if not self.create_release_await_mbid: + return + + print_("Waiting for MusicBrainz release ID...") + + self._wait_for_condition( + lambda: self._create_release_tasks[task_key].result_release_mbid + ) + mbid = self._create_release_tasks[task_key].result_release_mbid + + self._log.debug(f"Got release_mbid {mbid} for task_key {task_key}") + + _, _, prop = autotag.tag_album(items, search_ids=[mbid]) + return prop + def commands(self): """Add beet UI commands for mbsubmit.""" mbsubmit_cmd = ui.Subcommand( - "mbsubmit", help="Submit Tracks to MusicBrainz" + "mbsubmit", help="submit tracks to MusicBrainz" ) - def func(lib, opts, args): + def mbsubmit_cmd_func(lib, opts, args): items = lib.items(ui.decargs(args)) - self._mbsubmit(items) + self._print_tracks(items) + + mbsubmit_cmd.func = mbsubmit_cmd_func - mbsubmit_cmd.func = func + mbcreate_cmd = ui.Subcommand( + "mbsubmit-create", help="create release on MusicBrainz" + ) - return [mbsubmit_cmd] + def mbcreate_cmd_func(lib, ops, args): + items = lib.items(ui.decargs(args)) + print_(f"{len(items)} matching item(s) found.") + if len(items) == 0: + return + self._print_tracks(items) + self.create_release_await_mbid = False + self._create_release_on_musicbrainz(items) - def _mbsubmit(self, items): - """Print track information to be submitted to MusicBrainz.""" - for i in sorted(items, key=lambda i: i.track): - print_data(None, i, self.config["format"].as_str()) + mbcreate_cmd.func = mbcreate_cmd_func + + return [mbsubmit_cmd, mbcreate_cmd] diff --git a/beetsplug/mbsubmit/templates/create_release_add.html b/beetsplug/mbsubmit/templates/create_release_add.html new file mode 100644 index 0000000000..b85beaba0d --- /dev/null +++ b/beetsplug/mbsubmit/templates/create_release_add.html @@ -0,0 +1,12 @@ + + + +
+ + + diff --git a/beetsplug/mbsubmit/templates/create_release_complete_add.html b/beetsplug/mbsubmit/templates/create_release_complete_add.html new file mode 100644 index 0000000000..af8bce1510 --- /dev/null +++ b/beetsplug/mbsubmit/templates/create_release_complete_add.html @@ -0,0 +1,6 @@ + + + + +Release {{ task.result_release_mbid }} added. You can close this browser window now and return to beets.
+ diff --git a/docs/changelog.rst b/docs/changelog.rst index 3725e4993e..48786e7fe7 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -6,6 +6,11 @@ Unreleased Changelog goes here! Please add your entry to the bottom of one of the lists below! +New features: + +* :doc:`plugins/mbsubmit`: Add new prompt choice "Create release on musicbrainz", automating + the process as much as possible. + Bug fixes: * Improved naming of temporary files by separating the random part with the file extension. diff --git a/docs/plugins/mbsubmit.rst b/docs/plugins/mbsubmit.rst index 0e86ddc698..5109180fa0 100644 --- a/docs/plugins/mbsubmit.rst +++ b/docs/plugins/mbsubmit.rst @@ -9,11 +9,24 @@ that is parseable by MusicBrainz's `track parser`_. The prompt choices are: - Print the tracks to stdout in a format suitable for MusicBrainz's `track parser`_. +- Create a new release on MusicBrainz, opens + https://musicbrainz.org/release/add in a new browser window with + fields pre-populated using existing metadata. + - Open the program `Picard`_ with the unmatched folder as an input, allowing you to start submitting the unmatched release to MusicBrainz with many input fields already filled in, thanks to Picard reading the preexisting tags of the files. +To create new releases on MusicBrainz with this plugin you need to install the +`PyJWT`_ library with: + +.. code-block:: console + + $ pip install "beets[mbsubmit]" + +.. _PyJWT: https://pyjwt.readthedocs.io/en/stable/ + For the last option, `Picard`_ is assumed to be installed and available on the machine including a ``picard`` executable. Picard developers list `download options`_. `other GNU/Linux distributions`_ may distribute Picard via their @@ -34,7 +47,7 @@ choice is demonstrated:: No matching release found for 3 tracks. For help, see: https://beets.readthedocs.org/en/latest/faq.html#nomatch [U]se as-is, as Tracks, Group albums, Skip, Enter search, enter Id, aBort, - Print tracks, Open files with Picard? p + Print tracks, Open files with Picard, Create release on musicbrainz? p 01. An Obscure Track - An Obscure Artist (3:37) 02. Another Obscure Track - An Obscure Artist (2:05) 03. The Third Track - Another Obscure Artist (3:02) @@ -56,6 +69,24 @@ the recommended workflow is to copy the output of the ``Print tracks`` choice and paste it into the parser that can be found by clicking on the "Track Parser" button on MusicBrainz "Tracklist" tab. +Create release on MusicBrainz +----------------------------- + +The https://musicbrainz.org/release/add page can be seeded with existing +metadata, as described here: https://musicbrainz.org/doc/Development/Seeding/Release_Editor. +This works in the following way: + +1. When you select the option to create a release, a local web server is started. +2. You point your web browser to that web server, either by clicking a link + displayed in the console, or by having beets open the link automatically. +3. The opened web page will redirect you to MusicBrainz, and the form fields + will be prepopulated with metadata found in the files. MusicBrainz may + ask you to confirm the action. +4. You edit the release on MusicBrainz and click "Enter edit" to finish. +5. MusicBrainz will redirect you to the local web server, submitting the ID + of the newly created release. +6. beets will add the release using the release ID returned by MusicBrainz. + Configuration ------------- @@ -70,12 +101,41 @@ file. The following options are available: Default: ``medium`` (causing the choice to be displayed for all albums that have a recommendation of medium strength or lower). Valid values: ``none``, ``low``, ``medium``, ``strong``. +- **create_release_server_hostname**: The host name of the local web server used for the + 'Create release on musicbrainz' functionality. The default is '127.0.0.1'. + Adjust this if beets is running on a different host in your local network. + Be aware that this web server is not secured in any way. +- **create_release_server_port**: The port for the local web server. By default, + beets will choose a random available port for you. +- **create_release_method**: Either 'open_browser' to automatically open a new + window/tab in your local browser or 'show_link' to simply show the link on + the console. +- **create_release_await_mbid**: Whether or not to wait for you to create the + release on MusicBrainz. If true, waits for a callback from MusicBrainz with + the new release ID and proceeds to add the unmatched album using that Id. + If false, simply shows the select action prompt again. Default: true. +- **create_release_default_type**: The default release type when none can be + identified from the unmatched files. + See https://musicbrainz.org/doc/Release_Group/Type +- **create_release_default_language**: The default language as an `ISO 639-3`_ + code (eng, deu, jpn). +- **create_release_default_script**: The default script as an `ISO 15924`_ code + (Latn, Cyrl). +- **create_release_default_status**: The default status. Possible values: + official, promotion, bootleg, pseudo-release. +- **create_release_default_packaging**: The default packaging. + See https://musicbrainz.org/doc/Release/Packaging +- **create_release_default_edit_note**: The default edit note when submitting + new releases. - **picard_path**: The path to the ``picard`` executable. Could be an absolute path, and if not, ``$PATH`` is consulted. The default value is simply ``picard``. Windows users will have to find and specify the absolute path to their ``picard.exe``. That would probably be: ``C:\Program Files\MusicBrainz Picard\picard.exe``. +.. _ISO 639-3: https://en.wikipedia.org/wiki/List_of_ISO_639-3_codes +.. _ISO 15924: https://en.wikipedia.org/wiki/ISO_15924 + Please note that some values of the ``threshold`` configuration option might require other ``beets`` command line switches to be enabled in order to work as intended. In particular, setting a threshold of ``strong`` will only display diff --git a/poetry.lock b/poetry.lock index 1e3b4cd1d0..38f63a42a9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "accessible-pygments" @@ -1842,6 +1842,23 @@ pycairo = ">=1.16" dev = ["flake8", "pytest", "pytest-cov"] docs = ["sphinx (>=4.0,<5.0)", "sphinx-rtd-theme (>=0.5,<2.0)"] +[[package]] +name = "pyjwt" +version = "2.8.0" +description = "JSON Web Token implementation in Python" +optional = false +python-versions = ">=3.7" +files = [ + {file = "PyJWT-2.8.0-py3-none-any.whl", hash = "sha256:59127c392cc44c2da5bb3192169a91f429924e17aff6534d70fdc02ab3e04320"}, + {file = "PyJWT-2.8.0.tar.gz", hash = "sha256:57e28d156e3d5c10088e0c68abb90bfac3df82b40a71bd0daa20c65ccd5c23de"}, +] + +[package.extras] +crypto = ["cryptography (>=3.4.0)"] +dev = ["coverage[toml] (==5.0.4)", "cryptography (>=3.4.0)", "pre-commit", "pytest (>=6.0.0,<7.0.0)", "sphinx (>=4.5.0,<5.0.0)", "sphinx-rtd-theme", "zope.interface"] +docs = ["sphinx (>=4.5.0,<5.0.0)", "sphinx-rtd-theme", "zope.interface"] +tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"] + [[package]] name = "pylast" version = "5.3.0" @@ -2649,6 +2666,21 @@ h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] +[[package]] +name = "waitress" +version = "3.0.0" +description = "Waitress WSGI server" +optional = false +python-versions = ">=3.8.0" +files = [ + {file = "waitress-3.0.0-py3-none-any.whl", hash = "sha256:2a06f242f4ba0cc563444ca3d1998959447477363a2d7e9b8b4d75d35cfd1669"}, + {file = "waitress-3.0.0.tar.gz", hash = "sha256:005da479b04134cdd9dd602d1ee7c49d79de0537610d653674cc6cbde222b8a1"}, +] + +[package.extras] +docs = ["Sphinx (>=1.8.1)", "docutils", "pylons-sphinx-themes (>=1.0.9)"] +testing = ["coverage (>=5.0)", "pytest", "pytest-cov"] + [[package]] name = "werkzeug" version = "3.0.3" @@ -2707,6 +2739,7 @@ kodiupdate = ["requests"] lastgenre = ["pylast"] lastimport = ["pylast"] lyrics = ["beautifulsoup4", "langdetect", "requests"] +mbsubmit = ["flask", "pyjwt", "waitress"] metasync = ["dbus-python"] mpdstats = ["python-mpd2"] plexupdate = ["requests"] @@ -2720,4 +2753,4 @@ web = ["flask", "flask-cors"] [metadata] lock-version = "2.0" python-versions = ">=3.8,<4" -content-hash = "740281ee3ddba4c6015eab9cfc24bb947e8816e3b7f5a6bebeb39ff2413d7ac3" +content-hash = "ee38f50549700bc840963a37dc25d2bc85b1b0f842df73f86e90db1a8e6260a5" diff --git a/pyproject.toml b/pyproject.toml index 3ef11ac148..6e5c9bf7ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,6 +62,8 @@ reflink = { version = "*", optional = true } requests = { version = "*", optional = true } requests-oauthlib = { version = ">=0.6.1", optional = true } soco = { version = "*", optional = true } +pyjwt = { version = "*", optional = true } +waitress = { version = "*", optional = true } [tool.poetry.group.test.dependencies] beautifulsoup4 = "*" @@ -80,6 +82,8 @@ rarfile = "*" reflink = "*" requests_oauthlib = "*" responses = ">=0.3.0" +pyjwt = "*" +waitress = "*" [tool.poetry.group.format.dependencies] isort = { version = "<5.14", extras = ["colors"] } @@ -127,6 +131,7 @@ kodiupdate = ["requests"] lastgenre = ["pylast"] lastimport = ["pylast"] lyrics = ["beautifulsoup4", "langdetect", "requests"] +mbsubmit = ["pyjwt", "flask", "waitress"] metasync = ["dbus-python"] mpdstats = ["python-mpd2"] plexupdate = ["requests"] diff --git a/test/plugins/test_mbsubmit.py b/test/plugins/test_mbsubmit.py index 40024bc714..903c9bbd5b 100644 --- a/test/plugins/test_mbsubmit.py +++ b/test/plugins/test_mbsubmit.py @@ -11,10 +11,12 @@ # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. - - import unittest +from unittest.mock import patch + +import jwt +from beets.test._common import item from beets.test.helper import ( AutotagStub, ImportHelper, @@ -23,6 +25,7 @@ capture_stdout, control_stdin, ) +from beetsplug.mbsubmit import CreateReleaseTask, MBSubmitPlugin class MBSubmitPluginTest( @@ -51,7 +54,7 @@ def test_print_tracks_output(self): # Manually build the string for comparing the output. tracklist = ( - "Open files with Picard? " + "Create release on musicbrainz? " "01. Tag Title 1 - Tag Artist (0:01)\n" "02. Tag Title 2 - Tag Artist (0:01)" ) @@ -72,6 +75,188 @@ def test_print_tracks_output_as_tracks(self): ) self.assertIn(tracklist, output.getvalue()) + @patch.object(MBSubmitPlugin, "_wait_for_condition", autospec=True) + def test_create_release(self, wait_for_condition_mock): + self.matcher.matching = AutotagStub.BAD + + def _wait_for_condition(plugin: MBSubmitPlugin, condition): + self.assertEqual(1, len(plugin._create_release_tasks)) + task_id = list(plugin._create_release_tasks.keys())[0] + if wait_for_condition_mock.call_count == 1: + plugin._create_release_tasks[task_id].browser_opened = True + if wait_for_condition_mock.call_count == 2: + plugin._create_release_tasks[task_id].result_release_mbid = ( + "new_id" + ) + + wait_for_condition_mock.side_effect = _wait_for_condition + + with control_stdin("\n".join(["c", "s"])): + # Create release on MusicBrainz, Skip + self.importer.run() + + self.assertEqual(2, wait_for_condition_mock.call_count) + + def test_create_release_server_add(self): + plugin = MBSubmitPlugin() + client = plugin.flask_app.test_client() + + r = client.get("/") + self.assertEqual(404, r.status_code) + + r = client.get(("/add")) + self.assertEqual(400, r.status_code) + + r = client.get(("/add?token=12356")) + self.assertEqual(400, r.status_code) + + token = jwt.encode( + {"task_key": "unique_key"}, + plugin._jwt_key, + algorithm=plugin._jwt_algorithm, + ) + + r = client.get((f"/add?token={token}")) + self.assertEqual(400, r.status_code) + + task = CreateReleaseTask( + {"a": 1, "b": "Something'test\"", "c": 6767.74} + ) + plugin._create_release_tasks["unique_key"] = task + + self.assertFalse(task.browser_opened) + + r = client.get((f"/add?token={token}")) + self.assertEqual(200, r.status_code) + self.assertIn('', r.text) + self.assertIn( + '', + r.text, + ) + self.assertIn('', r.text) + + self.assertTrue(task.browser_opened) + + r = client.get(("/complete_add")) + self.assertEqual(400, r.status_code) + + r = client.get(("/complete_add?token=12356")) + self.assertEqual(400, r.status_code) + + r = client.get((f"/complete_add?token={token}")) + self.assertEqual(400, r.status_code) + + self.assertIsNone(task.result_release_mbid) + + r = client.get(f"/complete_add?token={token}&release_mbid=the_new_id") + self.assertEqual(200, r.status_code) + + self.assertEqual("the_new_id", task.result_release_mbid) + + def test_build_formdata_empty(self): + plugin = MBSubmitPlugin() + self.assertDictEqual({}, plugin._build_formdata([], None)) + + def test_build_formdata_redirect(self): + plugin = MBSubmitPlugin() + self.assertDictEqual( + {"redirect_uri": "redirect_to_somewhere"}, + plugin._build_formdata([], "redirect_to_somewhere"), + ) + + def test_build_formdata_items(self): + plugin = MBSubmitPlugin() + item1 = item(self.lib) + item1.track = 1 + item1.title = "Track 1" + item1.albumtype = "Album" + item1.barcode = 1234567890 + item1.media = "CD" + + item2 = item(self.lib) + item2.track = 2 + item2.artists = ["a", "b"] + item2.title = "Track 2" + item2.albumtype = "Album" + item2.barcode = 1234567890 + item2.media = "CD" + + item3 = item(self.lib) + item3.track = 3 + item3.disc = None + item3.artists = ["a", "b", "c"] + item3.title = "Track 3" + item3.albumtype = "Album" + item3.barcode = 1234567890 + item3.media = "Digital Media" + + self.maxDiff = None + + self.assertDictEqual( + { + "name": "the album", + "barcode": "1234567890", + "type": "Album", + "events.0.date.year": 1, + "events.0.date.month": 2, + "events.0.date.day": 3, + "artist_credit.names.0.artist.name": "the album artist", + "mediums.5.format": "CD", + "mediums.5.track.0.artist_credit.names.0.artist.name": "the artist", + "mediums.5.track.0.length": 60000, + "mediums.5.track.0.name": "Track 1", + "mediums.5.track.0.number": 1, + "mediums.5.track.1.artist_credit.names.0.artist.name": "a", + "mediums.5.track.1.artist_credit.names.0.join_phrase": " & ", + "mediums.5.track.1.artist_credit.names.1.artist.name": "b", + "mediums.5.track.1.length": 60000, + "mediums.5.track.1.name": "Track 2", + "mediums.5.track.1.number": 2, + "mediums.0.format": "Digital Media", + "mediums.0.track.0.artist_credit.names.0.artist.name": "a", + "mediums.0.track.0.artist_credit.names.0.join_phrase": ", ", + "mediums.0.track.0.artist_credit.names.1.artist.name": "b", + "mediums.0.track.0.artist_credit.names.1.join_phrase": " & ", + "mediums.0.track.0.artist_credit.names.2.artist.name": "c", + "mediums.0.track.0.length": 60000, + "mediums.0.track.0.name": "Track 3", + "mediums.0.track.0.number": 3, + }, + plugin._build_formdata([item1, item2, item3], None), + ) + + def test_build_formdata_defaults(self): + plugin = MBSubmitPlugin() + plugin.config["create_release_default_type"] = "Album" + plugin.config["create_release_default_language"] = "eng" + plugin.config["create_release_default_script"] = "Latn" + plugin.config["create_release_default_status"] = "Official" + plugin.config["create_release_default_packaging"] = "Box" + plugin.config["create_release_default_edit_note"] = ( + "Created via beets mbsubmit plugin" + ) + self.assertDictEqual( + { + "type": "Album", + "language": "eng", + "script": "Latn", + "status": "Official", + "packaging": "Box", + "edit_note": "Created via beets mbsubmit plugin", + }, + plugin._build_formdata([], None), + ) + + def test_build_formdata_defaults_override(self): + plugin = MBSubmitPlugin() + plugin.config["create_release_default_type"] = "Album" + + item1 = item(self.lib) + item1.albumtype = "Single" + + formdata = plugin._build_formdata([item1], None) + self.assertEqual(formdata["type"], "Single") + def suite(): return unittest.TestLoader().loadTestsFromName(__name__)