Compare commits

...

7 Commits

Author SHA1 Message Date
ToonsHub 729ec7183b Merge remote-tracking branch 'upstream/master' 2024-03-11 20:13:14 +01:00
TPD94 baee8df7c2 Added Disney+ service script
Supports Disney+ movies and TV shows
2024-03-11 02:44:37 -04:00
TPD94 8b6de1a569 Added Hulu service script.
Added support for Hulu Movies and TV series
2024-03-10 04:59:21 -04:00
TPD94 38d962d118 Update __init__.py
Fixed playID for movies
2024-03-10 01:21:54 -05:00
TPD94 64ed492ab6 Merge branch 'master' of https://cdm-project.com/TPD94/devine 2024-03-10 01:05:51 -05:00
TPD94 a73bd07527 Added TheRokuChannel service script
Tested with free movie and TV shows.
2024-03-10 01:05:30 -05:00
TPD94 ea9e197529 Merge pull request 'CR service update & VIKI config added' (#4) from ToonsHub/devine:master into master
Reviewed-on: TPD94/devine#4
2024-03-07 17:12:58 +00:00
6 changed files with 733 additions and 175 deletions

View File

@ -4,28 +4,37 @@ import math
import time
import datetime
import logging
import re
from abc import ABCMeta, abstractmethod
from http.cookiejar import CookieJar
from typing import Optional, Union
from urllib.parse import urlparse
import click
import requests
from requests.adapters import HTTPAdapter, Retry
from rich.padding import Padding
from rich.rule import Rule
from devine.core.service import Service
from devine.core.titles import Series, Episode
from devine.core.cacher import Cacher
from devine.core.config import config
from devine.core.console import console
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.titles import Title_T, Titles_T
from devine.core.tracks import Subtitle, Chapter, Tracks
from devine.core.tracks import Video, Audio, Subtitle, Chapter, Chapters, Tracks
from devine.core.utilities import get_ip_info
from devine.core.manifests import HLS, DASH
class CR(Service):
"""
Service code for Crunchyroll
Written by TPD94 & ToonsHub
Written by TPD94 and Toonshub, rewritten by an actual dev
Authorization: None (Free) | Cookies (Free and Paid Titles)
Security: FHD@L3
Security: L3 FHD
"""
# Static method, this method belongs to the class
@ -47,6 +56,10 @@ class CR(Service):
# Pass the series_id argument to self so it's accessable across all methods
self.title = title
result = re.match(r"^https?://(?:www\.|beta\.)?crunchyroll\.com/series/(?P<id>[A-Z0-9]+)", self.title)
if result:
self.title = result.group(1)
self.no_login = False
self.token = None
self.token_expiry = 0
@ -55,7 +68,7 @@ class CR(Service):
super().__init__(ctx)
# Defining an authenticate function
# Defining an authinticate function
def authenticate(self, cookies: Optional[CookieJar], credential: Optional[Credential] = None):
# Check for cached token
@ -81,20 +94,19 @@ class CR(Service):
# Add cookies for self sessions later
self.session.cookies.update(cookies)
# Set default headers for the token request
headers = {
'User-Agent': self.config['browser']['user_agent'],
'ETP-Anonymous-ID': f'{uuid.uuid4()}', # Device ID, can be a randomized UUID
'Origin': 'https://www.crunchyroll.com', # Crunchyroll origin
'Referer': 'https://www.crunchyroll.com/', # Crunchyroll referer
}
# If cookies are not available, log in anonymously
if self.no_login:
# Set Authorization for the token request
headers["Authorization"] = "Basic Y3Jfd2ViOg==" # Seems to be the same across all browsers on PC
# Set headers for the token request
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
# Setting to Firefox
"Authorization":"Basic Y3Jfd2ViOg==", # Seems to be the same across all browsers on PC
'ETP-Anonymous-ID': f'{uuid.uuid4()}', # Device ID, can be a randomized UUID
'Origin': 'https://www.crunchyroll.com', # Crunchyroll origin
'Referer': 'https://www.crunchyroll.com/', # Crunchyroll referer
}
# Set data for the token request
data = {
@ -102,13 +114,20 @@ class CR(Service):
}
else:
# Set Authorization for the token request
headers['Authorization'] = 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6' # Seems to be the same across all browsers on PC
# Set headers for the token request
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
# Setting to Firefox
'Authorization': 'Basic bm9haWhkZXZtXzZpeWcwYThsMHE6', # Seems to be the same across all browsers on PC
'ETP-Anonymous-ID': f'{uuid.uuid4()}', # Device ID, can be a randomized UUID
'Origin': 'https://www.crunchyroll.com', # Crunchyroll origin
'Referer': 'https://www.crunchyroll.com/', # Crunchyroll referer
}
# Set data for the token request
data = {
'device_id': f'{uuid.uuid4()}', # Device ID, can be randomized UUID
'device_type': self.config['browser']['device_type'],
'device_type': 'Firefox on Windows', # Setting to FireFox
'grant_type': 'etp_rt_cookie', # Seems some sort of refresh token
}
@ -128,7 +147,6 @@ class CR(Service):
# Defining a function to return titles
def get_titles(self) -> Series:
# Set the auth header
headers = {
'Authorization': f'{self.authenticate(cookies=self.session.cookies)}',
@ -144,17 +162,10 @@ class CR(Service):
season_ids = []
# Iterate through the count of seasons reported by the metadata
for season_data in series_metadata['data']:
for season in range(series_metadata['total']):
# Attempt to get season IDs for the version with original language.
if season_data.get("versions"):
season_id = [ s["guid"] for s in season_data["versions"] if s["original"] ]
if len(season_id) > 0:
season_id = season_id[0]
else:
season_id = season_data['id']
else:
season_id = season_data['id']
# Get the season id
season_id = series_metadata['data'][season]['id']
# Append it to the season ids list
season_ids.append(season_id)
@ -166,52 +177,18 @@ class CR(Service):
for season in season_ids:
episode_metadata = self.session.get(url=self.config['endpoints']['episode_metadata'].format(season=season)).json()
# Iterate through the total number of episodes and grab metadata
for episode in episode_metadata['data']:
# Get the id
episode_id = episode['id']
# Get the episode season title
episode_season_title = episode['season_title']
# Get the season
episode_season = episode['season_number']
# Get the episode number
episode_number = math.ceil(episode['sequence_number'])
# Get the episode name
episode_name = episode['title']
# Get the episode year
timestamp = episode['episode_air_date']
if "Z" in timestamp:
timestamp = timestamp.replace("Z", "")
date_time = datetime.datetime.fromisoformat(timestamp)
episode_year = date_time.year
# Check whether you can stream the title or not
if self.no_login and not episode.get('streams_link'):
logging.getLogger("CR").warning(f"Media ID: {episode_id} is Premium Only")
continue
# Set a class for each episode
episode_class = Episode(id_=episode_id, title=episode_season_title, season=episode_season, number=episode_number, name=episode_name, year=episode_year, service=self.__class__)
# Append the stream links into a list
episode_class.streams = episode['versions'] or []
if len(episode_class.streams) == 0:
episode_class.streams = [
{
"audio_locale": episode['audio_locale'],
"guid": episode['id'],
"media_guid": episode['streams_link'].split("/")[-2]
}
]
# Append it to the list
episodes.append(episode_class)
# Iterate through each episode and grabs episode data
episodes = [
Episode(
id_=episode['id'],
title=episode['season_title'],
season=episode['season_number'],
number=episode['sequence_number'],
name=episode['title'],
year=episode['episode_air_date'][:4],
service=self.__class__,
data=episode['streams_link']
) for episode in episode_metadata['data']]
# Return the episodes as a Series object
return Series(episodes)
@ -230,56 +207,27 @@ class CR(Service):
tracks = Tracks()
# Get the MPD info for each stream
for stream in title.streams:
# Get the MPD info
episode_id = stream['guid']
mpd_info = self.session.get(self.config['endpoints']['mpd_api'].format(id=stream["media_guid"]))
mpd_info = self.session.get(url=self.config['endpoints']['mpd_url'].format(id=title.data)).json()
# Check for error HTML pages
try: mpd_info.json()
except:
logging.getLogger("CR").warning(f"Unable to get streams for Media ID: {episode_id}")
continue
# Get the MPD URL
mpd_url = mpd_info.json()['data'][0]['drm_adaptive_dash']['']['url']
mpd_lang = mpd_info.json()['meta']['audio_locale']
# Get the MPD URL
mpd_url = mpd_info['data'][0]['drm_adaptive_dash']['']['url']
mpd_lang = mpd_info['meta']['audio_locale']
# Check whether MPD has original lang
versions = mpd_info.json()['meta']['versions'] or [{}]
original_versions = [ v["media_guid"] for v in versions if v.get('original') ]
if len(original_versions) > 0:
original_vid = original_versions[0]
is_original = original_vid == mpd_info.json()['meta']['media_id']
else:
is_original = False
# Grab the tracks from the MPD
tracks = DASH.from_url(url=mpd_url).to_tracks(language=mpd_lang)
# Grab the tracks from the MPD
mpd_tracks = DASH.from_url(url=mpd_url).to_tracks(language=mpd_lang)
# Only save the video stream from the first MPD
if len(tracks.videos) == 0:
for video_track in mpd_tracks.videos:
video_track.data.update({"episode_id": stream['guid'], "media_id": stream['media_guid']})
tracks.add(video_track)
# Save audio tracks
for audio_track in mpd_tracks.audio:
audio_track.is_original_lang = is_original
audio_track.data.update({"episode_id": stream['guid'], "media_id": stream['media_guid']})
tracks.add(audio_track)
# Get subtitles
for _, sub in mpd_info.json()['meta']['subtitles'].items():
tracks.add(
Subtitle(
url=sub['url'],
codec=Subtitle.Codec.from_mime(sub['format']),
language=sub['locale'],
forced=(sub['locale'] == mpd_lang), # If audio language matches subtitle language, it's Forced.
)
# Get subtitles
for _, sub in mpd_info['meta']['subtitles'].items():
tracks.add(
Subtitle(
url=sub['url'],
codec=Subtitle.Codec.from_mime(sub['format']),
language=sub['locale'],
forced=(sub['locale'] == mpd_lang), # If audio language matches subtitle language, it's Forced.
)
)
# Return the tracks
return tracks
@ -330,63 +278,32 @@ class CR(Service):
self.session.headers.update(headers)
# Send a request to get the video token
get_token = self.session.get(
self.config['endpoints']['video_token'].format(id=list_video_id[0], browser=self.config['browser']['browser_name'])
)
get_token = self.session.get(url=self.config['endpoints']['video_token'].format(id=list_video_id[0]))
# Check for errors
if get_token.json().get('error'):
if get_token.json()['error'] == "TOO_MANY_ACTIVE_STREAMS":
self.close_all_sessions()
return self.get_video_token(video_id)
else:
raise Exception(f"Error encountered while getting token: " + get_token.json()['error'])
raise Exception(f"Error encountered while getting token: " + get_token.json()['error'])
# Get token from JSON response
token = get_token.json()['token']
# Return the token
return token
# Function to close all sessions before getting license
def close_all_sessions(self):
# Get all sessions
headers = {
"authorization": f"Bearer {self.authenticate(cookies=self.session.cookies)}"
}
response = requests.get(self.config['endpoints']['open_sessions'], headers=headers)
# Close all sessions
for ses in response.json()['items']:
self.close_session(ses['contentId'], ses['token'])
# Function to close a watch session
def close_session(self, id, token):
# Close session
self.session.delete(
self.config['endpoints']['delete_session'].format(id=id, token=token)
)
# Defining a function to get widevine license keys
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
# Get the Episode ID
episode_id = track.data["episode_id"]
# Get the Tokens
video_token = self.get_video_token(video_id={episode_id})
jwtToken = self.authenticate(cookies=self.session.cookies)
# Set the license server
crunchyroll_license_server = 'https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine'
# Set the headers
headers = {
'User-Agent': self.config['browser']['user_agent'],
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
'Referer': 'https://static.crunchyroll.com/',
'Authorization': f'{jwtToken}',
'Authorization': f'{self.authenticate(cookies=self.session.cookies)}',
'content-type': 'application/octet-stream',
'x-cr-content-id': f'{episode_id}',
'x-cr-video-token': f'{video_token}',
'x-cr-content-id': f'{title.id}',
'x-cr-video-token': f'{self.get_video_token(video_id={title.id})}',
'Origin': 'https://static.crunchyroll.com',
}
@ -394,10 +311,7 @@ class CR(Service):
self.session.headers.update(headers)
# Send the post request to the license server
license = self.session.post(url=self.config['endpoints']['license_url'], data=challenge)
# Close session to avoid TOO_MANY_ACTIVE_STREAMS error
self.close_session(episode_id, video_token)
license = self.session.post(url=crunchyroll_license_server, data=challenge)
# Return the license
return license.json()['license']
return license.json()['license']

View File

@ -1,15 +1,8 @@
endpoints:
episode_metadata: https://www.crunchyroll.com/content/v2/cms/seasons/{season}/episodes
series_metadata: https://www.crunchyroll.com/content/v2/cms/series/{title}/seasons
mpd_api: https://www.crunchyroll.com/content/v2/cms/videos/{id}/streams
video_token: https://cr-play-service.prd.crunchyrollsvc.com/v1/{id}/web/{browser}/play
mpd_url: https://www.crunchyroll.com/{id}
video_token: https://cr-play-service.prd.crunchyrollsvc.com/v1/{id}/web/firefox/play
license_url: https://cr-license-proxy.prd.crunchyrollsvc.com/v1/license/widevine
auth_url: https://www.crunchyroll.com/auth/v1/token
chapters_url: https://static.crunchyroll.com/skip-events/production/{id}.json
open_sessions: https://cr-play-service.prd.crunchyrollsvc.com/v1/sessions/streaming
delete_session: https://cr-play-service.prd.crunchyrollsvc.com/v1/token/{id}/{token}
browser:
user_agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0
browser_name: firefox
device_type: Firefox on Windows
chapters_url: https://static.crunchyroll.com/skip-events/production/{id}.json

View File

@ -0,0 +1,262 @@
import json
import logging
import re
from abc import ABCMeta, abstractmethod
from http.cookiejar import CookieJar
from typing import Optional, Union
from urllib.parse import urlparse
import click
import requests
from requests.adapters import HTTPAdapter, Retry
from rich.padding import Padding
from rich.rule import Rule
from devine.core.service import Service
from devine.core.titles import Movies, Movie, Titles_T, Title_T, Series, Episode
from devine.core.cacher import Cacher
from devine.core.config import config
from devine.core.console import console
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks
from devine.core.utilities import get_ip_info
from devine.core.manifests import HLS, DASH
from devine.core.utils.collections import as_list
class DSNP(Service):
"""
Service code for Disney+
Written by TPD94
Authorization: Login
Security: HD@L3
"""
GEOFENCE = ('US',)
# Static method, this method belongs to the class
@staticmethod
# The command name, must much the service tag (and by extension the service folder)
@click.command(name="DSNP", short_help="https://disneyplus.com", help=__doc__)
# Using series ID for Disney+
@click.argument("title", type=str)
# Option if it is a movie
@click.option("--movie", is_flag=True, help="Specify if it's a movie")
# Pass the context back to the CLI with arguments
@click.pass_context
def cli(ctx, **kwargs):
return DSNP(ctx, **kwargs)
# Accept the CLI arguments by overriding the constructor (The __init__() method)
def __init__(self, ctx, title, movie):
# Pass the series_id argument to self so it's accessable across all methods
self.title = title
self.movie = movie
# Overriding the constructor
super().__init__(ctx)
# Define authenticate function
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
# Set API url
api_url = 'https://disney.api.edge.bamgrid.com/graph/v1/device/graphql'
# Set first (public) header for device registration
headers = {
'authorization': 'Bearer ZGlzbmV5JmJyb3dzZXImMS4wLjA.Cu56AgSfBTDag5NiRA81oLHkDZfu5L3CKadnefEAY84',
}
# Set first (public) json data for device registration
json_data = {
'operationName': 'registerDevice',
'query': 'mutation registerDevice($input: RegisterDeviceInput!) {\n registerDevice(registerDevice: $input) {\n grant {\n grantType\n assertion\n },\n activeSession {\n partnerName\n profile {\n id\n }\n }\n }\n }',
'variables': {
'input': {
'deviceFamily': 'browser',
'applicationRuntime': 'firefox',
'deviceProfile': 'windows',
'deviceLanguage': 'en',
'attributes': {
'brand': 'web',
'operatingSystem': 'n/a',
'operatingSystemVersion': 'n/a',
},
},
},
}
# Get the device registartion auth token
register_auth = requests.post(url=api_url, json=json_data, headers=headers).json()['extensions']['sdk']['token']['accessToken']
# Set headers for login
headers = {
'authorization': f'{register_auth}',
}
# Set json for login
json_data = {
'query': '\n mutation login($input: LoginInput!) {\n login(login: $input) {\n actionGrant\n account {\n activeProfile {\n id\n }\n profiles {\n id\n attributes {\n isDefault\n parentalControls {\n isPinProtected\n }\n }\n }\n }\n activeSession {\n isSubscriber\n }\n identity {\n personalInfo {\n dateOfBirth\n gender\n }\n flows {\n personalInfo {\n requiresCollection\n eligibleForCollection\n }\n }\n }\n }\n }\n',
'variables': {
'input': {
'email': f'{credential.username}',
'password': f'{credential.password}',
},
},
'operationName': 'login',
}
# Get the login auth token
login_auth = requests.post(url='https://disney.api.edge.bamgrid.com/v1/public/graphql', json=json_data, headers=headers).json()['extensions']['sdk']['token']['accessToken']
# Pick the first profile
profile = requests.post(url='https://disney.api.edge.bamgrid.com/v1/public/graphql', json=json_data, headers=headers).json()['data']['login']['account']['profiles'][0]['id']
# Set the headers to switch profiles
headers = {
'authorization': f'{login_auth}',
}
# Set the json data to switch profiles
json_data = {
'query': '\n mutation switchProfile($input: SwitchProfileInput!) {\n switchProfile(switchProfile: $input) {\n account {\n activeProfile {\n name\n }\n }\n }\n }\n',
'variables': {
'input': {
'profileId': f'{profile}',
},
},
'operationName': 'switchProfile',
}
# Get the profile auth refresh token
profile_auth = requests.post(url='https://disney.api.edge.bamgrid.com/v1/public/graphql', json=json_data, headers=headers).json()['extensions']['sdk']['token']['refreshToken']
# Set the headers to get an auth bearer token
headers = {
'authorization': 'ZGlzbmV5JmJyb3dzZXImMS4wLjA.Cu56AgSfBTDag5NiRA81oLHkDZfu5L3CKadnefEAY84',
}
# Set the json data to get an auth bearer token
json_data = {
'query': 'mutation refreshToken($input:RefreshTokenInput!){refreshToken(refreshToken:$input){activeSession{sessionId}}}',
'variables': {
'input': {
'refreshToken': f'{profile_auth}',
},
},
'operationName': 'refreshToken',
}
# Get the bearer token
bearer = requests.post(url=api_url, json=json_data, headers=headers).json()['extensions']['sdk']['token']['accessToken']
# Set the bearer token in headers dictionary
headers = {
'authorization': f'Bearer {bearer}'
}
# Update the session with the auth bearer headers
self.session.headers.update(headers)
def get_titles(self) -> Titles_T:
# Get the title metadata
title_metadata = self.session.get(f'https://disney.api.edge.bamgrid.com/explore/v1.2/page/{self.title}').json()
# Check if --movie was used
if self.movie:
movie_metadata = self.session.get(url=f'https://disney.api.edge.bamgrid.com/explore/v1.2/page/{self.title}').json()
movie = Movie(
id_=movie_metadata['data']['page']['id'],
service=self.__class__,
name=movie_metadata['data']['page']['visuals']['title'],
year=movie_metadata['data']['page']['visuals']['metastringParts']['releaseYearRange']['startYear'],
data={'resourceId': movie_metadata['data']['page']['actions'][0]['resourceId']}
)
return Movies([movie])
else:
# Set empty list for episodes
all_episodes = []
# Iterate through the seasons
for season_num in title_metadata['data']['page']['containers'][0]['seasons']:
# Grab the season metadata
season_metadata = self.session.get(url=f'https://disney.api.edge.bamgrid.com/explore/v1.2/season/{season_num["id"]}').json()
# Iterate through each episode
i = 0
for episode_num in season_metadata['data']['season']['items']:
all_episodes.append(Episode(id_=episode_num['id'],
service=self.__class__,
title=episode_num['visuals']['title'],
season=episode_num['visuals']['seasonNumber'],
number=episode_num['visuals']['episodeNumber'],
name=episode_num['visuals']['episodeTitle'],
year=episode_num['visuals']['metastringParts']['releaseYearRange']['startYear'],
data={'resourceId': self.session.get(f'https://disney.api.edge.bamgrid.com/explore/v1.2/deeplink?action=playback&refId={episode_num["id"]}&refIdType=deeplinkId').json()['data']['deeplink']['actions'][0]['resourceId']}
))
i += 1
# Return the episodes
return Series(all_episodes)
def get_tracks(self, title: Title_T) -> Tracks:
# Set the headers to grab the m3u
self.session.headers['x-dss-feature-filtering'] = 'true'
self.session.headers['x-application-version'] = '1.1.2'
self.session.headers['x-bamsdk-client-id'] = 'disney-svod'
self.session.headers['x-bamsdk-platform'] = 'javascript/windows/firefox'
self.session.headers['x-bamsdk-version'] = '28.0'
# Set the JSON to grab the m3u
json_data = {
'playback': {
'attributes': {
'resolution': {
'max': [
'1280x720',
],
},
'protocol': 'HTTPS',
'assetInsertionStrategy': 'SGAI',
'playbackInitiationContext': 'ONLINE',
'frameRates': [
60,
],
},
},
'playbackId': f'{title.data["resourceId"]}',
}
# Grab the metadata
title_metadata = self.session.post(url='https://disney.playback.edge.bamgrid.com/v7/playback/ctr-limited', json=json_data).json()
# Grab the m3u
title_m3u = title_metadata['stream']['sources'][0]['complete']['url']
# Convert the m3u to tracks
tracks = HLS.from_url(url=title_m3u).to_tracks(language="en")
# Format the bitrate
for audio in tracks.audio:
bitrate = re.search(r"(?<=r/composite_)\d+|\d+(?=_complete.m3u8)", as_list(audio.url)[0])
audio.bitrate = int(bitrate.group()) * 10000
# Return the tracks
return tracks
def get_chapters(self, title: Title_T) -> Chapters:
return []
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
return self.session.post(url='https://disney.playback.edge.bamgrid.com/widevine/v1/obtain-license', data=challenge).content

View File

@ -0,0 +1,228 @@
import json
import logging
from abc import ABCMeta, abstractmethod
from http.cookiejar import CookieJar
from typing import Optional, Union
from urllib.parse import urlparse
import click
import requests
from requests.adapters import HTTPAdapter, Retry
from rich.padding import Padding
from rich.rule import Rule
from devine.core.service import Service
from devine.core.titles import Movies, Movie, Titles_T, Title_T, Series, Episode
from devine.core.cacher import Cacher
from devine.core.config import config
from devine.core.console import console
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks
from devine.core.utilities import get_ip_info
from devine.core.manifests import HLS, DASH
class HULU(Service):
"""
Service code for Hulu
Written by TPD94
Authorization: Cookies (Ad free account only tested)
Security: HD@L3
"""
GEOFENCE = ('US',)
# Static method, this method belongs to the class
@staticmethod
# The command name, must much the service tag (and by extension the service folder)
@click.command(name="HULU", short_help="https://hulu.com", help=__doc__)
# Using series ID for hulu
@click.argument("title", type=str)
# Option if it is a movie
@click.option("--movie", is_flag=True, help="Specify if it's a movie")
# Pass the context back to the CLI with arguments
@click.pass_context
def cli(ctx, **kwargs):
return HULU(ctx, **kwargs)
# Accept the CLI arguments by overriding the constructor (The __init__() method)
def __init__(self, ctx, title, movie):
# Pass the series_id argument to self so it's accessable across all methods
self.title = title
# Set boolean if movie was used
self.movie = movie
# Overriding the constructor
super().__init__(ctx)
# Define function to authenticate
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
self.session.cookies.update(cookies)
def get_titles(self) -> Titles_T:
# Check to see if --movie was used
if self.movie:
# Get the metadata
metadata = self.session.get(url=f'https://discover.hulu.com/content/v5/hubs/movie/{self.title}').json()
# Set the movie
movie = Movie(id_=metadata['id'],
service=self.__class__,
name=metadata['name'],
year=metadata['details']['entity']['premiere_date'][:4],
language="en",
data={"eab_id": metadata['details']['vod_items']['focus']['entity']['bundle']['eab_id']})
# Return the movie
return Movies([movie])
# If --movie was not used
else:
# Get themetadata
metadata = self.session.get(url=f'https://discover.hulu.com/content/v5/hubs/series/{self.title}').json()
# Set empty list for episodes
all_episodes = []
# Set a season counter
s = 1
# Iterate through each season
for season_count in metadata['components'][0]['items']:
# Get the season metadata
season_metadata = self.session.get(url=f'https://discover.hulu.com/content/v5/hubs/series/{self.title}/season/{s}').json()
# Set an episode counter
e = 0
# Iterate through each episode and append it to the all episodes list
for episode_count in season_metadata['items']:
all_episodes.append(Episode(
id_=season_metadata['items'][e]['id'],
service=self.__class__,
title=metadata['name'],
season=season_metadata['items'][e]['season'],
number=season_metadata['items'][e]['number'],
name=season_metadata['items'][e]['name'],
year=season_metadata['items'][e]['premiere_date'][:4],
language="en",
data={"eab_id": season_metadata['items'][e]['bundle']['eab_id']}
))
e += 1
s += 1
# Return the episodes
return Series(all_episodes)
# Define a function to get tracks
def get_tracks(self, title: Title_T) -> Tracks:
# Set the JSON data
json_data = {
'deejay_device_id': 214,
'version': 1,
'all_cdn': True,
'content_eab_id': f'{title.data["eab_id"]}',
'region': 'US',
'language': 'en',
'unencrypted': True,
'playback': {
'version': 2,
'video': {
'codecs': {
'values': [
{
'type': 'H264',
'width': 1920,
'height': 1080,
'framerate': 60,
'level': '4.2',
'profile': 'HIGH',
},
],
'selection_mode': 'ONE',
},
},
'audio': {
'codecs': {
'values': [
{
'type': 'AAC',
},
],
'selection_mode': 'ONE',
},
},
'drm': {
'values': [
{
'type': 'WIDEVINE',
'version': 'MODULAR',
'security_level': 'L3',
},
{
'type': 'PLAYREADY',
'version': 'V2',
'security_level': 'SL2000',
},
],
'selection_mode': 'ALL',
},
'manifest': {
'type': 'DASH',
'https': True,
'multiple_cdns': True,
'patch_updates': True,
'hulu_types': True,
'live_dai': True,
'multiple_periods': True,
'secondary_audio': True,
'unified_asset_signaling': False,
'live_fragment_delay': 3,
},
'segments': {
'values': [
{
'type': 'FMP4',
'encryption': {
'mode': 'CENC',
'type': 'CENC',
},
'https': True,
},
],
'selection_mode': 'ONE',
},
},
}
# Get the title metadata
title_metadata = self.session.post(url=f'https://play.hulu.com/v6/playlist', json=json_data).json()
# Get the tracks
if 'stream_url' in title_metadata:
tracks = DASH.from_url(url=title_metadata['stream_url']).to_tracks("en")
# Grab the widevine license url and put it in the title data
title.data['widevineurl'] = title_metadata['wv_server']
# Return the tracks
return tracks
def get_chapters(self, title: Title_T) -> Chapters:
return []
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
return self.session.post(url=f'{title.data["widevineurl"]}', data=challenge).content

View File

@ -24,20 +24,24 @@ from devine.core.tracks import Chapters, Tracks
from devine.core.utilities import get_ip_info
from devine.core.manifests import HLS, DASH
class RB(Service):
class RBOX(Service):
"""
Service code for Redbox
Written by TPD94
Authorization: None (Free titles)
Security: FHD@L3
"""
GEOFENCE = ('US',)
# Static method, this method belongs to the class
@staticmethod
# The command name, must much the service tag (and by extension the service folder)
@click.command(name="RB", short_help="https://www.redbox.com/ondemand-movies/", help=__doc__)
@click.command(name="RBOX", short_help="https://www.redbox.com/ondemand-movies/", help=__doc__)
# Using series ID for crunchyroll
@click.argument("title", type=str)
@ -45,7 +49,7 @@ class RB(Service):
# Pass the context back to the CLI with arguments
@click.pass_context
def cli(ctx, **kwargs):
return RB(ctx, **kwargs)
return RBOX(ctx, **kwargs)
# Accept the CLI arguments by overriding the constructor (The __init__() method)
def __init__(self, ctx, title):

View File

@ -0,0 +1,157 @@
import json
import logging
from abc import ABCMeta, abstractmethod
from http.cookiejar import CookieJar
from typing import Optional, Union
from urllib.parse import urlparse
import click
import requests
from requests.adapters import HTTPAdapter, Retry
from rich.padding import Padding
from rich.rule import Rule
from devine.core.service import Service
from devine.core.titles import Movies, Movie, Titles_T, Title_T, Series, Episode
from devine.core.cacher import Cacher
from devine.core.config import config
from devine.core.console import console
from devine.core.constants import AnyTrack
from devine.core.credential import Credential
from devine.core.tracks import Chapters, Tracks
from devine.core.utilities import get_ip_info
from devine.core.manifests import HLS, DASH
class ROKU(Service):
"""
Service code for TheRokuChannel
Written by TPD94
Authorization: Cookies (Free titles / Paid titles untested)
Security: FHD@L3
"""
GEOFENCE = ('US',)
# Static method, this method belongs to the class
@staticmethod
# The command name, must much the service tag (and by extension the service folder)
@click.command(name="ROKU", short_help="https://therokuchannel.roku.com", help=__doc__)
# Using series ID for crunchyroll
@click.argument("title", type=str)
# Pass the context back to the CLI with arguments
@click.pass_context
def cli(ctx, **kwargs):
return ROKU(ctx, **kwargs)
# Accept the CLI arguments by overriding the constructor (The __init__() method)
def __init__(self, ctx, title):
# Pass the series_id argument to self so it's accessable across all methods
self.title = title
# Overriding the constructor
super().__init__(ctx)
# Define the authenticate function
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
self.session.cookies.update(cookies)
# Define a function to get titles
def get_titles(self) -> Titles_T:
# Grab the metadata
metadata = self.session.get(url=f'https://therokuchannel.roku.com/api/v2/homescreen/content/https%3A%2F%2Fcontent.sr.roku.com%2Fcontent%2Fv1%2Froku-trc%2F{self.title}%3Fexpand%3Dcredits%252CviewOptions%252CcategoryObjects%252CviewOptions.providerDetails%252Cseries%252Cseason%252Cseason.episodes%252Cnext%252Cepisodes%252Cseasons%255B0%255D%252Cseasons%255B0%255D.episodes%26include%3Dtype%252Ctitle%252CimageMap.detailPoster%252CimageMap.detailBackground%252Cbobs.detailScreen%252CcategoryObjects%252CrunTimeSeconds%252CcastAndCrew%252Csavable%252CstationDma%252CkidsDirected%252CreleaseDate%252CreleaseYear%252Cdescription%252Cdescriptions%252Cindicators%252Cgenres%252Ccredits.birthDate%252Ccredits.meta%252Ccredits.order%252Ccredits.name%252Ccredits.role%252Ccredits.personId%252Ccredits.images%252CparentalRatings%252CreverseChronological%252CcontentRatingClass%252ClanguageDialogBody%252CdetailScreenOptions%252CviewOptions%252CepisodeNumber%252CseasonNumber%252CsportInfo%252CeventState%252Cseries.title%252Cseason%252Cseasons.title%252Cseasons.seasonNumber%252Cseasons.description%252Cseasons.descriptions%252Cseasons.releaseYear%252Cseasons.castAndCrew%252Cseasons.credits.birthDate%252Cseasons.credits.meta%252Cseasons.credits.order%252Cseasons.credits.name%252Cseasons.credits.role%252Cseasons.credits.personId%252Cseasons.credits.images%252Cseasons.imageMap.detailBackground%252Cseasons.episodes.title%252Cseasons.episodes.description%252Cseasons.episodes.descriptions.40%252Cseasons.episodes.descriptions.60%252Cseasons.episodes.episodeNumber%252Cseasons.episodes.seasonNumber%252Cseasons.episodes.images%252Cseasons.episodes.imageMap.grid%252Cseasons.episodes.indicators%252Cseasons.episodes.releaseDate%252Cseasons.episodes.viewOptions%252Cepisodes.episodeNumber%252Cepisodes.seasonNumber%252Cepisodes.viewOptions%26filter%3DcategoryObjects%253AgenreAppropriate%252520eq%252520true%252Cseasons.episodes%253A%2528not%252520empty%2528viewOptions%2529%2529%253Aall%26featureInclude%3Dbookmark%252Cwatchlist%252ClinearSchedule').json()
# Check if the title is a movie
if metadata['type'] == 'movie':
# Declare the movie
movie = Movie(id_=metadata['meta']['id'],
service=self.__class__,
name=metadata['title'],
data={'playId': metadata['viewOptions'][0]['playId']},
year=metadata['releaseYear'])
# Retuen the movie
return Movies([movie])
# Check if the title is a series
if metadata['type'] == 'series':
# Declare a list for episodes
all_episodes = []
# Set a season counter
s = 0
# Iterate through each season
for season_count in metadata['seasons']:
# Get the season metadata
season_metadata = metadata = self.session.get(f'https://therokuchannel.roku.com/api/v2/homescreen/content/https%3A%2F%2Fcontent.sr.roku.com%2Fcontent%2Fv1%2Froku-trc%2F{self.title}%3Fexpand%3Dcredits%252CviewOptions%252CcategoryObjects%252CviewOptions.providerDetails%252Cseries%252Cseason%252Cseason.episodes%252Cnext%252Cepisodes%252Cseasons%252Cseasons.episodes%26include%3Dtype%252Ctitle%252CimageMap.detailPoster%252CimageMap.detailBackground%252Cbobs.detailScreen%252CcategoryObjects%252CrunTimeSeconds%252CcastAndCrew%252Csavable%252CstationDma%252CkidsDirected%252CreleaseDate%252CreleaseYear%252Cdescription%252Cdescriptions%252Cindicators%252Cgenres%252Ccredits.birthDate%252Ccredits.meta%252Ccredits.order%252Ccredits.name%252Ccredits.role%252Ccredits.personId%252Ccredits.images%252CparentalRatings%252CreverseChronological%252CcontentRatingClass%252ClanguageDialogBody%252CdetailScreenOptions%252CviewOptions%252CepisodeNumber%252CseasonNumber%252CsportInfo%252CeventState%252Cseries.title%252Cseason%252Cseasons.title%252Cseasons.seasonNumber%252Cseasons.description%252Cseasons.descriptions%252Cseasons.releaseYear%252Cseasons.castAndCrew%252Cseasons.credits.birthDate%252Cseasons.credits.meta%252Cseasons.credits.order%252Cseasons.credits.name%252Cseasons.credits.role%252Cseasons.credits.personId%252Cseasons.credits.images%252Cseasons.imageMap.detailBackground%252Cseasons.episodes.title%252Cseasons.episodes.description%252Cseasons.episodes.descriptions.40%252Cseasons.episodes.descriptions.60%252Cseasons.episodes.episodeNumber%252Cseasons.episodes.seasonNumber%252Cseasons.episodes.images%252Cseasons.episodes.imageMap.grid%252Cseasons.episodes.indicators%252Cseasons.episodes.releaseDate%252Cseasons.episodes.viewOptions%252Cepisodes.episodeNumber%252Cepisodes.seasonNumber%252Cepisodes.viewOptions%26filter%3DcategoryObjects%253AgenreAppropriate%252520eq%252520true%252Cseasons.episodes%253A%2528not%252520empty%2528viewOptions%2529%2529%253Aall%252Cseasons%253AseasonNumber%2Beq%2B{s+1}%26featureInclude%3Dbookmark%252Cwatchlist%252ClinearSchedule').json()
# Set the episode counter
e = 0
# iterate through each episode
try:
for episode_count in season_metadata['seasons'][0]['episodes']:
all_episodes.append(Episode(
id_=metadata['seasons'][0]['episodes'][e]['meta']['id'],
title=metadata['title'],
service=self.__class__,
name=metadata['seasons'][0]['episodes'][e]['title'],
season=metadata['seasons'][0]['episodes'][e]['seasonNumber'],
number=metadata['seasons'][0]['episodes'][e]['episodeNumber'],
year=metadata['seasons'][0]['episodes'][e]['releaseDate'][:4],
data={'playId': metadata['seasons'][0]['episodes'][e]['viewOptions'][0]['playId']}
))
e += 1
s += 1
except KeyError:
continue
# Return the episodes
return Series(all_episodes)
# Define a function to get tracks
def get_tracks(self, title: Title_T) -> Tracks:
# Set the headers / get CSRF token
headers = {
'csrf-token': f'{self.session.get(url="https://therokuchannel.roku.com/api/v1/csrf").json()["csrf"]}',
}
# Set the JSON to get title information
json_data = {
'rokuId': f'{title.id}',
'playId': f'{title.data["playId"]}',
'mediaFormat': 'mpeg-dash',
'drmType': 'widevine',
'quality': 'fhd',
'bifUrl': None,
'adPolicyId': '',
'providerId': 'rokuavod',
}
# Get the movie metadata
title_metadata = self.session.post('https://therokuchannel.roku.com/api/v3/playback', headers=headers, json=json_data).json()
# Get the tracks
tracks = DASH.from_url(url=title_metadata['url']).to_tracks(language="en")
# Set the widevine url
title.data['widevineurl'] = title_metadata['drm']['widevine']['licenseServer']
# Return the tracks
return tracks
def get_chapters(self, title: Title_T) -> Chapters:
return []
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
return self.session.post(url=title.data['widevineurl'], data=challenge).content