diff --git a/.flake8 b/.flake8 index af259f9..400107b 100644 --- a/.flake8 +++ b/.flake8 @@ -1,3 +1,5 @@ [flake8] max-line-length = 100 ignore = E203,E402,E501,W503,E226 +per-file-ignores = + pyabc2/sources/_lzstring.py: E111,E225,E261,E302,E303,E703 diff --git a/.gitignore b/.gitignore index 3f077ff..d3d314b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .vscode pyabc2/sources/_* !pyabc2/sources/__init__.py +!pyabc2/sources/_lzstring.py poetry.lock venv*/ docs/api/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c620b1b..b0a33be 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,12 +14,13 @@ repos: rev: '7.0.0' hooks: - id: isort + exclude: ^pyabc2/sources/_lzstring.py - repo: https://github.com/psf/black-pre-commit-mirror rev: '25.12.0' hooks: - id: black - exclude: ^examples/ + exclude: ^examples/|^pyabc2/sources/_lzstring.py - repo: https://github.com/PyCQA/flake8 rev: '7.3.0' diff --git a/docs/api.rst b/docs/api.rst index 85a7548..1078a45 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -133,3 +133,20 @@ Functions: the_session.load the_session.load_meta the_session.load_url + +Eskin ABC Tools +--------------- + +.. automodule:: pyabc2.sources.eskin + +Functions: + +.. currentmodule:: pyabc2.sources + +.. 
autosummary:: + :toctree: api/ + + eskin.load_meta + eskin.load_url + eskin.abctools_url_to_abc + eskin.abc_to_abctools_url diff --git a/docs/changes.md b/docs/changes.md index 7fafd85..a154a6a 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -5,6 +5,7 @@ * Fix loading The Session sets data ({pull}`77`) * Fix HTML display of pitch classes with double accidentals ({pull}`76`) * Fix Norbeck URL gen for multi-word tune types (e.g., slip jig, set dance) +* Add initial support for loading Eskin ABC Transcription Tools tunebooks ({pull}`86`) ## v0.1.0 (2025-07-02) diff --git a/docs/examples/sources.ipynb b/docs/examples/sources.ipynb index 14d3418..83437f0 100644 --- a/docs/examples/sources.ipynb +++ b/docs/examples/sources.ipynb @@ -17,7 +17,7 @@ "metadata": {}, "outputs": [], "source": [ - "from pyabc2.sources import load_example, norbeck, the_session" + "from pyabc2.sources import load_example, norbeck, the_session, eskin" ] }, { @@ -360,11 +360,70 @@ " )\n", ");" ] + }, + { + "cell_type": "markdown", + "id": "31", + "metadata": {}, + "source": [ + "## Eskin\n", + "\n", + "Michael Eskin has tunebooks available at <https://michaeleskin.com/tunebooks.html>, viewable with his ABC Transcription Tools.\n", + "\n", + "We can load selected tunebooks from there, e.g. 
the King Street Sessions:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "32", + "metadata": {}, + "outputs": [], + "source": [ + "df = eskin.load_meta(\"kss\")\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "33", + "metadata": {}, + "outputs": [], + "source": [ + "df.group.value_counts()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "34", + "metadata": {}, + "outputs": [], + "source": [ + "from pyabc2 import Tune\n", + "\n", + "Tune(df.query(\"group == 'jigs'\").iloc[0].abc)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "35", + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import display, Markdown\n", + "\n", + "url = \"https://michaeleskin.com/abctools/abctools.html?lzw=BoLgUAKiBiD2BOACCALApogMrAbhg8gGaICyArgM4CWAxmAEogUA2VADogFZUDmYAwiExUAXon4BDePFjNmYEiACcAegAcYTCACM6sAGkQAcTBGAogBFEFs0cQBBIwCFEAHwdG7zgCaI0333dzKxs7Rxo3RCc0DCd7F3MzRBBXMB5-PxVCFR4EpxUaFUDEdN80HgAjRAkAJmJ3Uszs3Id8wuL-F28nMKdAtIy0LJy8gqLIxvKq2olIipimnIxankjOxG7e+zdUoA\"\n", + "display(Markdown(f\"<{url}>\"))\n", + "eskin.load_url(url)" + ] } ], "metadata": { "kernelspec": { - "display_name": "venv", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, diff --git a/pyabc2/sources/__init__.py b/pyabc2/sources/__init__.py index 0ae10db..d662665 100644 --- a/pyabc2/sources/__init__.py +++ b/pyabc2/sources/__init__.py @@ -84,6 +84,7 @@ def load_url(url: str) -> Tune: - Norbeck (``norbeck.nu/abc/``) - The Session (``thesession.org``) + - Eskin ABC Tools (``michaeleskin.com/abctools/``) See Also -------- @@ -92,12 +93,14 @@ def load_url(url: str) -> Tune: """ from urllib.parse import urlsplit - from . import norbeck, the_session + from . 
import eskin, norbeck, the_session res = urlsplit(url) if res.netloc in norbeck._URL_NETLOCS: return norbeck.load_url(url) elif res.netloc in the_session._URL_NETLOCS: return the_session.load_url(url) + elif res.netloc in eskin._URL_NETLOCS: + return eskin.load_url(url) else: raise NotImplementedError(f"loading URL from {res.netloc} not implemented.") diff --git a/pyabc2/sources/_lzstring.py b/pyabc2/sources/_lzstring.py new file mode 100644 index 0000000..bc7aacd --- /dev/null +++ b/pyabc2/sources/_lzstring.py @@ -0,0 +1,444 @@ +# https://github.com/gkovacs/lz-string-python/blob/f1c109544413c1ba910c4af99337c14da1680441/lzstring/__init__.py +# - Remove dep on future, fix 3.12 support (PR#6) +# - Remove unused imports +# - Fix UTF-16 decompress (PR#7) + +from __future__ import division +from __future__ import unicode_literals +from __future__ import print_function +from __future__ import absolute_import +from builtins import range +from builtins import chr +from builtins import object +import math + + +keyStrBase64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=" +keyStrUriSafe = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+-$" +baseReverseDic = {}; + +class Object(object): + def __init__(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + + +def getBaseValue(alphabet, character): + if alphabet not in baseReverseDic: + baseReverseDic[alphabet] = {} + for i in range(len(alphabet)): + baseReverseDic[alphabet][alphabet[i]] = i + return baseReverseDic[alphabet][character] + + +def _compress(uncompressed, bitsPerChar, getCharFromInt): + if (uncompressed is None): + return "" + + context_dictionary = {} + context_dictionaryToCreate= {} + context_c = "" + context_wc = "" + context_w = "" + context_enlargeIn = 2 # Compensate for the first entry which should not count + context_dictSize = 3 + context_numBits = 2 + context_data = [] + context_data_val = 0 + context_data_position = 0 + + for ii in 
range(len(uncompressed)): + if isinstance(uncompressed, (bytes)): + context_c = chr(uncompressed[ii]) + else: + context_c = uncompressed[ii] + if context_c not in context_dictionary: + context_dictionary[context_c] = context_dictSize + context_dictSize += 1 + context_dictionaryToCreate[context_c] = True + + context_wc = context_w + context_c + if context_wc in context_dictionary: + context_w = context_wc + else: + if context_w in context_dictionaryToCreate: + if ord(context_w[0]) < 256: + for i in range(context_numBits): + context_data_val = (context_data_val << 1) + if context_data_position == bitsPerChar-1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = ord(context_w[0]) + for i in range(8): + context_data_val = (context_data_val << 1) | (value & 1) + if context_data_position == bitsPerChar - 1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = value >> 1 + + else: + value = 1 + for i in range(context_numBits): + context_data_val = (context_data_val << 1) | value + if context_data_position == bitsPerChar - 1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = 0 + value = ord(context_w[0]) + for i in range(16): + context_data_val = (context_data_val << 1) | (value & 1) + if context_data_position == bitsPerChar - 1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = value >> 1 + context_enlargeIn -= 1 + if context_enlargeIn == 0: + context_enlargeIn = math.pow(2, context_numBits) + context_numBits += 1 + del context_dictionaryToCreate[context_w] + else: + value = context_dictionary[context_w] + for i in range(context_numBits): + context_data_val = 
(context_data_val << 1) | (value & 1) + if context_data_position == bitsPerChar - 1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = value >> 1 + + context_enlargeIn -= 1 + if context_enlargeIn == 0: + context_enlargeIn = math.pow(2, context_numBits) + context_numBits += 1 + + # Add wc to the dictionary. + context_dictionary[context_wc] = context_dictSize + context_dictSize += 1 + context_w = str(context_c) + + # Output the code for w. + if context_w != "": + if context_w in context_dictionaryToCreate: + if ord(context_w[0]) < 256: + for i in range(context_numBits): + context_data_val = (context_data_val << 1) + if context_data_position == bitsPerChar-1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = ord(context_w[0]) + for i in range(8): + context_data_val = (context_data_val << 1) | (value & 1) + if context_data_position == bitsPerChar - 1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = value >> 1 + else: + value = 1 + for i in range(context_numBits): + context_data_val = (context_data_val << 1) | value + if context_data_position == bitsPerChar - 1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = 0 + value = ord(context_w[0]) + for i in range(16): + context_data_val = (context_data_val << 1) | (value & 1) + if context_data_position == bitsPerChar - 1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = value >> 1 + context_enlargeIn -= 1 + if context_enlargeIn == 0: + context_enlargeIn = math.pow(2, context_numBits) + context_numBits += 1 + del 
context_dictionaryToCreate[context_w] + else: + value = context_dictionary[context_w] + for i in range(context_numBits): + context_data_val = (context_data_val << 1) | (value & 1) + if context_data_position == bitsPerChar - 1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = value >> 1 + + context_enlargeIn -= 1 + if context_enlargeIn == 0: + context_enlargeIn = math.pow(2, context_numBits) + context_numBits += 1 + + # Mark the end of the stream + value = 2 + for i in range(context_numBits): + context_data_val = (context_data_val << 1) | (value & 1) + if context_data_position == bitsPerChar - 1: + context_data_position = 0 + context_data.append(getCharFromInt(context_data_val)) + context_data_val = 0 + else: + context_data_position += 1 + value = value >> 1 + + # Flush the last char + while True: + context_data_val = (context_data_val << 1) + if context_data_position == bitsPerChar - 1: + context_data.append(getCharFromInt(context_data_val)) + break + else: + context_data_position += 1 + + return "".join(context_data) + + +def _decompress(length, resetValue, getNextValue): + dictionary = {} + enlargeIn = 4 + dictSize = 4 + numBits = 3 + entry = "" + result = [] + + data = Object( + val=getNextValue(0), + position=resetValue, + index=1 + ) + + for i in range(3): + dictionary[i] = i + + bits = 0 + maxpower = math.pow(2, 2) + power = 1 + + while power != maxpower: + resb = data.val & data.position + data.position >>= 1 + if data.position == 0: + data.position = resetValue + data.val = getNextValue(data.index) + data.index += 1 + + bits |= power if resb > 0 else 0 + power <<= 1; + + next = bits + if next == 0: + bits = 0 + maxpower = math.pow(2, 8) + power = 1 + while power != maxpower: + resb = data.val & data.position + data.position >>= 1 + if data.position == 0: + data.position = resetValue + data.val = getNextValue(data.index) + data.index += 1 + bits |= power 
if resb > 0 else 0 + power <<= 1 + c = chr(bits) + elif next == 1: + bits = 0 + maxpower = math.pow(2, 16) + power = 1 + while power != maxpower: + resb = data.val & data.position + data.position >>= 1 + if data.position == 0: + data.position = resetValue; + data.val = getNextValue(data.index) + data.index += 1 + bits |= power if resb > 0 else 0 + power <<= 1 + c = chr(bits) + elif next == 2: + return "" + + dictionary[3] = c + w = c + result.append(c) + counter = 0 + while True: + counter += 1 + if data.index > length: + return "" + + bits = 0 + maxpower = math.pow(2, numBits) + power = 1 + while power != maxpower: + resb = data.val & data.position + data.position >>= 1 + if data.position == 0: + data.position = resetValue; + data.val = getNextValue(data.index) + data.index += 1 + bits |= power if resb > 0 else 0 + power <<= 1 + + c = bits + if c == 0: + bits = 0 + maxpower = math.pow(2, 8) + power = 1 + while power != maxpower: + resb = data.val & data.position + data.position >>= 1 + if data.position == 0: + data.position = resetValue + data.val = getNextValue(data.index) + data.index += 1 + bits |= power if resb > 0 else 0 + power <<= 1 + + dictionary[dictSize] = chr(bits) + dictSize += 1 + c = dictSize - 1 + enlargeIn -= 1 + elif c == 1: + bits = 0 + maxpower = math.pow(2, 16) + power = 1 + while power != maxpower: + resb = data.val & data.position + data.position >>= 1 + if data.position == 0: + data.position = resetValue; + data.val = getNextValue(data.index) + data.index += 1 + bits |= power if resb > 0 else 0 + power <<= 1 + dictionary[dictSize] = chr(bits) + dictSize += 1 + c = dictSize - 1 + enlargeIn -= 1 + elif c == 2: + return "".join(result) + + + if enlargeIn == 0: + enlargeIn = math.pow(2, numBits) + numBits += 1 + + if c in dictionary: + entry = dictionary[c] + else: + if c == dictSize: + entry = w + w[0] + else: + return None + result.append(entry) + + # Add w+entry[0] to the dictionary. 
+ dictionary[dictSize] = w + entry[0] + dictSize += 1 + enlargeIn -= 1 + + w = entry + if enlargeIn == 0: + enlargeIn = math.pow(2, numBits) + numBits += 1 + + +class LZString(object): + @staticmethod + def compress(uncompressed): + return _compress(uncompressed, 16, chr) + + @staticmethod + def compressToUint8Array(uncompressed): + return bytes([ord(x) for x in _compress(uncompressed, 8, chr)]) + + + @staticmethod + def compressToUTF16(uncompressed): + if uncompressed is None: + return "" + return _compress(uncompressed, 15, lambda a: chr(a+32)) + " " + + @staticmethod + def compressToBase64(uncompressed): + if uncompressed is None: + return "" + res = _compress(uncompressed, 6, lambda a: keyStrBase64[a]) + # To produce valid Base64 + end = len(res) % 4 + if end > 0: + res += "="*(4 - end) + return res + + @staticmethod + def compressToEncodedURIComponent(uncompressed): + if uncompressed is None: + return "" + return _compress(uncompressed, 6, lambda a: keyStrUriSafe[a]) + + @staticmethod + def decompress(compressed): + if compressed is None: + return "" + if compressed == "": + return None + return _decompress(len(compressed), 32768, lambda index: ord(compressed[index])) + + @staticmethod + def decompressFromUint8Array(compressed): + if compressed is None: + return "" + if compressed == "": + return None + return _decompress(len(compressed), 128, lambda index: compressed[index]) + + @staticmethod + def decompressFromUTF16(compressed): + if compressed is None: + return "" + if compressed == "": + return None + return _decompress(len(compressed), 16384, lambda index: ord(compressed[index]) - 32) + + @staticmethod + def decompressFromBase64(compressed): + if compressed is None: + return "" + if compressed == "": + return None + return _decompress(len(compressed), 32, lambda index: getBaseValue(keyStrBase64, compressed[index])) + + @staticmethod + def decompressFromEncodedURIComponent(compressed): + if compressed is None: + return "" + if compressed == "": + return 
None + compressed = compressed.replace(" ", "+") + return _decompress(len(compressed), 32, lambda index: getBaseValue(keyStrUriSafe, compressed[index])) diff --git a/pyabc2/sources/eskin.py b/pyabc2/sources/eskin.py new file mode 100644 index 0000000..03defab --- /dev/null +++ b/pyabc2/sources/eskin.py @@ -0,0 +1,305 @@ +""" +Load data from the Eskin ABC Transcription Tools tunebook websites +(https://michaeleskin.com/tunebooks.html). + +Requires: + +* `requests <https://requests.readthedocs.io/>`__ +""" + +import json +import re +from pathlib import Path +from typing import TYPE_CHECKING, Literal, NamedTuple, Tuple, Union +from urllib.parse import parse_qs, urlsplit + +from pyabc2 import Tune +from pyabc2._util import get_logger as _get_logger +from pyabc2.sources._lzstring import LZString + +if TYPE_CHECKING: # pragma: no cover + import pandas + +logger = _get_logger(__name__) + +HERE = Path(__file__).parent + +SAVE_TO = HERE / "_eskin" + +_TBWS = "https://michaeleskin.com/tunebook_websites" +_CCE_SD = "https://michaeleskin.com/cce_sd" +_TUNEBOOK_KEY_TO_URL = { + # https://michaeleskin.com/tunebooks.html#websites_irish + "kss": f"{_TBWS}/king_street_sessions_tunebook_17Jan2025.html", + "oflaherty_2025": f"{_TBWS}/oflahertys_2025_retreat_tunes_final.html", + "carp": f"{_TBWS}/carp_celtic_jam_tunebook_17Jan2025.html", + "hardy_2024": f"{_TBWS}/paul_hardy_2024_8feb2025.html", + "hardy_2025": f"{_TBWS}/paul_hardy_2025_12aug2025.html", + "cce_dublin_2001": f"{_CCE_SD}/cce_dublin_2001_tunebook_17Jan2025.html", + "cce_san_diego_jan2025": f"{_CCE_SD}/cce_san_diego_tunes_31jan2025.html", + "cce_san_diego_nov2025": f"{_CCE_SD}/cce_san_diego_tunes_10nov2025.html", + # https://michaeleskin.com/tunebooks.html#websites_18th_century_collections + "aird": f"{_TBWS}/james_aird_campin_18jan2025.html", + "playford1": f"{_TBWS}/playford_1_partington_17jan2025.html", + "playford2": f"{_TBWS}/playford_2_partington_17jan2025.html", + "playford3": f"{_TBWS}/playford_3_partington_20jan2025.html", +} +"""Mapping of 
tunebook keys (defined here, not by Eskin; e.g. 'kss' for King Street Sessions) +to tunebook website URLs, which come from this page: +https://michaeleskin.com/tunebooks.html +""" + +# Definitive versions +_TUNEBOOK_ALIAS = { + "cce_san_diego": "cce_san_diego_nov2025", +} +for _alias, _target in _TUNEBOOK_ALIAS.items(): + _TUNEBOOK_KEY_TO_URL[_alias] = _TUNEBOOK_KEY_TO_URL[_target] + +_URL_NETLOCS = {"michaeleskin.com", "www.michaeleskin.com"} + + +def abctools_url_to_abc( + url: str, + *, + remove_prefs: Union[str, Tuple[str, ...], Literal[False]] = ( + r"%%titlefont ", + r"%%subtitlefont ", + r"%%infofont ", + r"%%partsfont ", + r"%%textfont ", + r"%%tempofont ", + r"%irish_rolls_on", + r"%swing", + r"%abcjs_", + r"%%MIDI ", + r"%add_all_playback_links", + ), +) -> str: + """Extract the ABC from an Eskin abctools (``michaeleskin.com/abctools/``) share URL. + + More info: https://michaeleskin.com/tools/generate_share_link.html + + Parameters + ---------- + remove_prefs + Remove lines starting with these prefixes. + Use ``False`` or an empty iterable to keep all lines instead. 
+ """ + + if not remove_prefs: + remove_prefs = () + elif isinstance(remove_prefs, str): + remove_prefs = (remove_prefs,) + + res = urlsplit(url) + if res.netloc not in _URL_NETLOCS: + logger.debug(f"Unexpected Eskin URL netloc: {res.netloc}") + if not res.path.startswith("/abctools/"): + logger.debug(f"Unexpected Eskin URL path: {res.path}") + + query_params = parse_qs(res.query) + try: + (lzw,) = query_params["lzw"] + except Exception as e: + raise ValueError("URL does not contain required 'lzw' parameter") from e + # Note `+` has been replaced with space by parse_qs + # Note js LZString.compressToEncodedURIComponent() is used to compress/encode the ABC + + try: + abc = LZString.decompressFromEncodedURIComponent(lzw) + except Exception as e: + raise RuntimeError("Failed to decompress LZString data") from e + if abc is None: + raise RuntimeError("Failed to decompress LZString data") + + wanted_lines = [ + line.strip() for line in abc.splitlines() if not line.lstrip().startswith(remove_prefs) + ] + + return "\n".join(wanted_lines) + + +def abc_to_abctools_url(abc: str) -> str: + """Create an Eskin abctools (``michaeleskin.com/abctools/``) share URL for `abc`. + + More info: https://michaeleskin.com/tools/generate_share_link.html + """ + + # Must start with 'X:' (seems value is not required) + if not abc.lstrip().startswith("X"): + abc = "X:\n" + abc + + lzw = LZString.compressToEncodedURIComponent(abc) + + return f"https://michaeleskin.com/abctools/abctools.html?lzw={lzw}" + + +class EskinTunebookInfo(NamedTuple): + key: str + url: str + stem: str + path: Path + + +def get_tunebook_info(key: str) -> EskinTunebookInfo: + key = key.lower() + try: + url = _TUNEBOOK_KEY_TO_URL[key] + except KeyError: + raise ValueError( + f"Unknown Eskin tunebook key: {key!r}. Valid options: {sorted(_TUNEBOOK_KEY_TO_URL)}." 
+ ) from None + stem = Path(urlsplit(url).path).stem + + return EskinTunebookInfo( + key=key, + url=url, + stem=stem, + path=SAVE_TO / f"{stem}.json.gz", + ) + + +def _download_data(key: str): + """Extract and save the tune data from the tunebook webpage as JSON.""" + import gzip + + import requests + + tb_info = get_tunebook_info(key) + + r = requests.get(tb_info.url, timeout=5) + r.raise_for_status() + html = r.text + + # First find the tune type options by searching for 'tunes = type;' + types = sorted(set(re.findall(r"tunes = (.*?);", html))) + if types: + pass + elif "const tunes=[" in html: # no types, just one list of tunes + types = ["tunes"] + else: + raise RuntimeError("Unable to detect tune types") + + # Then the data are in like `const reels=[{...}, ...];` + all_data = {} + for type_ in types: + m = re.search(rf"const {type_}=\[(.*?)\];", html, flags=re.DOTALL) + if m is None: + raise RuntimeError(f"Unable to find data for type {type_!r}") + s_data = "[" + m.group(1) + "]" + + try: + data = json.loads(s_data) + except json.JSONDecodeError as e: + w = 25 + a = max(0, e.pos - w) + b = min(len(s_data), e.pos + w) + raise RuntimeError( + f"Error parsing JSON data for Eskin tunebook {key!r} group {type_!r}. " + f"Context ({a}:{b}): {s_data[a:b]!r}" + ) from e + + for d in data: + d["name"] = d.pop("Name") + d["abc"] = abctools_url_to_abc(d.pop("URL")) + if d: # pragma: no cover + logger.debug(f"Extra fields in Eskin tune data: {sorted(d)}") + + all_data[type_] = data + + SAVE_TO.mkdir(exist_ok=True) + with gzip.open(tb_info.path, "wt") as f: + json.dump(all_data, f, indent=2) + + +def _load_data(key: str): + """Load the data from the saved JSON.""" + import gzip + + with gzip.open(get_tunebook_info(key).path, "rt") as f: + return json.load(f) + + +def load_meta(key: str, *, redownload: bool = False) -> "pandas.DataFrame": + """Load the tunebook data, no parsing. + + Parameters + ---------- + key + Tunebook key (ID), e.g. ``'kss'`` for King Street Sessions. 
+ + .. list-table:: + :header-rows: 1 + :widths: 15 85 + + * - Key + - Description + * - ``aird`` + - James Aird's Airs by Jack Campin + * - ``carp`` + - CARP Celtic Jam Tunebook + * - ``cce_dublin_2001`` + - CCE Dublin 2001 + * - ``cce_san_diego`` + - CCE San Diego + * - ``hardy_{2024,2025}`` + - Paul Hardy's Session Tunebook + * - ``kss`` + - King Street Sessions Tunebook + * - ``oflaherty_2025`` + - O'Flaherty's Retreat Tunes + * - ``playford{1,2,3}`` + - Playford vols. 1--3 + + See https://michaeleskin.com/tunebooks.html + for more information. + redownload + Re-download the data file. + + See Also + -------- + :doc:`/examples/sources` + """ + import pandas as pd + + tb_info = get_tunebook_info(key) + + fp = tb_info.path + if not fp.is_file() or redownload: + print("downloading...", end=" ", flush=True) + _download_data(key) + print("done") + + data = _load_data(key) + + dfs = [] + for group, tunes in data.items(): + df_ = pd.DataFrame(tunes) + df_["group"] = group + dfs.append(df_) + df = pd.concat(dfs, ignore_index=True) + + return df + + +def load_url(url: str) -> Tune: + """Load tune from an Eskin abctools (``michaeleskin.com/abctools/``) share URL. + + Notes + ----- + The ABC is encoded in the URL, so we don't need to load the page. + """ + abc = abctools_url_to_abc(url) + return Tune(abc) + + +if __name__ == "__main__": # pragma: no cover + from . 
import load_example_abc + + abc = load_example_abc("For the Love of Music") + url = abc_to_abctools_url(abc) + print(url) + + kss = load_meta("kss") + print(kss.keys()) diff --git a/pyproject.toml b/pyproject.toml index a629735..9e6402d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,3 +58,9 @@ markers = ["slow"] exclude = ["^venv"] install_types = true ignore_missing_imports = true + +[tool.pyright] +ignore = ["pyabc2/sources/_lzstring.py"] + +[tool.coverage.run] +omit = ["pyabc2/sources/_lzstring.py"] diff --git a/tests/test_sources.py b/tests/test_sources.py index 4950245..823648a 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -5,7 +5,15 @@ from pyabc2 import Key from pyabc2.parse import Tune -from pyabc2.sources import examples, load_example, load_example_abc, load_url, norbeck, the_session +from pyabc2.sources import ( + eskin, + examples, + load_example, + load_example_abc, + load_url, + norbeck, + the_session, +) NORBECK_IRISH_COUNT = 2733 @@ -230,15 +238,17 @@ def test_int_downcast(): assert s3.dtype == expected_dtype_ext -def test_load_url_the_session(): - tune = load_url("https://thesession.org/tunes/10000") +@pytest.mark.parametrize("netloc", sorted(the_session._URL_NETLOCS)) +def test_load_url_the_session(netloc): + tune = load_url(f"https://{netloc}/tunes/10000") assert tune.title == "Brian Quinn's" -def test_load_url_norbeck(): +@pytest.mark.parametrize("netloc", sorted(norbeck._URL_NETLOCS)) +def test_load_url_norbeck(netloc): import requests - url = "https://norbeck.nu/abc/display.asp?rhythm=slip+jig&ref=106" + url = f"https://{netloc}/abc/display.asp?rhythm=slip+jig&ref=106" try: tune = load_url(url) except requests.exceptions.ReadTimeout as e: @@ -247,6 +257,160 @@ def test_load_url_norbeck(): assert tune.title == "For The Love Of Music" +@pytest.mark.parametrize("netloc", sorted(eskin._URL_NETLOCS)) +def test_load_url_eskin(netloc): + url = 
f"https://{netloc}/abctools/abctools.html?lzw=BoLgUAKiBiD2BOACCALApogMrAbhg8gGaICyArgM4CWAxmAEogUA2VADogFZUDmYAwiExUAXon4BDePFjNmYEiACcAegAcYTCACM6sAGkQAcTBGAogBFEFs0cQBBIwCFEAHwdG7zgCaI0333dzKxs7Rxo3RCc0DCd7F3MzRBBXMB5-PxVCFR4EpxUaFUDEdN80HgAjRAkAJmJ3Uszs3Id8wuL-F28nMKdAtIy0LJy8gqLIxvKq2olIipimnIxankjOxG7e+zdUoA" + tune = load_url(url) + assert tune.title == "For The Love Of Music" + + def test_load_url_invalid_domain(): with pytest.raises(NotImplementedError): _ = load_url("https://www.google.com") + + +def test_eskin_tunebook_bad_url_redirects(): + import requests + + # Bad URL (2025 -> 3025) + # Redirects to the home page. + # Nothing in `r.history`. `allow_redirects=False` has no impact. + url = "https://michaeleskin.com/cce_sd/cce_san_diego_tunes_10nov3025.html" + r = requests.head(url, timeout=5) + r.raise_for_status() + + assert r.status_code == 302 + assert r.headers.get("Location", "").rstrip("/") == "https://michaeleskin.com" + assert r.history == [] + assert r.is_redirect + + +@pytest.mark.parametrize("key", eskin._TUNEBOOK_KEY_TO_URL) +def test_eskin_tunebook_url_exist(key): + import requests + + url = eskin._TUNEBOOK_KEY_TO_URL[key] + r = requests.head(url, timeout=5) + r.raise_for_status() + # Bad URLs seem to just redirect to his homepage, + # so we need to check the final URL + # TODO: maybe move this to the module + if ( + r.status_code == 302 + and r.headers.get("Location", "").rstrip("/") == "https://michaeleskin.com" + ): + raise ValueError(f"{key!r} URL {url} redirects to homepage") + + +def test_eskin_tunebook_url_current(): + import requests + + url = "https://michaeleskin.com/tunebooks.html" + r = requests.get(url, timeout=5) + r.raise_for_status() + if ( + r.status_code == 302 + and r.headers.get("Location", "").rstrip("/") == "https://michaeleskin.com" + ): + raise ValueError(f"URL {url} redirects to homepage") + html = r.text + + old_keys = { + "cce_san_diego_jan2025", + "hardy_2024", + } + for key, tb_url in 
eskin._TUNEBOOK_KEY_TO_URL.items(): + m = re.search(rf'href=["\']({tb_url})["\']', html) + if key in old_keys: + assert m is None + else: + if m is None: + raise ValueError(f"Could not find link for tunebook {key!r} in tunebooks page.") + + +@pytest.mark.parametrize("key", eskin._TUNEBOOK_KEY_TO_URL) +def test_eskin_tunebook_data_load(key): + df = eskin.load_meta(key) + + tune_group_keys = { + "airs_songs", + "hornpipes", + "jigs", + "long_dances", + "marches", + "misc_tunes", + "ocarolan", + "polkas", + "reels", + "scotchreels", + "slides", + "slipjigs", + "strathspeys", + "waltzes", + } + + if key in {"kss"}: + assert set(df.group.unique()) <= tune_group_keys + else: + assert df.group.unique().tolist() == ["tunes"] + + +def test_eskin_abc_url_parsing(): + # From https://michaeleskin.com/cce_sd/cce_san_diego_tunes_10nov2025.html + url = "https://michaeleskin.com/abctools/abctools.html?lzw=BoLgjAUApFAuCWsA2BTAZgewHawAQAUBDJQhLDXMADmigGcBXAIwWXWzyJLIrAGZa8LJkw4CxUkN4CYAB0IAnWHVGcJPSjLgoAtrIyrx3KZtqwUAD1iGuk8qYAqIBwAsUuAIJMmKAJ64IABlwAHoaAFkQABYQqIgARRAAJliAXgBOAAYIACUQHJQUJGg6AHchAHNcTIA6SABpEABxaEImAGMAKzoAfToMBiwAE0M0UiYMX1pwgEkAERncWQUMCoVCHWrp+cWmQjo6ZdWtmFmF3HaXDAUho6rs053cPYOANwwkXAA2OMfzy+uQ3enx+rSGQx6xCQPVkJF8e3aAGsekghIi6BAAEQeHSYzx8XAAIU8SVwTQAorgAD6eDxNDy4TFNPGE8HEmnY3G0jzEunkgBi1MZzLJTXpRLZ1KxOLxHlJPJJZMpNI8dIZTJZko5MsVCr5go5IrF4tZQyqVKp0q5KAqttwhFJeyFGtwFTaVUIFRQQ2dOptdodrsIzuZTDdaFd7iGpMtnLx-o9FTDIcxnuTnu9vutto9pLdKbDhAjXqG7IAukA&format=noten&ssp=10&name=The_Abbey&play=1" + + # default: explicit prefixes + abc = eskin.abctools_url_to_abc(url) + + # any % + abc_rm_any_pct = eskin.abctools_url_to_abc(url, remove_prefs="%") + + # no remove + abc_no_rm = eskin.abctools_url_to_abc(url, remove_prefs=False) + + assert abc == abc_rm_any_pct + assert sum(line.startswith("%") for line in abc_no_rm.splitlines()) > 0 + assert sum(line.startswith(r"%%") for line in abc_no_rm.splitlines()) > 0 + + +def test_eskin_abc_url_missing_param(): + url = 
"https://michaeleskin.com/abctools/abctools.html?" + with pytest.raises(ValueError, match="URL does not contain required 'lzw' parameter"): + _ = eskin.abctools_url_to_abc(url) + + +def test_eskin_abc_url_bad_param(): + url = "https://michaeleskin.com/abctools/abctools.html?lzw=hi" + with pytest.raises(RuntimeError, match="Failed to decompress LZString data"): + _ = eskin.abctools_url_to_abc(url) + + +def test_eskin_abc_url_bad(caplog): + url = "https://michaeleski.com/deftools/abctools.html?lzw=BoLgjAUApFAuCWsA2BTAZgewHawAQAUBDJQhLDXMADmigGcBXAIwWXWzyJLIrAGZa8LJkw4CxUkN4CYAB0IAnWHVGcJPSjLgoAtrIyrx3KZtqwUAD1iGuk8qYAqIBwAsUuAIJMmKAJ64IABlwAHoaAFkQABYQqIgARRAAJliAXgBOAAYIACUQHJQUJGg6AHchAHNcTIA6SABpEABxaEImAGMAKzoAfToMBiwAE0M0UiYMX1pwgEkAERncWQUMCoVCHWrp+cWmQjo6ZdWtmFmF3HaXDAUho6rs053cPYOANwwkXAA2OMfzy+uQ3enx+rSGQx6xCQPVkJF8e3aAGsekghIi6BAAEQeHSYzx8XAAIU8SVwTQAorgAD6eDxNDy4TFNPGE8HEmnY3G0jzEunkgBi1MZzLJTXpRLZ1KxOLxHlJPJJZMpNI8dIZTJZko5MsVCr5go5IrF4tZQyqVKp0q5KAqttwhFJeyFGtwFTaVUIFRQQ2dOptdodrsIzuZTDdaFd7iGpMtnLx-o9FTDIcxnuTnu9vutto9pLdKbDhAjXqG7IAukA&format=noten&ssp=10&name=The_Abbey&play=1" + with caplog.at_level("DEBUG"): + _ = eskin.abctools_url_to_abc(url) + + assert caplog.messages == [ + "Unexpected Eskin URL netloc: michaeleski.com", + "Unexpected Eskin URL path: /deftools/abctools.html", + ] + + +def test_eskin_abc_url_creation(): + import requests + + abc = load_example_abc("For the Love of Music") + + url = eskin.abc_to_abctools_url(abc) + r = requests.head(url, timeout=5) + r.raise_for_status() + if ( + r.status_code == 302 + and r.headers.get("Location", "").rstrip("/") == "https://michaeleskin.com" + ): + raise ValueError(f"URL {url} redirects to homepage") + + +def test_eskin_invalid_tunebook_key(): + with pytest.raises(ValueError, match="Unknown Eskin tunebook key: 'asdf'"): + _ = eskin.get_tunebook_info("asdf")