- Update version number - Added check for `skip-event` AKA chapters endpoint for 200 (OK) status code before attempting to get chapters, if not 200, return empty chapters
704 lines
32 KiB
Python
704 lines
32 KiB
Python
import base64
|
|
import hashlib
|
|
import json
|
|
import re
|
|
from codecs import Codec
|
|
from collections.abc import Generator
|
|
from datetime import datetime, timedelta
|
|
from http.cookiejar import CookieJar
|
|
from typing import Optional, Union
|
|
import click
|
|
from langcodes import Language
|
|
|
|
from unshackle.core.console import console
|
|
from unshackle.core.constants import AnyTrack
|
|
from unshackle.core.credential import Credential
|
|
from unshackle.core.manifests import DASH
|
|
from unshackle.core.search_result import SearchResult
|
|
from unshackle.core.service import Service
|
|
from unshackle.core.session import session
|
|
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
|
|
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Video, Chapters
|
|
|
|
class CR(Service):
|
|
|
|
"""
|
|
Service code for Crunchyroll
|
|
Author: TPD94
|
|
Version: 1.0.4
|
|
Authorization: Cookies for web endpoints, Credentials for TV endpoints, Cookies/Credentials for both. Cookies required.
|
|
Security: FHD@L3
|
|
Use Series ID/URL (for example - https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8) or Series ID (for example - GG5H5XQ7D).
|
|
"""
|
|
|
|
@staticmethod
|
|
@click.command(name="CR", short_help="https://crunchyroll.com/",
|
|
help="""
|
|
Service code for Crunchyroll\n
|
|
Author: TPD94\n
|
|
Version: 1.0.4\n
|
|
Authorization: Cookies for web endpoints, Credentials for TV endpoints, Cookies/Credentials for both. Cookies required.\n
|
|
Security: FHD@L3\n
|
|
Use Series ID/URL (for example - https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8) or Series ID (for example - GG5H5XQ7D).
|
|
"""
|
|
)
|
|
@click.argument("title", type=str)
|
|
@click.pass_context
|
|
|
|
def cli(ctx, **kwargs):
|
|
return CR(ctx, **kwargs)
|
|
|
|
def __init__(self, ctx, title):
|
|
super().__init__(ctx)
|
|
|
|
# Extract Series ID from URL if applicable
|
|
match = re.search(r'crunchyroll\.com/series/([^/]+)', title)
|
|
|
|
# Get match if found, otherwise just use the users input for title
|
|
self.title = match.group(1) if match else title
|
|
|
|
# Initiate empty authorization token for network web requests
|
|
self.auth_token_web = None
|
|
|
|
# Initiate empty token expiry to store expiry time of authorization token above
|
|
self.auth_token_expiry_web = None
|
|
|
|
# Initiate empty authorization token for network TV requests
|
|
self.auth_token_tv = None
|
|
|
|
# Initiate empty token expiry to store expiry time of authorization token above
|
|
self.auth_token_expiry_tv = None
|
|
|
|
self.cookies = None
|
|
|
|
self.credential = None
|
|
|
|
self.initial_login = False
|
|
|
|
def get_session(self):
|
|
|
|
# Create a session using curl_cffi as it can impersonate browsers and avoid bot detection by Crunchyroll
|
|
return session("chrome124")
|
|
|
|
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
|
|
# Run the super method to load the cookies without writing redundant code
|
|
if not self.initial_login:
|
|
super().authenticate(cookies, credential)
|
|
if cookies:
|
|
self.cookies = cookies
|
|
elif hasattr(self, 'cookies'):
|
|
cookies = self.cookies
|
|
|
|
if credential:
|
|
self.credential = credential
|
|
elif hasattr(self, 'credential'):
|
|
credential = self.credential
|
|
|
|
self.initial_login = True
|
|
|
|
# Raise error if no cookies, Crunchyroll has implemented recaptcha, so authorization via credentials is not implemented
|
|
if not cookies and not self.initial_login:
|
|
raise EnvironmentError("Service requires cookies for authentication.")
|
|
|
|
# If authenticate is being called for the first time and cookies are present, retrieve an authorization token
|
|
if cookies and self.auth_token_web is None:
|
|
|
|
# Update the session with the loaded cookies.
|
|
self.session.cookies.update(cookies)
|
|
|
|
# Send the POST request for an authorization token
|
|
self.auth_token_web = self.session.post(
|
|
url=self.config['endpoints']['token'],
|
|
headers={
|
|
'Authorization': 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6' # This is generated by crunchyroll and is occasionally updated
|
|
},
|
|
data={
|
|
'device_id': 'b74d25a3-c507-4d89-b0d8-8276577b916f', # This is the default firefox device ID
|
|
'device_type': 'Firefox on Windows', # Device type really doesn't matter here, just using this to match
|
|
'grant_type': 'etp_rt_cookie',
|
|
}
|
|
).json()['access_token']
|
|
|
|
# Update the token expiry, Crunchyroll offers 5 minutes between expiry.
|
|
self.auth_token_expiry_web = datetime.now() + timedelta(minutes=4)
|
|
|
|
if not credential and not self.initial_login:
|
|
console.log("Only cookies detected, can only fetch web manifests")
|
|
|
|
if credential and self.auth_token_tv is None:
|
|
# Send the POST request for an authorization token
|
|
self.auth_token_tv = self.session.post(
|
|
url=self.config['endpoints']['token'],
|
|
headers={
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
},
|
|
data = {
|
|
'grant_type': 'password',
|
|
'username': credential.username,
|
|
'password': credential.password,
|
|
'scope': 'offline_access',
|
|
'client_id': 'anydazwaxclrocanwho3',
|
|
'client_secret': '88gnIsucV-Q7sYrY29uOW_JGlMqx1mBN',
|
|
'device_type': 'ANDROIDTV',
|
|
'device_id': 'b70699f1-94ae-4cc9-970b-1f58f3dff32c',
|
|
'device_name': 'emulator_x86_arm'
|
|
}
|
|
).json()['access_token']
|
|
|
|
# Update the token expiry, Crunchyroll offers 5 minutes between expiry.
|
|
self.auth_token_expiry_tv = datetime.now() + timedelta(minutes=4)
|
|
|
|
|
|
# If there is already an authorization token for web, and it is expired, get a new one
|
|
if self.auth_token_web is not None and datetime.now() > self.auth_token_expiry_web:
|
|
|
|
# Send the POST request for an authorization token
|
|
refresh_response = self.session.post(
|
|
url=self.config['endpoints']['token'],
|
|
headers={
|
|
'Authorization': 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6' # This is generated by crunchyroll and is occasionally updated
|
|
},
|
|
data={
|
|
'device_id': 'b74d25a3-c507-4d89-b0d8-8276577b916f', # This is the default firefox device ID
|
|
'device_type': 'Firefox on Windows', # Device type really doesn't matter here, just using this to match
|
|
'grant_type': 'etp_rt_cookie',
|
|
}
|
|
)
|
|
|
|
# Update the authorization token
|
|
self.auth_token_web = refresh_response.json()['access_token']
|
|
|
|
# Update the token expiry time
|
|
self.auth_token_expiry_web = datetime.now() + timedelta(minutes=4)
|
|
|
|
# If there is already an authorization token for TV, and it is expired, get a new one
|
|
if self.auth_token_tv is not None and datetime.now() > self.auth_token_expiry_tv:
|
|
|
|
# Send a POST request for an authorization token
|
|
refresh_response = self.session.post(
|
|
url=self.config['endpoints']['token'],
|
|
headers={
|
|
'Content-Type': 'application/x-www-form-urlencoded',
|
|
},
|
|
data={
|
|
'grant_type': 'password',
|
|
'username': self.credential.username,
|
|
'password': self.credential.password,
|
|
'scope': 'offline_access',
|
|
'client_id': 'anydazwaxclrocanwho3',
|
|
'client_secret': '88gnIsucV-Q7sYrY29uOW_JGlMqx1mBN',
|
|
'device_type': 'ANDROIDTV',
|
|
'device_id': 'b70699f1-94ae-4cc9-970b-1f58f3dff32c',
|
|
'device_name': 'emulator_x86_arm'
|
|
}
|
|
)
|
|
|
|
# Update the authorization token
|
|
self.auth_token_tv = refresh_response.json()['access_token']
|
|
|
|
# Update the token expiry time
|
|
self.auth_token_expiry_tv = datetime.now() + timedelta(minutes=4)
|
|
|
|
def get_titles(self) -> Titles_T:
|
|
|
|
# Initialize empty list for all season IDs of series.
|
|
season_ids = []
|
|
|
|
# Initialize a special counter for episodes labeled as 24.9, 12.5, etc. Devine/Unshackle does not allow floats.
|
|
special_counter = 1000
|
|
|
|
# Send a GET request to get the series info
|
|
series_response = self.session.get(
|
|
url=self.config['endpoints']['seasons'].format(series_id=self.title),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_web}'
|
|
}
|
|
).json()
|
|
|
|
# Append each season to the season_ids list
|
|
for season in series_response['data']:
|
|
season_ids.append(season['id'])
|
|
|
|
# Initialize empty list for episodes
|
|
episodes = []
|
|
|
|
# Iterate through the season_ids
|
|
for season_id in season_ids:
|
|
|
|
# Send a GET request for the season to get episode information
|
|
episodes_response = self.session.get(
|
|
url=self.config['endpoints']['episodes'].format(season_id=season_id),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_web}'
|
|
}
|
|
).json()
|
|
|
|
# Iterate through each episode in the season response
|
|
for episode in episodes_response['data']:
|
|
|
|
# If the episode doesn't have a number, or it is in a season not displayed in chronological order, add 1 to the special counter
|
|
if episode['episode_number'] is None or isinstance(episode['episode_number'], float):
|
|
special_counter += 1
|
|
|
|
# Append the episode to the episodes list
|
|
episodes.append(Episode(
|
|
id_=episode['id'],
|
|
service=self.__class__,
|
|
title=episode['series_title'],
|
|
season=int(episode['season_display_number']) if episode['season_display_number'] != '' else episode['season_sequence_number'] if episode['season_display_number'] == '' and episode['season_sequence_number'] == 1 else 1 if episode['season_sequence_number'] == 0 else 0,
|
|
number = episode['episode_number'] if isinstance(episode['episode_number'], int) else special_counter,
|
|
name=episode['title'] if episode['title'] else episode['season_title'],
|
|
year=episode['episode_air_date'][:4],
|
|
language=episode['audio_locale']
|
|
))
|
|
|
|
# Return the series
|
|
return Series(episodes)
|
|
|
|
def get_tracks(self, title: Title_T) -> Tracks:
|
|
|
|
# Initialize tracks class
|
|
tracks = Tracks()
|
|
|
|
# Initialize current_tracks list to keep track of tracks already added to the tracks class
|
|
current_tracks = []
|
|
|
|
# Initialize current_subtitles to avoid duplicated when iterating over all available versions of episodes
|
|
current_subtitles = []
|
|
|
|
# Send a GET request to the web endpoint to get episode response for web devices
|
|
episode_response = self.session.get(
|
|
url=self.config['endpoints']['episode_web'].format(episode=title.id),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_web}'
|
|
}
|
|
).json()
|
|
|
|
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
|
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
|
deletion_token = episode_response['token']
|
|
|
|
# Initialize an empty list for different versions of the episode
|
|
available_versions = []
|
|
|
|
# Append all the available versions to the available_versions list
|
|
for version in episode_response['versions']:
|
|
available_versions.append(version['guid'])
|
|
|
|
# Update the session headers so that session may be passed to DASH.from_url()
|
|
self.session.headers.update({
|
|
'Authorization': f'Bearer {self.auth_token_web}'
|
|
})
|
|
|
|
# Add the original response tracks
|
|
tracks.add(DASH.from_url(url=episode_response['url'], session=self.session).to_tracks(language=episode_response['audioLocale']))
|
|
|
|
# Clear the sessions headers
|
|
self.session.headers.clear()
|
|
|
|
# Clear any subtitles included in the MPD, they will be retrieved manually
|
|
tracks.subtitles.clear()
|
|
|
|
# Iterate through subtitles available in the original response
|
|
for subtitle in episode_response['subtitles']:
|
|
|
|
# If the subtitle isn't none
|
|
if subtitle != 'none' and episode_response['audioLocale'] != 'en-US':
|
|
|
|
# Append the language to the current_subtitles list to keep track of which are already added to tracks
|
|
current_subtitles.append(episode_response['subtitles'][subtitle]['language'])
|
|
|
|
# Add the subtitle to the tracks object
|
|
tracks.add(Subtitle(
|
|
url=episode_response['subtitles'][subtitle]['url'],
|
|
codec=Subtitle.Codec.from_mime(episode_response['subtitles'][subtitle]['format']),
|
|
language=episode_response['subtitles'][subtitle]['language'],
|
|
))
|
|
|
|
# Update current_tracks with the tracks now in tracks object
|
|
for track in tracks:
|
|
current_tracks.append(track.id)
|
|
track.data['endpoint_type'] = 'web'
|
|
track.data['GUID'] = title.id
|
|
|
|
# Now that tracks have been extracted, close the "stream"
|
|
self.session.delete(
|
|
url=self.config['endpoints']['delete'].format(episode=title.id, delete_token=deletion_token),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_web}'
|
|
}
|
|
)
|
|
|
|
if self.auth_token_tv is not None:
|
|
# Send a GET request to the Android/Google TV endpoint to get episode response for TV devices
|
|
episode_response = self.session.get(
|
|
url=self.config['endpoints']['episode_tv'].format(episode=title.id),
|
|
headers={
|
|
'Accept': 'application/json',
|
|
'Accept-Charset': 'UTF-8',
|
|
'Accept-Encoding': 'gzip',
|
|
'Authorization': f'Bearer {self.auth_token_tv}',
|
|
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
|
},
|
|
).json()
|
|
|
|
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
|
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
|
deletion_token = episode_response['token']
|
|
|
|
## Update the headers to reflect TV device
|
|
self.session.headers.update({
|
|
'Accept': 'application/json',
|
|
'Accept-Charset': 'UTF-8',
|
|
'Accept-Encoding': 'gzip',
|
|
'Authorization': f'Bearer {self.auth_token_tv}',
|
|
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
|
})
|
|
|
|
# Grab the new tracks into a variable
|
|
new_tracks = DASH.from_url(url=episode_response['url'], session=self.session).to_tracks(
|
|
language=episode_response['audioLocale'])
|
|
|
|
# Clear the subtitles
|
|
new_tracks.subtitles.clear()
|
|
|
|
# Iterate through the new_tracks
|
|
for track in new_tracks:
|
|
# See if the track already exists in current_tracks
|
|
if track.id not in current_tracks:
|
|
# Append the track ID if it doesn't
|
|
current_tracks.append(track.id)
|
|
# Add track endpoint type
|
|
track.data['endpoint_type'] = 'tv'
|
|
track.data['GUID'] = title.id
|
|
# Add the track to the tracks object
|
|
tracks.add(track)
|
|
|
|
# Iterate through subtitles available in the original response
|
|
for subtitle in episode_response['subtitles']:
|
|
|
|
# If the subtitle isn't none
|
|
if subtitle != 'none' and subtitle not in current_subtitles and episode_response['audioLocale'] != 'en-US':
|
|
current_subtitles.append(episode_response['subtitles'][subtitle]['language'])
|
|
|
|
# Add the subtitle to the tracks object
|
|
tracks.add(Subtitle(
|
|
url=episode_response['subtitles'][subtitle]['url'],
|
|
codec=Subtitle.Codec.from_mime(episode_response['subtitles'][subtitle]['format']),
|
|
language=episode_response['subtitles'][subtitle]['language'],
|
|
))
|
|
|
|
# Now that tracks have been extracted, close the "stream"
|
|
self.session.delete(
|
|
url=self.config['endpoints']['delete'].format(episode=title.id, delete_token=deletion_token),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_tv}'
|
|
}
|
|
)
|
|
|
|
# Clear the headers for next iterations below
|
|
self.session.headers.clear()
|
|
|
|
for version in available_versions:
|
|
# Send a GET request to the web endpoint to get episode response for web devices
|
|
episode_response = self.session.get(
|
|
url=self.config['endpoints']['episode_web'].format(episode=version),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_web}'
|
|
}
|
|
).json()
|
|
|
|
### TRY STATEMENT HERE FOR DEBUGGING ###
|
|
try:
|
|
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
|
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
|
deletion_token = episode_response['token']
|
|
|
|
# Update the session headers so that session may be passed to DASH.from_url()
|
|
self.session.headers.update({
|
|
'Authorization': f'Bearer {self.auth_token_web}'
|
|
})
|
|
|
|
# Grab the new tracks into a variable
|
|
new_tracks = DASH.from_url(url=episode_response['url'], session=self.session).to_tracks(language=episode_response['audioLocale'])
|
|
|
|
# Clear the subtitles on the tracks
|
|
new_tracks.subtitles.clear()
|
|
|
|
# Iterate through available subtitles
|
|
for subtitle in episode_response['subtitles']:
|
|
# If the subtitle isn't empty, continue
|
|
if subtitle != 'none':
|
|
# If the subtitle is not in current subtitles continue
|
|
if episode_response['subtitles'][subtitle]['language'] not in current_subtitles:
|
|
# Append the language to current subtitles list
|
|
current_subtitles.append(episode_response['subtitles'][subtitle]['language'])
|
|
# Add the subtitle to the tracks
|
|
tracks.add(Subtitle(
|
|
url=episode_response['subtitles'][subtitle]['url'],
|
|
codec=Subtitle.Codec.from_mime(episode_response['subtitles'][subtitle]['format']),
|
|
language=episode_response['subtitles'][subtitle]['language'],
|
|
))
|
|
# Iterate through the tracks
|
|
for track in new_tracks:
|
|
# if the track ID isn't in the current tracks continue
|
|
if track.id not in current_tracks:
|
|
# Append the new track ad
|
|
current_tracks.append(track.id)
|
|
# Add track endpoint type
|
|
track.data['endpoint_type'] = 'web'
|
|
# Add track GUID
|
|
track.data['GUID'] = version
|
|
# Add it
|
|
tracks.add(track)
|
|
# Now that tracks have been extracted, close the "stream"
|
|
self.session.delete(
|
|
url=self.config['endpoints']['delete'].format(episode=version, delete_token=deletion_token),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_web}'
|
|
}
|
|
)
|
|
# Clear headers for the TV endpoint iteration
|
|
self.session.headers.clear()
|
|
except:
|
|
continue
|
|
if self.auth_token_tv is not None:
|
|
# Send a GET request to the Android/Google TV endpoint to get episode response for TV devices
|
|
episode_response = self.session.get(
|
|
url=self.config['endpoints']['episode_tv'].format(episode=version),
|
|
headers={
|
|
'Accept': 'application/json',
|
|
'Accept-Charset': 'UTF-8',
|
|
'Accept-Encoding': 'gzip',
|
|
'Authorization': f'Bearer {self.auth_token_tv}',
|
|
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
|
},
|
|
).json()
|
|
|
|
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
|
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
|
deletion_token = episode_response['token']
|
|
|
|
## Update the headers to reflect TV device
|
|
self.session.headers.update({
|
|
'Accept': 'application/json',
|
|
'Accept-Charset': 'UTF-8',
|
|
'Accept-Encoding': 'gzip',
|
|
'Authorization': f'Bearer {self.auth_token_tv}',
|
|
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
|
})
|
|
|
|
# Grab the new tracks into a variable
|
|
new_tracks = DASH.from_url(url=episode_response['url'], session=self.session).to_tracks(
|
|
language=episode_response['audioLocale'])
|
|
|
|
# Clear subtitles
|
|
new_tracks.subtitles.clear()
|
|
|
|
# Iterate through the new_tracks
|
|
for track in new_tracks:
|
|
# See if the track already exists in current_tracks
|
|
if track.id not in current_tracks:
|
|
# Append the track ID if it doesn't
|
|
current_tracks.append(track.id)
|
|
# Add the track endpoint type
|
|
track.data['endpoint_type'] = 'tv'
|
|
# Add track guid
|
|
track.data['GUID'] = version
|
|
# Add the track to the tracks object
|
|
tracks.add(track)
|
|
|
|
# Iterate through subtitles available in the original response
|
|
for subtitle in episode_response['subtitles']:
|
|
|
|
# If the subtitle isn't none
|
|
if subtitle != 'none' and subtitle not in current_subtitles:
|
|
# Add the subtitle to the tracks object
|
|
tracks.add(Subtitle(
|
|
url=episode_response['subtitles'][subtitle]['url'],
|
|
codec=Subtitle.Codec.from_mime(episode_response['subtitles'][subtitle]['format']),
|
|
language=episode_response['subtitles'][subtitle]['language'],
|
|
))
|
|
|
|
# Now that tracks have been extracted, close the "stream"
|
|
self.session.delete(
|
|
url=self.config['endpoints']['delete'].format(episode=version, delete_token=deletion_token),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_tv}'
|
|
}
|
|
)
|
|
|
|
# Clear the headers for next iterations below
|
|
self.session.headers.clear()
|
|
|
|
return tracks
|
|
|
|
def get_chapters(self, title: Title_T) -> Chapters:
|
|
# All things created equal, this should be the same for all video tracks
|
|
|
|
# Initialize chapters class
|
|
chapters = Chapters()
|
|
|
|
# Send GET request for the chapters
|
|
chapter_response = self.session.get(
|
|
url=self.config['endpoints']['chapters'].format(episode=title.id),
|
|
)
|
|
|
|
if chapter_response.status_code == 200:
|
|
chapter_response = chapter_response.json()
|
|
# Check for intro chapter
|
|
if chapter_response.get('intro'):
|
|
try:
|
|
# Add the chapter, may not exist
|
|
chapters.add(Chapter(
|
|
timestamp=chapter_response['intro']['start'] * 1000,
|
|
name=chapter_response['intro']['type'].capitalize(),
|
|
))
|
|
# If it doesn't exist, move on to the next
|
|
except:
|
|
pass
|
|
|
|
# Check for the credits chapter
|
|
if chapter_response.get('credits'):
|
|
try:
|
|
# Add the chapter, may not exist
|
|
chapters.add(Chapter(
|
|
timestamp=chapter_response['credits']['start'] * 1000,
|
|
name=chapter_response['credits']['type'].capitalize(),
|
|
))
|
|
# If it doesn't exist, move on to the next
|
|
except:
|
|
pass
|
|
|
|
# Check for the preview chapter
|
|
if chapter_response.get('preview'):
|
|
try:
|
|
# Add the chapter, may not exist
|
|
chapters.add(Chapter(
|
|
timestamp=chapter_response['preview']['start'] * 1000,
|
|
name=chapter_response['preview']['type'].capitalize(),
|
|
))
|
|
# If it doesn't exist, move on to the next
|
|
except:
|
|
pass
|
|
|
|
# Check for recap chapter
|
|
if chapter_response.get('recap'):
|
|
try:
|
|
# Add the chapter, may not exist
|
|
chapters.add(Chapter(
|
|
timestamp=chapter_response['recap']['start'] * 1000,
|
|
name=chapter_response['recap']['type'].capitalize(),
|
|
))
|
|
# If it doesn't exist, move on to return statement
|
|
except:
|
|
pass
|
|
|
|
# Return the chapters
|
|
return chapters
|
|
|
|
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
|
|
self.authenticate()
|
|
if track.data['endpoint_type'] == 'tv':
|
|
# Get the episode response
|
|
episode_response = self.session.get(
|
|
url=self.config['endpoints']['episode_tv'].format(episode=track.data['GUID']),
|
|
headers={
|
|
'Accept': 'application/json',
|
|
'Accept-Charset': 'UTF-8',
|
|
'Accept-Encoding': 'gzip',
|
|
'Authorization': f'Bearer {self.auth_token_tv}',
|
|
'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
|
|
},
|
|
).json()
|
|
|
|
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
|
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
|
deletion_token = episode_response['token']
|
|
|
|
# Send a post request to the license server
|
|
license_response = self.session.post(
|
|
url=self.config['endpoints']['license_tv'],
|
|
headers={
|
|
'Accept': 'application/octet-stream',
|
|
'Accept-Encoding': 'gzip',
|
|
'Authorization': f'Bearer {self.auth_token_tv}',
|
|
'Content-Type': 'application/octet-stream',
|
|
'Host': 'cr-license-proxy.prd.crunchyrollsvc.com',
|
|
'x-cr-content-id': track.data['GUID'],
|
|
'x-cr-video-token': deletion_token
|
|
},
|
|
data=challenge
|
|
)
|
|
|
|
# Now that license has been extracted, close the "stream"
|
|
self.session.delete(
|
|
url=self.config['endpoints']['delete'].format(episode=track.data['GUID'], delete_token=deletion_token),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_tv}'
|
|
}
|
|
)
|
|
|
|
return license_response.content
|
|
|
|
if track.data['endpoint_type'] == 'web':
|
|
# Get the episode response
|
|
episode_response = self.session.get(
|
|
url=self.config['endpoints']['episode_web'].format(episode=track.data['GUID']),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_web}'
|
|
}
|
|
).json()
|
|
|
|
# Grab the deletion token, opening the GET request above counts as a concurrent stream
|
|
# Crunchyroll only allows 3 concurrent streams to be open at once, so we most "close" the stream when all information needed has been extracted
|
|
deletion_token = episode_response['token']
|
|
|
|
# Get the license
|
|
license_response = self.session.post(
|
|
url=self.config['endpoints']['license_web'],
|
|
headers={
|
|
'Accept': '*/*',
|
|
'content-type': 'application/octet-stream',
|
|
'x-cr-content-id': track.data['GUID'],
|
|
'x-cr-video-token': deletion_token,
|
|
'authorization': f'Bearer {self.auth_token_web}'
|
|
},
|
|
data=challenge
|
|
).json()
|
|
|
|
# Now that license has been extracted, close the "stream"
|
|
self.session.delete(
|
|
url=self.config['endpoints']['delete'].format(episode=track.data['GUID'], delete_token=deletion_token),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_web}'
|
|
}
|
|
)
|
|
|
|
return license_response['license']
|
|
|
|
def on_track_downloaded(self, track: AnyTrack) -> None:
|
|
if isinstance(track, Subtitle):
|
|
track.convert(codec=Subtitle.Codec.SubRip)
|
|
self.authenticate()
|
|
|
|
def search(self) -> Generator[SearchResult, None, None]:
|
|
|
|
# Get the search results
|
|
search_results = self.session.get(
|
|
url=self.config['endpoints']['search'].format(search_keyword=self.title),
|
|
headers={
|
|
'Authorization': f'Bearer {self.auth_token_web}',
|
|
}
|
|
).json()
|
|
|
|
# Iterate through series responses, create generator for results.
|
|
for result_type in search_results['data']:
|
|
if result_type['type'] == 'series':
|
|
for series_results in result_type['items']:
|
|
yield SearchResult(
|
|
id_=series_results['id'],
|
|
title=series_results['title'],
|
|
description=series_results['description']
|
|
)
|