Added a function that regex-checks the episode's season title: if the season title contains "Season X" as a contiguous phrase, X is used as the season number. Example - https://www.crunchyroll.com/series/G79H23ZGP/tales-of-wedding-rings
		
			
				
	
	
		
			707 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			707 lines
		
	
	
		
			32 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import base64
 | 
						|
import hashlib
 | 
						|
import json
 | 
						|
import re
 | 
						|
from codecs import Codec
 | 
						|
from collections.abc import Generator
 | 
						|
from datetime import datetime, timedelta
 | 
						|
from http.cookiejar import CookieJar
 | 
						|
from typing import Optional, Union
 | 
						|
import click
 | 
						|
from langcodes import Language
 | 
						|
 | 
						|
from unshackle.core.console import console
 | 
						|
from unshackle.core.constants import AnyTrack
 | 
						|
from unshackle.core.credential import Credential
 | 
						|
from unshackle.core.manifests import DASH
 | 
						|
from unshackle.core.search_result import SearchResult
 | 
						|
from unshackle.core.service import Service
 | 
						|
from unshackle.core.session import session
 | 
						|
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
 | 
						|
from unshackle.core.tracks import Chapter, Subtitle, Tracks, Video, Chapters
 | 
						|
 | 
						|
class CR(Service):
 | 
						|
 | 
						|
    """
 | 
						|
    Service code for Crunchyroll
 | 
						|
    Author: TPD94
 | 
						|
    Version: 1.0.6
 | 
						|
    Authorization: Cookies for web endpoints, Credentials for TV endpoints, Cookies/Credentials for both. Cookies required.
 | 
						|
    Security: FHD@L3
 | 
						|
    Use Series ID/URL (for example - https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8) or Series ID (for example - GG5H5XQ7D).
 | 
						|
    """
 | 
						|
 | 
						|
    @staticmethod
    @click.command(
        name="CR",
        short_help="https://crunchyroll.com/",
        help="""
                       Service code for Crunchyroll\n
                       Author: TPD94\n
                       Version: 1.0.6\n
                       Authorization: Cookies for web endpoints, Credentials for TV endpoints, Cookies/Credentials for both. Cookies required.\n
                       Security: FHD@L3\n
                       Use Series ID/URL (for example - https://www.crunchyroll.com/series/GG5H5XQ7D/kaiju-no-8) or Series ID (for example - GG5H5XQ7D).
                       """,
    )
    @click.argument("title", type=str)
    @click.pass_context
    def cli(ctx, **kwargs):
        # Click entry point: hand the context and the parsed "title" argument to the service class
        service = CR(ctx, **kwargs)
        return service
 | 
						|
 | 
						|
    def __init__(self, ctx, title):
        """
        Prepare the service for one Crunchyroll series.

        :param ctx: Click context, forwarded unchanged to the base class initializer.
        :param title: Either a bare series ID or a crunchyroll.com series URL that contains one.
        """
        super().__init__(ctx)

        # Extract the series ID from a URL if applicable. The capture stops at '/', '?' and '#'
        # so a slug-less URL carrying a query string or fragment still yields a clean ID
        # (previously "series/ID?x=1" produced "ID?x=1").
        match = re.search(r'crunchyroll\.com/series/([^/?#]+)', title)

        # Use the captured ID when the input was a URL, otherwise take the user's input verbatim
        self.title = match.group(1) if match else title

        # Authorization token for web requests and the local time after which it is refreshed
        self.auth_token_web = None
        self.auth_token_expiry_web = None

        # Authorization token for TV requests and the local time after which it is refreshed
        self.auth_token_tv = None
        self.auth_token_expiry_tv = None

        # Cookies and credential remembered by authenticate() for later token refreshes
        self.cookies = None
        self.credential = None

        # Set to True by authenticate() once its first-call setup has run
        self.initial_login = False
 | 
						|
 | 
						|
    def extract_season_number_from_string(self, season_title: str) -> int | None:
 | 
						|
        match = re.search(r'\bSeason\s+(\d+)\b', season_title, re.IGNORECASE)
 | 
						|
        return int(match.group(1)) if match else None
 | 
						|
 | 
						|
    def get_session(self):
        # Per the original author's note: the session is backed by curl_cffi so it can impersonate
        # a browser (here the "chrome124" profile) and avoid Crunchyroll's bot detection
        browser_session = session("chrome124")
        return browser_session
 | 
						|
 | 
						|
    def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
        """
        Perform the first-call login and refresh any expired authorization token.

        On the first call the base class loads the cookies/credential, a web token is requested
        from the cookies and, when a credential is available, a TV token is requested as well.
        On every call, a token whose locally tracked expiry time has passed is requested again.

        :param cookies: Cookie jar for the web endpoints; required on the first call.
        :param credential: Optional username/password used for the TV endpoints.
        :raises EnvironmentError: If the first call is made without cookies.
        """
        # Remember whether this is the first call. self.initial_login is flipped below, so testing
        # it afterwards (as the code used to) made the cookies-only notice unreachable.
        first_login = not self.initial_login

        if first_login:
            # Run the super method to load the cookies without writing redundant code
            super().authenticate(cookies, credential)

            # Crunchyroll has implemented recaptcha, so authorization via credentials alone is not implemented
            if not cookies:
                raise EnvironmentError("Service requires cookies for authentication.")

            self.cookies = cookies

            if credential:
                self.credential = credential
            else:
                # Nothing supplied: fall back to whatever is stored (None after __init__)
                credential = self.credential

            self.initial_login = True

        # No web token yet and cookies are present: load them into the session and request one
        if cookies and self.auth_token_web is None:
            self.session.cookies.update(cookies)
            self._request_web_token()

        if first_login and not credential:
            console.log("Only cookies detected, can only fetch web manifests")

        # No TV token yet and a credential is present: request one
        if credential and self.auth_token_tv is None:
            # Keep the credential so the expiry refresh below also works when it was first
            # supplied on a later call rather than on the first one
            self.credential = credential
            self._request_tv_token(credential)

        # If there is already an authorization token for web, and it is expired, get a new one
        if self.auth_token_web is not None and datetime.now() > self.auth_token_expiry_web:
            self._request_web_token()

        # If there is already an authorization token for TV, and it is expired, get a new one
        if self.auth_token_tv is not None and datetime.now() > self.auth_token_expiry_tv:
            self._request_tv_token(self.credential)

    def _request_web_token(self) -> None:
        # Request a web access token using the cookies already held by the session, then record its expiry.
        response = self.session.post(
            url=self.config['endpoints']['token'],
            headers={
                'Authorization': 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6'  # This is generated by crunchyroll and is occasionally updated
            },
            data={
                'device_id': 'b74d25a3-c507-4d89-b0d8-8276577b916f',  # This is the default firefox device ID
                'device_type': 'Firefox on Windows',  # Device type really doesn't matter here, just using this to match
                'grant_type': 'etp_rt_cookie',
            }
        )
        self.auth_token_web = response.json()['access_token']

        # Original author's note: Crunchyroll offers 5 minutes between expiry; refresh a minute early
        self.auth_token_expiry_web = datetime.now() + timedelta(minutes=4)

    def _request_tv_token(self, credential) -> None:
        # Request a TV access token with the given username/password credential, then record its expiry.
        response = self.session.post(
            url=self.config['endpoints']['token'],
            headers={
                'Content-Type': 'application/x-www-form-urlencoded',
            },
            data={
                'grant_type': 'password',
                'username': credential.username,
                'password': credential.password,
                'scope': 'offline_access',
                'client_id': 'anydazwaxclrocanwho3',
                'client_secret': '88gnIsucV-Q7sYrY29uOW_JGlMqx1mBN',
                'device_type': 'ANDROIDTV',
                'device_id': 'b70699f1-94ae-4cc9-970b-1f58f3dff32c',
                'device_name': 'emulator_x86_arm'
            }
        )
        self.auth_token_tv = response.json()['access_token']

        # Original author's note: Crunchyroll offers 5 minutes between expiry; refresh a minute early
        self.auth_token_expiry_tv = datetime.now() + timedelta(minutes=4)
 | 
						|
 | 
						|
    def get_titles(self) -> Titles_T:
        # Stand-in episode number for entries without an integer number (24.9, 12.5, etc.).
        # Original author's note: Devine/Unshackle does not allow floats.
        special_counter = 1000

        # Fetch the series info, which lists every season
        series_response = self.session.get(
            url=self.config['endpoints']['seasons'].format(series_id=self.title),
            headers={
                'Authorization': f'Bearer {self.auth_token_web}'
            }
        ).json()

        # Collect the ID of every season in the series
        season_ids = [season['id'] for season in series_response['data']]

        episodes = []

        for season_id in season_ids:

            # Fetch the episode listing of this season
            episodes_response = self.session.get(
                url=self.config['endpoints']['episodes'].format(season_id=season_id),
                headers={
                    'Authorization': f'Bearer {self.auth_token_web}'
                }
            ).json()

            for episode in episodes_response['data']:
                episode_number = episode['episode_number']

                # A missing or fractional episode number consumes the next special number
                if episode_number is None or isinstance(episode_number, float):
                    special_counter += 1

                # Work out the season, in order of preference:
                #   1. the display number, when it is not empty
                #   2. a "Season X" phrase found in the season title
                #   3. the sequence number: 1 stays as is, 0 is treated as 1, anything else becomes 0
                display_number = episode['season_display_number']
                if display_number != '':
                    season_number = int(display_number)
                else:
                    season_number = self.extract_season_number_from_string(episode['season_title'])
                    if season_number is None:
                        sequence_number = episode['season_sequence_number']
                        if sequence_number == 1:
                            season_number = sequence_number
                        elif sequence_number == 0:
                            season_number = 1
                        else:
                            season_number = 0

                # Only a real integer is used as-is; everything else gets the special counter value
                if isinstance(episode_number, int):
                    number = episode_number
                else:
                    number = special_counter

                episodes.append(Episode(
                    id_=episode['id'],
                    service=self.__class__,
                    title=episode['series_title'],
                    season=season_number,
                    number=number,
                    # Fall back to the season title when the episode has no title of its own
                    name=episode['title'] if episode['title'] else episode['season_title'],
                    # First four characters of the air date (presumably the year of an ISO date - confirm)
                    year=episode['episode_air_date'][:4],
                    language=episode['audio_locale']
                ))

        # Return the series
        return Series(episodes)
 | 
						|
 | 
						|
    def get_tracks(self, title: Title_T) -> Tracks:
        """
        Collect video, audio and subtitle tracks for an episode and all of its versions.

        The requested episode is queried on the web endpoint first and, when a TV token is
        available, on the TV endpoint too. Every GUID listed under the web response's
        'versions' is then queried the same way. Tracks are de-duplicated by track ID and
        subtitles by language.

        :param title: The title whose ``id`` is the episode GUID to query.
        :return: A Tracks object holding everything that was gathered.
        """
        tracks = Tracks()

        # IDs of tracks already added, and languages of subtitles already added
        seen_track_ids = set()
        seen_subtitle_languages = set()

        # Requested episode on the web endpoint. A failure here is fatal, as before.
        episode_response = self._collect_playback(
            tracks, seen_track_ids, seen_subtitle_languages,
            guid=title.id, endpoint_type='web', skip_subtitles_for_english_audio=True,
        )

        # GUIDs of every available version of the episode.
        # NOTE(review): 'versions' is guarded against null here - confirm whether the API can return null.
        available_versions = [version['guid'] for version in episode_response['versions'] or []]

        # Same bookkeeping as before: everything gathered from the first response, whatever
        # iterating `tracks` yields, is recorded and tagged as coming from the web endpoint
        for track in tracks:
            seen_track_ids.add(track.id)
            track.data['endpoint_type'] = 'web'
            track.data['GUID'] = title.id

        # Requested episode on the Android/Google TV endpoint
        if self.auth_token_tv is not None:
            self._collect_playback(
                tracks, seen_track_ids, seen_subtitle_languages,
                guid=title.id, endpoint_type='tv', skip_subtitles_for_english_audio=True,
            )

        for version in available_versions:
            # The web lookup of a version stays best-effort: a version that cannot be opened is
            # skipped together with its TV lookup. Only Exception is caught, so Ctrl+C and
            # interpreter exit are no longer swallowed.
            try:
                self._collect_playback(
                    tracks, seen_track_ids, seen_subtitle_languages,
                    guid=version, endpoint_type='web', skip_subtitles_for_english_audio=False,
                )
            except Exception as error:
                console.log(f"Skipping version {version}: {error!r}")
                continue

            if self.auth_token_tv is not None:
                self._collect_playback(
                    tracks, seen_track_ids, seen_subtitle_languages,
                    guid=version, endpoint_type='tv', skip_subtitles_for_english_audio=False,
                )

        return tracks

    def _collect_playback(self, tracks, seen_track_ids, seen_subtitle_languages, *, guid, endpoint_type,
                          skip_subtitles_for_english_audio):
        """
        Open one playback stream, merge its new tracks and subtitles into ``tracks``, and close it.

        :param tracks: Tracks object that receives the new tracks and subtitles.
        :param seen_track_ids: Set of track IDs already added; updated in place.
        :param seen_subtitle_languages: Set of subtitle languages already added; updated in place.
        :param guid: Episode/version GUID to query.
        :param endpoint_type: 'web' or 'tv'; selects the endpoint, the headers and the token.
        :param skip_subtitles_for_english_audio: When True, no subtitles are taken from a
            response whose 'audioLocale' is 'en-US' (this is how the requested episode has
            always been handled; the version lookups pass False).
        :return: The decoded JSON response of the playback request.
        """
        if endpoint_type == 'tv':
            auth_token = self.auth_token_tv
            endpoint = 'episode_tv'
            request_headers = {
                'Accept': 'application/json',
                'Accept-Charset': 'UTF-8',
                'Accept-Encoding': 'gzip',
                'Authorization': f'Bearer {auth_token}',
                'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
            }
        else:
            auth_token = self.auth_token_web
            endpoint = 'episode_web'
            request_headers = {
                'Authorization': f'Bearer {auth_token}'
            }

        episode_response = self.session.get(
            url=self.config['endpoints'][endpoint].format(episode=guid),
            headers=request_headers,
        ).json()

        # Original author's note: opening the GET request above counts as a concurrent stream and
        # Crunchyroll only allows 3 at once, so the stream must be "closed" with this token when done.
        # A response without a token raises KeyError here, before there is anything to close.
        deletion_token = episode_response['token']

        try:
            # Put the headers on the session so that it may be passed to DASH.from_url()
            self.session.headers.update(request_headers)

            new_tracks = DASH.from_url(url=episode_response['url'], session=self.session).to_tracks(
                language=episode_response['audioLocale'])

            # Drop any subtitles included in the MPD, they are taken from the response instead
            new_tracks.subtitles.clear()

            for track in new_tracks:
                if track.id in seen_track_ids:
                    continue
                seen_track_ids.add(track.id)
                # Record which endpoint and GUID the track came from
                track.data['endpoint_type'] = endpoint_type
                track.data['GUID'] = guid
                tracks.add(track)

            english_audio = episode_response['audioLocale'] == 'en-US'
            if not (skip_subtitles_for_english_audio and english_audio):
                for key, subtitle in episode_response['subtitles'].items():
                    # The 'none' entry is a placeholder, not a subtitle
                    if key == 'none':
                        continue
                    # De-duplicate on the language everywhere, and always record what was added
                    language = subtitle['language']
                    if language in seen_subtitle_languages:
                        continue
                    seen_subtitle_languages.add(language)
                    tracks.add(Subtitle(
                        url=subtitle['url'],
                        codec=Subtitle.Codec.from_mime(subtitle['format']),
                        language=language,
                    ))
        finally:
            # Close the "stream" even when extraction failed, then reset the session headers
            # so nothing leaks into the next request
            try:
                self.session.delete(
                    url=self.config['endpoints']['delete'].format(episode=guid, delete_token=deletion_token),
                    headers={
                        'Authorization': f'Bearer {auth_token}'
                    }
                )
            finally:
                self.session.headers.clear()

        return episode_response
 | 
						|
 | 
						|
    def get_chapters(self, title: Title_T) -> Chapters:
        """
        Build the chapter list for a title from the chapters endpoint.

        Sends a GET request to the configured ``chapters`` endpoint for
        ``title.id``. When the response status is 200, each of the ``intro``,
        ``credits``, ``preview`` and ``recap`` sections that is present and
        truthy is turned into one chapter: the name is the section's ``type``
        value capitalized, and the timestamp is its ``start`` value times 1000
        (presumably seconds converted to milliseconds - confirm against the API).

        Parameters:
            title: The title whose ``id`` is substituted into the endpoint URL.

        Returns:
            A ``Chapters`` collection. It is returned empty when the endpoint
            does not answer with status 200 or no section could be used.
        """

        # All things created equal, this should be the same for all video tracks
        chapters = Chapters()

        # Send GET request for the chapters
        chapter_response = self.session.get(
            url=self.config['endpoints']['chapters'].format(episode=title.id),
        )

        # Anything other than a 200 means there is nothing to parse
        if chapter_response.status_code != 200:
            return chapters

        chapter_data = chapter_response.json()

        # The sections are handled identically, in the same order as before
        for section in ('intro', 'credits', 'preview', 'recap'):

            details = chapter_data.get(section)

            # Section missing or empty, move on to the next one
            if not details:
                continue

            try:
                chapters.add(Chapter(
                    timestamp=details['start'] * 1000,
                    name=details['type'].capitalize(),
                ))
            except (KeyError, TypeError, AttributeError, ValueError):
                # Best effort: a section without usable 'start'/'type' values
                # is skipped. ValueError is included on the assumption that the
                # chapter classes reject bad or duplicate timestamps with it -
                # TODO confirm. Unlike a bare ``except``, this no longer hides
                # KeyboardInterrupt/SystemExit.
                continue

        # Return the chapters
        return chapters
 | 
						|
 | 
						|
    def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
        """
        Request a Widevine license for a track from the TV or web license endpoint.

        The endpoint family is chosen by ``track.data['endpoint_type']``
        (``'tv'`` or ``'web'``). In both cases the episode endpoint is queried
        first to obtain a ``token``; that token is sent to the license endpoint
        as ``x-cr-video-token`` and is afterwards used to call the ``delete``
        endpoint, which closes the stream opened by the episode request.

        The delete call runs in a ``finally`` clause so the stream is closed
        even when the license request (or decoding its JSON body) raises;
        otherwise the open stream would keep counting against the concurrent
        stream limit.

        Parameters:
            challenge: Raw challenge bytes, sent as the license request body.
            title: Not used by this method.
            track: Track whose ``data`` holds ``'endpoint_type'`` and ``'GUID'``.

        Returns:
            ``license_response.content`` for the TV endpoint, the ``'license'``
            field of the JSON response for the web endpoint, or ``None`` when
            ``endpoint_type`` is neither ``'tv'`` nor ``'web'``.

        Raises:
            KeyError: If the episode response has no ``'token'`` or the web
                license response has no ``'license'`` field.
        """

        self.authenticate()

        endpoint_type = track.data['endpoint_type']

        if endpoint_type == 'tv':

            guid = track.data['GUID']

            # Get the episode response
            episode_response = self.session.get(
                url=self.config['endpoints']['episode_tv'].format(episode=guid),
                headers={
                    'Accept': 'application/json',
                    'Accept-Charset': 'UTF-8',
                    'Accept-Encoding': 'gzip',
                    'Authorization': f'Bearer {self.auth_token_tv}',
                    'User-Agent': 'Crunchyroll/ANDROIDTV/3.46.0_22275 (Android 16; en-US; AOSP TV on x86)',
                },
            ).json()

            # Grab the deletion token, opening the GET request above counts as a concurrent stream
            # Crunchyroll only allows 3 concurrent streams to be open at once, so we must "close" the stream when all information needed has been extracted
            deletion_token = episode_response['token']

            try:
                # Send a post request to the license server
                license_response = self.session.post(
                    url=self.config['endpoints']['license_tv'],
                    headers={
                        'Accept': 'application/octet-stream',
                        'Accept-Encoding': 'gzip',
                        'Authorization': f'Bearer {self.auth_token_tv}',
                        'Content-Type': 'application/octet-stream',
                        'Host': 'cr-license-proxy.prd.crunchyrollsvc.com',
                        'x-cr-content-id': guid,
                        'x-cr-video-token': deletion_token
                    },
                    data=challenge
                )
            finally:
                # Close the "stream" whether or not the license request succeeded
                self.session.delete(
                    url=self.config['endpoints']['delete'].format(episode=guid, delete_token=deletion_token),
                    headers={
                        'Authorization': f'Bearer {self.auth_token_tv}'
                    }
                )

            return license_response.content

        if endpoint_type == 'web':

            guid = track.data['GUID']

            # Get the episode response
            episode_response = self.session.get(
                url=self.config['endpoints']['episode_web'].format(episode=guid),
                headers={
                    'Authorization': f'Bearer {self.auth_token_web}'
                }
            ).json()

            # Grab the deletion token, opening the GET request above counts as a concurrent stream
            # Crunchyroll only allows 3 concurrent streams to be open at once, so we must "close" the stream when all information needed has been extracted
            deletion_token = episode_response['token']

            try:
                # Get the license
                license_response = self.session.post(
                    url=self.config['endpoints']['license_web'],
                    headers={
                        'Accept': '*/*',
                        'content-type': 'application/octet-stream',
                        'x-cr-content-id': guid,
                        'x-cr-video-token': deletion_token,
                        'authorization': f'Bearer {self.auth_token_web}'
                    },
                    data=challenge
                ).json()
            finally:
                # Close the "stream" whether or not the license request succeeded
                self.session.delete(
                    url=self.config['endpoints']['delete'].format(episode=guid, delete_token=deletion_token),
                    headers={
                        'Authorization': f'Bearer {self.auth_token_web}'
                    }
                )

            return license_response['license']

        # Unknown endpoint type: no license can be requested
        return None
 | 
						|
 | 
						|
    def on_track_downloaded(self, track: AnyTrack) -> None:
        """
        Hook run after a track has been downloaded.

        Subtitle tracks are converted to SubRip; every other track type is left
        untouched. ``self.authenticate()`` is then called for all track types.

        Parameters:
            track: The track that has just finished downloading.
        """

        is_subtitle_track = isinstance(track, Subtitle)

        # Only subtitles need converting
        if is_subtitle_track:
            track.convert(codec=Subtitle.Codec.SubRip)

        # NOTE(review): presumably re-checks the auth tokens between downloads - confirm
        self.authenticate()
 | 
						|
 | 
						|
    def search(self) -> Generator[SearchResult, None, None]:
        """
        Search for ``self.title`` and yield one result per matching series.

        Queries the configured ``search`` endpoint with the web bearer token.
        Only result groups whose ``type`` is ``'series'`` are used; each item in
        such a group is yielded as a ``SearchResult`` built from its ``id``,
        ``title`` and ``description`` fields.

        Yields:
            SearchResult: One entry per series item in the response.
        """

        # Get the search results
        response = self.session.get(
            url=self.config['endpoints']['search'].format(search_keyword=self.title),
            headers={
                'Authorization': f'Bearer {self.auth_token_web}',
            }
        )
        payload = response.json()

        # Walk the result groups, skipping everything that is not a series group
        for group in payload['data']:

            if group['type'] != 'series':
                continue

            for entry in group['items']:
                yield SearchResult(
                    id_=entry['id'],
                    title=entry['title'],
                    description=entry['description']
                )
 |