master
TPD94 2024-03-06 23:41:44 -05:00
parent 0da99a0250
commit 1fdd4c1834
2 changed files with 148 additions and 90 deletions

View File

@ -1,8 +1,11 @@
import base64
import uuid
import math
import time
import datetime
import logging
import re
from abc import ABCMeta, abstractmethod
from http.cookiejar import CookieJar
from typing import Optional, Union
@ -20,16 +23,17 @@ from devine.core.console import console
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.titles import Title_T, Titles_T
from devine.core.tracks import Chapters, Tracks
from devine.core.tracks import Video, Audio, Subtitle, Chapter, Chapters, Tracks
from devine.core.utilities import get_ip_info
from devine.core.manifests import HLS, DASH
class CR(Service):
"""
Service code for Crunchyroll
Written by TPD94
Written by TPD94 and Toonshub, rewritten by an actual dev
Authorization: Cookies (Free and Paid Titles)
Authorization: None (Free) | Cookies (Free and Paid Titles)
Security: L3 FHD
"""
@ -52,77 +56,97 @@ class CR(Service):
# Pass the series_id argument to self so it's accessable across all methods
self.title = title
result = re.match(r"^https?://(?:www\.|beta\.)?crunchyroll\.com/series/(?P<id>[A-Z0-9]+)", self.title)
if result:
self.title = result.group(1)
self.no_login = False
self.token = None
self.token_expiry = 0
# Overriding the constructor
super().__init__(ctx)
# Define function to get a session for service
@staticmethod
def get_session() -> requests.Session:
# Start a session
session = requests.Session()
# Set the headers
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.5',
}
# Update the headers for the session
session.headers.update(headers)
# return the session
return session
# Defining an authinticate function
def authenticate(self, cookies: Optional[CookieJar], credential: Optional[Credential] = None):
# Check for cached token
if self.token_expiry > time.time():
return self.token
# Login session
login_session = requests.Session()
# Load the cookies for login session
login_session.cookies.get(cookies)
# Check whether cookies are available
if not cookies:
self.no_login = True
if cookies:
# Load the cookies for login session
login_session.cookies.get(cookies)
# Add cookies
login_session.cookies.update(cookies)
# Add cookies
login_session.cookies.update(cookies)
# Load the cookies for self sessions later
self.session.cookies.get(cookies)
# Load the cookies for self sessions later
self.session.cookies.get(cookies)
# Add cookies for self sessions later
self.session.cookies.update(cookies)
# Add cookies for self sessions later
self.session.cookies.update(cookies)
# Set headers for the token request
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
# Setting to Firefox
'Authorization': 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6', # Seems to be the same across all browsers on PC
'ETP-Anonymous-ID': f'{uuid.uuid4()}', # Device ID, can be a randomized UUID
'Origin': 'https://www.crunchyroll.com', # Crunchyroll origin
'Referer': 'https://www.crunchyroll.com/', # Crunchyroll referer
}
# If cookies are not available, log in anonymously
if self.no_login:
# Set headers for the token request
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
# Setting to Firefox
"Authorization":"Basic Y3Jfd2ViOg==", # Seems to be the same across all browsers on PC
'ETP-Anonymous-ID': f'{uuid.uuid4()}', # Device ID, can be a randomized UUID
'Origin': 'https://www.crunchyroll.com', # Crunchyroll origin
'Referer': 'https://www.crunchyroll.com/', # Crunchyroll referer
}
# Set data for the token request
data = {
'grant_type': 'client_id', # Value as per browser for anonymous login
}
else:
# Set headers for the token request
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
# Setting to Firefox
'Authorization': 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6', # Seems to be the same across all browsers on PC
'ETP-Anonymous-ID': f'{uuid.uuid4()}', # Device ID, can be a randomized UUID
'Origin': 'https://www.crunchyroll.com', # Crunchyroll origin
'Referer': 'https://www.crunchyroll.com/', # Crunchyroll referer
}
# Set data for the token request
data = {
'device_id': f'{uuid.uuid4()}', # Device ID, can be randomized UUID
'device_type': 'Firefox on Windows', # Setting to FireFox
'grant_type': 'etp_rt_cookie', # Seems some sort of refresh token
}
# Set data for the token request
data = {
'device_id': f'{uuid.uuid4()}', # Device ID, can be randomized UUID
'device_type': 'Firefox on Windows', # Setting to FireFox
'grant_type': 'etp_rt_cookie', # Not sure what that is
}
# Send a post request to the auth login
response = login_session.post(url='https://www.crunchyroll.com/auth/v1/token', data=data, headers=headers)
response = login_session.post(url=self.config['endpoints']['auth_url'], data=data, headers=headers)
# Retrieve the token from the response
token = f"Bearer {response.json()['access_token']}"
# Cache token
self.token = token
self.token_expiry = time.time() + response.json()["expires_in"]
# Return the token
return token
# Defining a function to return titles
def get_titles(self) -> Series:
# Set the auth header
headers = {
'Authorization': f'{self.authenticate(cookies=self.session.cookies)}',
@ -132,16 +156,16 @@ class CR(Service):
self.session.headers.update(headers)
# Get the metadata needed for the series
series_metadata = self.session.get(url=f'https://www.crunchyroll.com/content/v2/cms/series/{self.title}/seasons')
series_metadata = self.session.get(url=self.config['endpoints']['series_metadata'].format(title=self.title)).json()
# Set an empty list for the seasons ids
season_ids = []
# Iterate through the count of seasons reported by the metadata
for seies_seasons in range(series_metadata.json()['total']):
for season in range(series_metadata['total']):
# Get the season id
season_id = series_metadata.json()['data'][seies_seasons]['id']
season_id = series_metadata['data'][season]['id']
# Append it to the season ids list
season_ids.append(season_id)
@ -151,41 +175,20 @@ class CR(Service):
# Get the episode metadata by iterating through each season id
for season in season_ids:
episode_metadata = self.session.get(url=f'https://www.crunchyroll.com/content/v2/cms/seasons/{season}/episodes')
episode_metadata = self.session.get(url=self.config['endpoints']['episode_metadata'].format(season=season)).json()
# Iterate through the total number of episodes and grab metadata
for episode in range(episode_metadata.json()['total']):
# Get the id
episode_id = episode_metadata.json()['data'][episode]['id']
# Get the episode season title
episode_season_title = episode_metadata.json()['data'][episode]['season_title']
# Get the season
episode_season = episode_metadata.json()['data'][episode]['season_number']
# Get the episode number
episode_number = math.ceil(episode_metadata.json()['data'][episode]['sequence_number'])
# Get the episode name
episode_name = episode_metadata.json()['data'][episode]['title']
# Get the episode year
timestamp = episode_metadata.json()['data'][episode]['episode_air_date']
if "Z" in timestamp:
timestamp = timestamp.replace("Z", "")
date_time = datetime.datetime.fromisoformat(timestamp)
episode_year = date_time.year
# Set a class for each episode
episode_class = Episode(id_=episode_id, title=episode_season_title, season=episode_season, number=episode_number, name=episode_name, year=episode_year, service=self.__class__)
# Append the stream link
episode_class.data = episode_metadata.json()['data'][episode]['streams_link']
# Append it to the list
episodes.append(episode_class)
# Iterate through each episode and grabs episode data
episodes = [
Episode(
id_=episode['id'],
title=episode['season_title'],
season=episode['season_number'],
number=episode['sequence_number'],
name=episode['title'],
year=episode['episode_air_date'][:4],
service=self.__class__,
data=episode['streams_link']
) for episode in episode_metadata['data']]
# Return the episodes as a Series object
return Series(episodes)
@ -201,21 +204,64 @@ class CR(Service):
# Update the headers
self.session.headers.update(headers)
# Get the MPD info
mpd_info = self.session.get(url=f'https://www.crunchyroll.com/{title.data}')
tracks = Tracks()
# Get the MPD info for each stream
mpd_info = self.session.get(url=self.config['endpoints']['mpd_url'].format(id=title.data)).json()
# Get the MPD URL
mpd_url = mpd_info.json()['data'][0]['drm_adaptive_dash']['']['url']
mpd_url = mpd_info['data'][0]['drm_adaptive_dash']['']['url']
mpd_lang = mpd_info['meta']['audio_locale']
# Grab the tracks from the MPD
tracks = DASH.from_url(url=mpd_url).to_tracks(language="en")
tracks = DASH.from_url(url=mpd_url).to_tracks(language=mpd_lang)
# Get subtitles
for _, sub in mpd_info['meta']['subtitles'].items():
tracks.add(
Subtitle(
url=sub['url'],
codec=Subtitle.Codec.from_mime(sub['format']),
language=sub['locale'],
forced=(sub['locale'] == mpd_lang), # If audio language matches subtitle language, it's Forced.
)
)
# Return the tracks
return tracks
# Defining a function to get chapters
def get_chapters(self, title):
return []
chapters_data = requests.get(self.config['endpoints']['chapters_url'].format(id=title.id))
# When Chapters are missing, it returns Access Denied XML Page.
if "Access Denied" in chapters_data.text:
logging.getLogger("CR").warning(f"No chapters found for Media ID: {title.id}")
return []
# Sort Chapters
raw_chapters = []
for _, chapter_data in chapters_data.json().items():
if type(chapter_data) is str: continue
if not chapter_data.get("start"): continue
raw_chapters.append(chapter_data)
raw_chapters = sorted( raw_chapters, key=lambda k: k["start"] )
# Convert to Chapter Class (More Explanation needs to be added)
chapters = []
chapter_index = 1
next_chapter_start = 0
for chapter in raw_chapters:
if chapter['start'] != next_chapter_start:
chapters.append(Chapter(timestamp=next_chapter_start*1000, name=f"Chapter {chapter_index}"))
chapter_index += 1
chapters.append(Chapter(timestamp=chapter['start']*1000, name=chapter['type'].capitalize()))
next_chapter_start = chapter['end']
return chapters
# Defining a function to get 'x-cr-video-token'
def get_video_token(self, video_id: str = None):
@ -232,8 +278,12 @@ class CR(Service):
self.session.headers.update(headers)
# Send a request to get the video token
get_token = self.session.get(f'https://cr-play-service.prd.crunchyrollsvc.com/v1/{list_video_id[0]}/web/firefox/play')
get_token = self.session.get(url=self.config['endpoints']['video_token'].format(id=list_video_id[0]))
# Check for errors
if get_token.json().get('error'):
raise Exception(f"Error encountered while getting token: " + get_token.json()['error'])
# Get token from JSON response
token = get_token.json()['token']

View File

@ -0,0 +1,8 @@
endpoints:
episode_metadata: https://www.crunchyroll.com/content/v2/cms/seasons/{season}/episodes
series_metadata: https://www.crunchyroll.com/content/v2/cms/series/{title}/seasons
mpd_url: https://www.crunchyroll.com/{id}
video_token: https://cr-play-service.prd.crunchyrollsvc.com/v1/{id}/web/firefox/play
license_url: https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine
auth_url: https://www.crunchyroll.com/auth/v1/token
chapters_url: https://static.crunchyroll.com/skip-events/production/{id}.json