New - Spotify Service

master
ToonsHub 2024-03-07 23:13:18 +01:00
parent 20d1d9cf62
commit 41b1e29ebb
3 changed files with 401 additions and 0 deletions

View File

@ -0,0 +1,272 @@
import base64
import re
import math
import json
import time
import datetime
import logging
from abc import ABCMeta, abstractmethod
from http.cookiejar import CookieJar
from typing import Iterable, Optional, Union
from urllib.parse import urlparse
import click
import requests
from requests.adapters import HTTPAdapter, Retry
from rich.padding import Padding
from rich.rule import Rule
from devine.core.service import Service
from devine.core.titles import Series, Movies, Movie, Episode
from devine.core.cacher import Cacher
from devine.core.config import config
from devine.core.console import console
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.titles import Song, Album, Title_T, Titles_T
from devine.core.tracks import Video, Audio, Subtitle, Tracks
from devine.core.utilities import get_ip_info
from devine.core.manifests import HLS, DASH
from devine.core.drm import DRM_T, ClearKey, Widevine
from devine.utils import base62
from pywidevine.pssh import PSSH
class SPOT(Service):
"""
Service code for Spotify
Written by ToonsHub
Reference: https://github.com/glomatico/spotify-aac-downloader
Authorization: Cookies (Free - 128kbps and Premium - 256kbps)
Security: AAC@L3
"""
# Static method, this method belongs to the class
@staticmethod
# The command name, must much the service tag (and by extension the service folder)
@click.command(name="SPOT", short_help="https://open.spotify.com", help=__doc__)
# Using track/playlist/album/artist page URL
@click.argument("title", type=str)
# Pass the context back to the CLI with arguments
@click.pass_context
def cli(ctx, **kwargs):
return SPOT(ctx, **kwargs)
# Accept the CLI arguments by overriding the constructor (The __init__() method)
def __init__(self, ctx, title):
# Pass the title argument to self so it's accessable across all methods
self.title = title
self.is_premium = False
# Overriding the constructor
super().__init__(ctx)
# Defining an authinticate function
def authenticate(self, cookies: Optional[CookieJar], credential: Optional[Credential] = None):
# Check for cookies
if not cookies:
raise Exception("Cookies are required for performing this action.")
# Authenticate using Cookies
self.session.headers.update(
{
'accept': 'application/json',
'accept-language': 'en',
"app-platform": "WebPlayer",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
}
)
self.session.cookies.update(cookies)
home_page = self.session.get("https://open.spotify.com/").text
token = re.search(r'accessToken":"(.*?)"', home_page).group(1)
self.is_premium = re.search(r'isPremium":(.*?),', home_page).group(1) == 'true'
self.session.headers.update(
{
"authorization": f"Bearer {token}",
}
)
# Function to determine the type of collection
def getCollectionTypeAndId(self):
_type = self.title.split("open.spotify.com/")[1].split("/")[0]
_id = self.title.split(_type + "/")[1].split("?")[0]
return _type, _id
# Defining a function to return titles
def get_titles(self):
songs = []
_type, _id = self.getCollectionTypeAndId()
if _type == 'album':
album = self.session.get(self.config['endpoints']['albums'].format(id=_id)).json()
album_next_url = album["tracks"]["next"]
while album_next_url is not None:
album_next = self.session.get(album_next_url).json()
album["tracks"]["items"].extend(album_next["items"])
album_next_url = album_next["next"]
# Get the episode metadata by iterating through each season id
for song in album["tracks"]["items"]:
# Set a class for each song
song_class = Song(
id_=song["id"],
name=song["name"],
artist=", ".join([ artist["name"] for artist in song["artists"] ]),
album=album["name"],
track=song["track_number"],
disc=song["disc_number"],
year=int(album["release_date"][:4].strip()),
service=self.__class__
)
# Append it to the list
songs.append(song_class)
elif _type == "playlist":
playlist = self.session.get(
self.config['endpoints']['playlists'].format(id=_id)
).json()
playlist_next_url = playlist["tracks"]["next"]
while playlist_next_url is not None:
playlist_next = self.session.get(playlist_next_url).json()
playlist["tracks"]["items"].extend(playlist_next["items"])
playlist_next_url = playlist_next["next"]
# Get the episode metadata by iterating through each season id
for song in playlist["tracks"]["items"]:
song = song["track"]
# Set a class for each song
song_class = Song(
id_=song["id"],
name=song["name"],
artist=", ".join([ artist["name"] for artist in song["artists"] ]),
album=song["album"]["name"],
track=song["track_number"],
disc=song["disc_number"],
year=int(song["album"]["release_date"][:4].strip()),
service=self.__class__
)
# Append it to the list
songs.append(song_class)
elif _type == "artist":
playlist = self.session.get(
self.config['endpoints']['artists'].format(id=_id)
).json()
# Get the episode metadata by iterating through each season id
for song in playlist["tracks"]:
# Set a class for each song
song_class = Song(
id_=song["id"],
name=song["name"],
artist=", ".join([ artist["name"] for artist in song["artists"] ]),
album=song["album"]["name"],
track=song["track_number"],
disc=song["disc_number"],
year=int(song["album"]["release_date"][:4].strip()),
service=self.__class__
)
# Append it to the list
songs.append(song_class)
elif _type == "track":
song = self.session.get(
self.config['endpoints']['tracks'].format(id=_id)
).json()
# Set a class for each song
song_class = Song(
id_=song["id"],
name=song["name"],
artist=", ".join([ artist["name"] for artist in song["artists"] ]),
album=song["album"]["name"],
track=song["track_number"],
disc=song["disc_number"],
year=int(song["album"]["release_date"][:4].strip()),
service=self.__class__
)
# Append it to the list
songs.append(song_class)
return Album(songs)
# Get DRM
def get_spotify_drm(self) -> DRM_T:
pssh = requests.get(
self.config['endpoints']['pssh'].format(file_id=self.file_id)
).json()["pssh"]
return Widevine(
pssh=PSSH(pssh)
)
# Defining a function to get tracks
def get_tracks(self, title: Title_T) -> Tracks:
self.audio_quality = "MP4_256_DUAL" if self.is_premium else "MP4_128_DUAL"
# Get FileID
gid = hex(base62.decode(title.id, base62.CHARSET_INVERTED))[2:].zfill(32)
metadata = self.session.get(
self.config['endpoints']['metadata'].format(gid=gid)
).json()
audio_files = metadata.get("file")
if audio_files is None:
if metadata.get("alternative") is not None:
audio_files = metadata["alternative"][0]["file"]
else:
return None
self.file_id = next(
i["file_id"] for i in audio_files if i["format"] == self.audio_quality
)
# Get stream URL
stream_url = self.session.get(
self.config['endpoints']['stream'].format(file_id=self.file_id)
).json()["cdnurl"][2]
# Get & Set DRM
drm = [self.get_spotify_drm()]
# Set the tracks
tracks = Tracks()
tracks.add(Audio(
url=stream_url,
drm=drm,
codec=Audio.Codec.AAC,
language=metadata.get("language_of_performance", ["en"])[0],
bitrate=256000 if self.is_premium else 128000,
channels=2
))
# Return the tracks
return tracks
# Defining a function to get chapters
def get_chapters(self, title):
return []
# Defining a function to get widevine license keys
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
# Send the post request to the license server
license_raw = self.session.post(
self.config['endpoints']['license'],
data=challenge
)
# Return the license
return base64.b64encode(license_raw.content).decode()

View File

@ -0,0 +1,9 @@
endpoints:
albums: https://api.spotify.com/v1/albums/{id}
playlists: https://api.spotify.com/v1/playlists/{id}
artists: https://api.spotify.com/v1/artists/{id}/top-tracks
tracks: https://api.spotify.com/v1/tracks/{id}
pssh: https://seektables.scdn.co/seektable/{file_id}.json
metadata: https://spclient.wg.spotify.com/metadata/4/track/{gid}?market=from_token
stream: https://gue1-spclient.spotify.com/storage-resolve/v2/files/audio/interactive/11/{file_id}?version=10000000&product=9&platform=39&alt=json
license: https://gae2-spclient.spotify.com/widevine-license/v1/audio/license

120
devine/utils/base62.py Normal file
View File

@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
"""
base62
~~~~~~
Originated from http://blog.suminb.com/archives/558
"""
__title__ = "base62"
__author__ = "Sumin Byeon"
__email__ = "suminb@gmail.com"
__version__ = "1.0.0"
BASE = 62
CHARSET_DEFAULT = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
CHARSET_INVERTED = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
def encode(n, charset=CHARSET_DEFAULT):
"""Encodes a given integer ``n``."""
chs = []
while n > 0:
n, r = divmod(n, BASE)
chs.insert(0, charset[r])
if not chs:
return "0"
return "".join(chs)
def encodebytes(barray, charset=CHARSET_DEFAULT):
"""Encodes a bytestring into a base62 string.
:param barray: A byte array
:type barray: bytes
:rtype: str
"""
_check_type(barray, bytes)
# Count the number of leading zeros.
leading_zeros_count = 0
for i in range(len(barray)):
if barray[i] != 0:
break
leading_zeros_count += 1
# Encode the leading zeros as "0" followed by a character indicating the count.
# This pattern may occur several times if there are many leading zeros.
n, r = divmod(leading_zeros_count, len(charset) - 1)
zero_padding = f"0{charset[-1]}" * n
if r:
zero_padding += f"0{charset[r]}"
# Special case: the input is empty, or is entirely null bytes.
if leading_zeros_count == len(barray):
return zero_padding
value = encode(int.from_bytes(barray, "big"), charset=charset)
return zero_padding + value
def decode(encoded, charset=CHARSET_DEFAULT):
"""Decodes a base62 encoded value ``encoded``.
:type encoded: str
:rtype: int
"""
_check_type(encoded, str)
l, i, v = len(encoded), 0, 0
for x in encoded:
v += _value(x, charset=charset) * (BASE ** (l - (i + 1)))
i += 1
return v
def decodebytes(encoded, charset=CHARSET_DEFAULT):
"""Decodes a string of base62 data into a bytes object.
:param encoded: A string to be decoded in base62
:type encoded: str
:rtype: bytes
"""
leading_null_bytes = b""
while encoded.startswith("0") and len(encoded) >= 2:
leading_null_bytes += b"\x00" * _value(encoded[1], charset)
encoded = encoded[2:]
decoded = decode(encoded, charset=charset)
buf = bytearray()
while decoded > 0:
buf.append(decoded & 0xFF)
decoded //= 256
buf.reverse()
return leading_null_bytes + bytes(buf)
def _value(ch, charset):
"""Decodes an individual digit of a base62 encoded string."""
try:
return charset.index(ch)
except ValueError:
raise ValueError("base62: Invalid character (%s)" % ch)
def _check_type(value, expected_type):
"""Checks if the input is in an appropriate type."""
if not isinstance(value, expected_type):
msg = "Expected {} object, not {}".format(
expected_type, value.__class__.__name__
)
raise TypeError(msg)