Update __init__.py

Rollback on code :(
master
TPD94 2024-03-06 23:18:20 -05:00
parent 28d41681e5
commit 0da99a0250
1 changed files with 57 additions and 171 deletions

View File

@ -1,7 +1,6 @@
import base64
import uuid
import math
import time
import datetime
import logging
from abc import ABCMeta, abstractmethod
@ -21,7 +20,7 @@ from devine.core.console import console
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.titles import Title_T, Titles_T
from devine.core.tracks import Video, Audio, Subtitle, Chapter, Chapters, Tracks
from devine.core.tracks import Chapters, Tracks
from devine.core.utilities import get_ip_info
from devine.core.manifests import HLS, DASH
@ -30,7 +29,7 @@ class CR(Service):
Service code for Crunchyroll
Written by TPD94
Authorization: None (Free) | Cookies (Free and Paid Titles)
Authorization: Cookies (Free and Paid Titles)
Security: L3 FHD
"""
@ -53,77 +52,64 @@ class CR(Service):
# Pass the series_id argument to self so it's accessable across all methods
self.title = title
self.no_login = False
self.token = None
self.token_expiry = 0
# Overriding the constructor
super().__init__(ctx)
# Define function to get a session for service
@staticmethod
def get_session() -> requests.Session:
# Start a session
session = requests.Session()
# Set the headers
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.5',
}
# Update the headers for the session
session.headers.update(headers)
# return the session
return session
# Defining an authinticate function
def authenticate(self, cookies: Optional[CookieJar], credential: Optional[Credential] = None):
# Check for cached token
if self.token_expiry > time.time():
return self.token
# Login session
login_session = requests.Session()
# Check whether cookies are available
if not cookies:
self.no_login = True
if cookies:
# Load the cookies for login session
login_session.cookies.get(cookies)
# Load the cookies for login session
login_session.cookies.get(cookies)
# Add cookies
login_session.cookies.update(cookies)
# Add cookies
login_session.cookies.update(cookies)
# Load the cookies for self sessions later
self.session.cookies.get(cookies)
# Load the cookies for self sessions later
self.session.cookies.get(cookies)
# Add cookies for self sessions later
self.session.cookies.update(cookies)
# Add cookies for self sessions later
self.session.cookies.update(cookies)
# If cookies are not available, log in anonymously
if self.no_login:
# Set headers for the token request
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
# Setting to Firefox
"Authorization":"Basic Y3Jfd2ViOg==", # Seems to be the same across all browsers on PC
'ETP-Anonymous-ID': f'{uuid.uuid4()}', # Device ID, can be a randomized UUID
'Origin': 'https://www.crunchyroll.com', # Crunchyroll origin
'Referer': 'https://www.crunchyroll.com/', # Crunchyroll referer
}
# Set data for the token request
data = {
'grant_type': 'client_id', # Value as per browser for anonymous login
}
else:
# Set headers for the token request
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
# Setting to Firefox
'Authorization': 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6', # Seems to be the same across all browsers on PC
'ETP-Anonymous-ID': f'{uuid.uuid4()}', # Device ID, can be a randomized UUID
'Origin': 'https://www.crunchyroll.com', # Crunchyroll origin
'Referer': 'https://www.crunchyroll.com/', # Crunchyroll referer
}
# Set data for the token request
data = {
'device_id': f'{uuid.uuid4()}', # Device ID, can be randomized UUID
'device_type': 'Firefox on Windows', # Setting to FireFox
'grant_type': 'etp_rt_cookie', # Seems some sort of refresh token
}
# Set headers for the token request
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
# Setting to Firefox
'Authorization': 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6', # Seems to be the same across all browsers on PC
'ETP-Anonymous-ID': f'{uuid.uuid4()}', # Device ID, can be a randomized UUID
'Origin': 'https://www.crunchyroll.com', # Crunchyroll origin
'Referer': 'https://www.crunchyroll.com/', # Crunchyroll referer
}
# Set data for the token request
data = {
'device_id': f'{uuid.uuid4()}', # Device ID, can be randomized UUID
'device_type': 'Firefox on Windows', # Setting to FireFox
'grant_type': 'etp_rt_cookie', # Not sure what that is
}
# Send a post request to the auth login
response = login_session.post(url='https://www.crunchyroll.com/auth/v1/token', data=data, headers=headers)
@ -131,10 +117,6 @@ class CR(Service):
# Retrieve the token from the response
token = f"Bearer {response.json()['access_token']}"
# Cache token
self.token = token
self.token_expiry = time.time() + response.json()["expires_in"]
# Return the token
return token
@ -159,13 +141,7 @@ class CR(Service):
for seies_seasons in range(series_metadata.json()['total']):
# Get the season id
season_data = series_metadata.json()['data'][seies_seasons]
# Attempt to get season IDs for the version with original language.
if season_data.get("versions"):
season_id = [ s["guid"] for s in season_data["versions"] if s["original"] ][0]
else:
season_id = season_id['id']
season_id = series_metadata.json()['data'][seies_seasons]['id']
# Append it to the season ids list
season_ids.append(season_id)
@ -205,13 +181,8 @@ class CR(Service):
# Set a class for each episode
episode_class = Episode(id_=episode_id, title=episode_season_title, season=episode_season, number=episode_number, name=episode_name, year=episode_year, service=self.__class__)
# Check whether you can stream the title or not
if self.no_login and not episode_metadata.json()['data'][episode].get('streams_link'):
logging.getLogger("CR").warning(f"Media ID: {episode_id} is Premium Only")
continue
# Append the stream links into a list
episode_class.streams = episode_metadata.json()['data'][0]['versions'] or []
# Append the stream link
episode_class.data = episode_metadata.json()['data'][episode]['streams_link']
# Append it to the list
episodes.append(episode_class)
@ -230,89 +201,21 @@ class CR(Service):
# Update the headers
self.session.headers.update(headers)
tracks = Tracks()
# Get the MPD info
mpd_info = self.session.get(url=f'https://www.crunchyroll.com/{title.data}')
# Get the MPD info for each stream
for stream in title.streams:
# Get the MPD URL
mpd_url = mpd_info.json()['data'][0]['drm_adaptive_dash']['']['url']
# Get the MPD info
media_guid = stream["media_guid"]
episode_id = stream['guid']
mpd_info = self.session.get(f'https://www.crunchyroll.com/content/v2/cms/videos/{media_guid}/streams')
# Check for error HTML pages
try: mpd_info.json()
except:
logging.getLogger("CR").warning(f"Unable to get streams for Media ID: {episode_id}")
continue
# Get the MPD URL
mpd_url = mpd_info.json()['data'][0]['drm_adaptive_dash']['']['url']
mpd_lang = mpd_info.json()['meta']['audio_locale']
# Check whether MPD has original lang
original_vid = [ v["media_guid"] for v in mpd_info.json()['meta']['versions'] if v['original']][0]
is_original = original_vid == mpd_info.json()['meta']['media_id']
# Grab the tracks from the MPD
mpd_tracks = DASH.from_url(url=mpd_url).to_tracks(language=mpd_lang)
# Only save the video stream from the first MPD
if len(tracks.videos) == 0:
for video_track in mpd_tracks.videos:
video_track.data.update({"episode_id": stream['guid'], "media_id": stream['media_guid']})
tracks.add(video_track)
# Save audio tracks
for audio_track in mpd_tracks.audio:
audio_track.is_original_lang = is_original
audio_track.data.update({"episode_id": stream['guid'], "media_id": stream['media_guid']})
tracks.add(audio_track)
# Get subtitles
for _, sub in mpd_info.json()['meta']['subtitles'].items():
tracks.add(
Subtitle(
url=sub['url'],
codec=Subtitle.Codec.from_mime(sub['format']),
language=sub['locale'],
forced=(sub['locale'] == mpd_lang), # If audio language matches subtitle language, it's Forced.
)
)
# Grab the tracks from the MPD
tracks = DASH.from_url(url=mpd_url).to_tracks(language="en")
# Return the tracks
return tracks
# Defining a function to get chapters
def get_chapters(self, title):
chapters_data = requests.get(f"https://static.crunchyroll.com/skip-events/production/{title.id}.json")
# When Chapters are missing, it returns Access Denied XML Page.
if "Access Denied" in chapters_data.text:
logging.getLogger("CR").warning(f"No chapters found for Media ID: {title.id}")
return []
# Sort Chapters
raw_chapters = []
for _, chapter_data in chapters_data.json().items():
if type(chapter_data) is str: continue
if not chapter_data.get("start"): continue
raw_chapters.append(chapter_data)
raw_chapters = sorted( raw_chapters, key=lambda k: k["start"] )
# Convert to Chapter Class (More Explanation needs to be added)
chapters = []
chapter_index = 1
next_chapter_start = 0
for chapter in raw_chapters:
if chapter['start'] != next_chapter_start:
chapters.append(Chapter(timestamp=next_chapter_start*1000, name=f"Chapter {chapter_index}"))
chapter_index += 1
chapters.append(Chapter(timestamp=chapter['start']*1000, name=chapter['type'].capitalize()))
next_chapter_start = chapter['end']
return chapters
return []
# Defining a function to get 'x-cr-video-token'
def get_video_token(self, video_id: str = None):
@ -331,10 +234,6 @@ class CR(Service):
# Send a request to get the video token
get_token = self.session.get(f'https://cr-play-service.prd.crunchyrollsvc.com/v1/{list_video_id[0]}/web/firefox/play')
# Check for errors
if get_token.json().get('error'):
raise Exception(f"Error encountered while getting token: " + get_token.json()['error'])
# Get token from JSON response
token = get_token.json()['token']
@ -347,21 +246,14 @@ class CR(Service):
# Set the license server
crunchyroll_license_server = 'https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine'
# Get the Episode ID
episode_id = track.data["episode_id"]
# Get Video Token
video_token = self.get_video_token(video_id={episode_id})
jwtToken = self.authenticate(cookies=self.session.cookies)
# Set the headers
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
'Referer': 'https://static.crunchyroll.com/',
'Authorization': f'{jwtToken}',
'Authorization': f'{self.authenticate(cookies=self.session.cookies)}',
'content-type': 'application/octet-stream',
'x-cr-content-id': f'{episode_id}',
'x-cr-video-token': f'{video_token}',
'x-cr-content-id': f'{title.id}',
'x-cr-video-token': f'{self.get_video_token(video_id={title.id})}',
'Origin': 'https://static.crunchyroll.com',
}
@ -371,11 +263,5 @@ class CR(Service):
# Send the post request to the license server
license = self.session.post(url=crunchyroll_license_server, data=challenge)
# Close session to avoid TOO_MANY_ACTIVE_STREAMS error
self.session.delete(
f'https://cr-play-service.prd.crunchyrollsvc.com/v1/token/{episode_id}/{video_token}'
)
# Return the license
return license.json()['license']