Hotstar (HS) service has been tested and added.
This commit is contained in:
Aswin 2025-04-06 20:27:22 +05:30
parent 4b4ed735ef
commit 96e36e7a05
8 changed files with 944 additions and 417 deletions

View File

@ -430,7 +430,9 @@ def result(ctx, service, quality, range_, wanted, alang, slang, audio_only, subs
continue # only wanted to see what tracks were available and chosen continue # only wanted to see what tracks were available and chosen
skip_title = False skip_title = False
for track in title.tracks:
#Download might fail as auth token expires quickly for Hotstar. This is a problem for big downloads like a 4k track. So we reverse tracks and download audio first and large video later.
for track in (list(title.tracks)[::-1] if service_name == "Hotstar" else title.tracks):
if not keys: if not keys:
log.info(f"Downloading: {track}") log.info(f"Downloading: {track}")
if (service_name == "AppleTVPlus" or service_name == "iTunes") and "VID" in str(track): if (service_name == "AppleTVPlus" or service_name == "iTunes") and "VID" in str(track):

View File

@ -1,10 +1,12 @@
needs_auth: false
endpoints: endpoints:
login: 'https://api.hotstar.com/in/aadhar/v2/web/th/user/login' login: 'https://api.hotstar.com/in/aadhar/v2/web/in/user/login'
refresh: 'https://www.hotstar.com/api/internal/bff/v2/start' refresh: 'https://www.hotstar.com/api/internal/bff/v2/start'
tv_title: 'https://api.hotstar.com/o/v1/show/detail' tv_title: 'https://api.hotstar.com/o/v1/show/detail'
tv_episodes: 'https://api.hotstar.com/o/v1/tray/g/1/detail' tv_episodes: 'https://api.hotstar.com/o/v1/tray/g/1/detail'
movie_title: 'https://api.hotstar.com/o/v1/movie/detail' movie_title: 'https://api.hotstar.com/o/v1/movie/detail'
manifest: 'https://api.hotstar.com/play/v4/playback/content/{id}' manifest: 'https://apix.hotstar.com/v2/pages/watch'
device: device:
os: os:
@ -13,4 +15,4 @@ device:
platform: platform:
name: 'web' name: 'web'
version: '7.35.0' version: '7.35.0'

View File

@ -0,0 +1,16 @@
endpoints:
login: 'https://api.hotstar.com/in/aadhar/v2/web/th/user/login'
refresh: 'https://www.hotstar.com/api/internal/bff/v2/start'
tv_title: 'https://api.hotstar.com/o/v1/show/detail'
tv_episodes: 'https://api.hotstar.com/o/v1/tray/g/1/detail'
movie_title: 'https://api.hotstar.com/o/v1/movie/detail'
manifest: 'https://api.hotstar.com/play/v4/playback/content/{id}'
device:
os:
name: 'Windows'
version: 10
platform:
name: 'web'
version: '7.35.0'

View File

@ -403,6 +403,7 @@ class Track:
segment.uri segment.uri
) )
self.url = segments self.url = segments
if self.source == "CORE": if self.source == "CORE":
asyncio.run(saldl( asyncio.run(saldl(
self.url, self.url,
@ -410,11 +411,13 @@ class Track:
headers, headers,
proxy if self.needs_proxy else None proxy if self.needs_proxy else None
)) ))
elif self.descriptor == self.Descriptor.ISM: elif (self.descriptor == self.Descriptor.ISM) or (self.source == "HS" and self.__class__.__name__ != "TextTrack"):
asyncio.run(m3u8dl( asyncio.run(m3u8dl(
self.url, self.url,
save_path, save_path,
self self,
headers,
proxy if self.needs_proxy else None
)) ))
else: else:
asyncio.run(aria2c( asyncio.run(aria2c(

View File

@ -267,7 +267,7 @@ def parse(*, url=None, data=None, source, session=None, downloader=None):
tracks.append(VideoTrack( tracks.append(VideoTrack(
id_=track_id, id_=track_id,
source=source, source=source,
url=track_url, url=url if source == "HS" else track_url,
# metadata # metadata
codec=(codecs or "").split(".")[0], codec=(codecs or "").split(".")[0],
language=track_lang, language=track_lang,
@ -303,7 +303,7 @@ def parse(*, url=None, data=None, source, session=None, downloader=None):
tracks.append(AudioTrack( tracks.append(AudioTrack(
id_=track_id, id_=track_id,
source=source, source=source,
url=track_url, url=url if source == "HS" else track_url,
# metadata # metadata
codec=(codecs or "").split(".")[0], codec=(codecs or "").split(".")[0],
language=track_lang, language=track_lang,

View File

@ -5,445 +5,483 @@ import json
import os import os
import time import time
import uuid import uuid
import re
import requests
from datetime import datetime
from urllib.parse import urlparse, parse_qs
from urllib.request import urlopen, Request
import http.cookiejar as cookiejar
import click import click
import m3u8
from urllib.request import urlopen, Request
from vinetrimmer.config import directories
from vinetrimmer.objects import Title, Tracks from vinetrimmer.objects import Title, Tracks
from vinetrimmer.services.BaseService import BaseService from vinetrimmer.services.BaseService import BaseService
from vinetrimmer.config import config, directories
class Hotstar(BaseService): class Hotstar(BaseService):
""" """
Service code for Star India's Hotstar (aka Disney+ Hotstar) streaming service (https://hotstar.com). Service code for Star India's Hotstar (aka Disney+ Hotstar) streaming service (https://hotstar.com).
\b \b
Authorization: Credentials Authorization: Credentials
Security: UHD@L3, doesn't seem to care about releases. Security: UHD@L3, doesn't seem to care about releases.
\b \b
Tips: - The library of contents can be viewed without logging in at https://hotstar.com Tips: - The library of contents can be viewed without logging in at https://hotstar.com
- The homepage hosts domestic programming; Disney+ content is at https://hotstar.com/in/disneyplus - The homepage hosts domestic programming; Disney+ content is at https://hotstar.com/in/disneyplus
""" """
ALIASES = ["HS", "hotstar"] ALIASES = ["HS", "Hotstar"]
#GEOFENCE = ["in"] TITLE_RE = r"^(?:https?://(?:www\.)?hotstar\.com/[a-z0-9/-]+/)(?P<id>\d+)"
TITLE_RE = r"^(?:https?://(?:www\.)?hotstar\.com/[a-z0-9/-]+/)(?P<id>\d+)"
@staticmethod @staticmethod
@click.command(name="Hotstar", short_help="https://hotstar.com") @click.command(name="Hotstar", short_help="https://hotstar.com")
@click.argument("title", type=str, required=False) @click.argument("title", type=str, required=False)
@click.option("-q", "--quality", default="fhd", @click.option("-rg", "--region", default="in", type=click.Choice(["in", "id", "th"], case_sensitive=False),
type=click.Choice(["4k", "fhd", "hd", "sd"], case_sensitive=False), help="Account region")
help="Manifest quality to request.") @click.pass_context
@click.option("-c", "--channels", default="5.1", type=click.Choice(["5.1", "2.0", "atmos"], case_sensitive=False), def cli(ctx, **kwargs):
help="Audio Codec") return Hotstar(ctx, **kwargs)
@click.option("-rg", "--region", default="in", type=click.Choice(["in", "id", "th"], case_sensitive=False),
help="Account region")
@click.pass_context
def cli(ctx, **kwargs):
return Hotstar(ctx, **kwargs)
def __init__(self, ctx, title, quality, channels, region): def __init__(self, ctx, title, region):
super().__init__(ctx) super().__init__(ctx)
self.parse_title(ctx, title) self.parse_title(ctx, title)
self.quality = quality self.region = region.lower()
self.channels = channels if "/movies/" in title:
self.region = region.lower() self.movie = True
else:
self.movie = False
assert ctx.parent is not None assert ctx.parent is not None
self.vcodec = ctx.parent.params["vcodec"]
self.acodec = ctx.parent.params["acodec"] or "EC3"
self.range = ctx.parent.params["range_"]
self.hdrdv = None
self.vcodec = ctx.parent.params["vcodec"] #self.log.info(self.title)
self.acodec = ctx.parent.params["acodec"] or "EC3" #self.log.info(self.range)
self.range = ctx.parent.params["range_"] #self.log.info(self.vcodec)
#self.log.info(self.hdrdv)
self.profile = ctx.obj.profile
self.device_id = None
self.hotstar_auth = None
self.token = None
self.license_api = None
self.profile = ctx.obj.profile self.configure()
self.device_id = None def get_titles(self):
self.hotstar_auth = None headers = {
self.token = None "Accept": "*/*",
self.license_api = None "Accept-Language": "en-GB,en;q=0.5",
"hotstarauth": self.hotstar_auth,
"X-HS-UserToken": self.token,
"X-HS-Platform": self.config["device"]["platform"]["name"],
"X-HS-AppVersion": self.config["device"]["platform"]["version"],
"X-Country-Code": self.region,
"x-platform-code": "PCTV"
}
if self.movie:
params = {
"contentId": self.title,
}
else:
params = {
"contentId": self.title,
"tao": "0",
"tas": "700",
}
r = self.session.get(
url=self.config["endpoints"]["movie_title"] if self.movie else self.config["endpoints"]["tv_title"],
headers=headers,
params=params,
)
try:
res = r.json()["body"]["results"]["item"]
#self.log.info(r.json())
except json.JSONDecodeError:
raise ValueError(f"Failed to load title manifest: {res.text}")
self.configure() self.content_type = res["assetType"]
self.lang = res["langObjs"][0]["iso3code"]
#self.log.info(self.lang)
def get_titles(self): if res["assetType"] == "MOVIE":
headers = { return Title(
"Accept": "*/*", id_=res.get("contentId"),
"Accept-Language": "en-GB,en;q=0.5", type_=Title.Types.MOVIE,
"hotstarauth": self.hotstar_auth, name=res["title"],
"X-HS-UserToken": self.token, year=res["year"],
"X-HS-Platform": self.config["device"]["platform"]["name"], original_lang=res["langObjs"][0]["iso3code"],
"X-HS-AppVersion": self.config["device"]["platform"]["version"], source=self.ALIASES[0],
"X-Country-Code": self.region, service_data=res,
"x-platform-code": "PCTV" )
} else:
try: try:
r = self.session.get( re = self.session.get(
url=self.config["endpoints"]["movie_title"], url=self.config["endpoints"]["tv_episodes"],
headers=headers, headers=headers,
params={"contentId": self.title} params={
) "eid": res.get("contentId"),
try: "etid": "2",
res = r.json()["body"]["results"]["item"] "tao": "0",
except json.JSONDecodeError: "tas": res["episodeCnt"],
raise ValueError(f"Failed to load title manifest: {res.text}") },
except: )
r = self.session.get( res = re.json()["body"]["results"]["assets"]["items"]
url=self.config["endpoints"]["tv_title"], except:
headers=headers, res = r.json()["body"]["results"]["trays"]["items"][0]["assets"]["items"]
params={"contentId": self.title} return [Title(
) id_=x.get("contentId"),
try: type_=Title.Types.TV,
res = r.json()["body"]["results"]["item"] name=x.get("showShortTitle"),
except json.JSONDecodeError: year=x.get("year"),
raise ValueError(f"Failed to load title manifest: {res.text}") season=x.get("seasonNo"),
episode=x.get("episodeNo"),
episode_name=x.get("title"),
original_lang=x["langObjs"][0]["iso3code"],
source=self.ALIASES[0],
service_data=x
) for x in res]
if res["assetType"] == "MOVIE": def get_playback(self, content_id, range):
return Title( if self.vcodec == "H265":
id_=self.title, quality = "4k"
type_=Title.Types.MOVIE, video_code = "h265\",\"dvh265"
name=res["title"], else:
year=res["year"], quality = "fhd"
original_lang=res["langObjs"][0]["iso3code"], video_code = "h264"
source=self.ALIASES[0], r = self.session.get(
service_data=res, url=self.config["endpoints"]["manifest"], # .format(id=title.service_data["contentId"]),
) params={
else: "content_id": content_id,
r = self.session.get( "filters": f"content_type={self.content_type}",
url=self.config["endpoints"]["tv_episodes"], "client_capabilities": "{\"package\":[\"dash\",\"hls\"],\"container\":[\"fmp4br\",\"fmp4\"],\"ads\":[\"non_ssai\",\"ssai\"],\"audio_channel\":[\"atmos\",\"dolby51\",\"stereo\"],\"encryption\":[\"plain\",\"widevine\"],\"video_codec\":[\"" + video_code + "\"],\"ladder\":[\"tv\",\"full\"],\"resolution\":[\"" + quality + "\"],\"true_resolution\":[\"" + quality + "\"],\"dynamic_range\":[\"" + range + "\"]}",
headers=headers, "drm_parameters": "{\"widevine_security_level\":[\"SW_SECURE_DECODE\",\"SW_SECURE_CRYPTO\"],\"hdcp_version\":[\"HDCP_V2_2\",\"HDCP_V2_1\",\"HDCP_V2\",\"HDCP_V1\"]}"
params={ },
"eid": res["id"], headers={
"etid": "2", "user-agent": "Disney+;in.startv.hotstar.dplus.tv/23.08.14.4.2915 (Android/13)",
"tao": "0", "hotstarauth": self.hotstar_auth,
"tas": "1000" "x-hs-usertoken": self.token,
} "x-hs-device-id": self.device_id,
) "x-hs-client": "platform:androidtv;app_id:in.startv.hotstar.dplus.tv;app_version:23.08.14.4;os:Android;os_version:13;schema_version:0.0.970",
try: "x-hs-platform": "androidtv",
res = r.json()["body"]["results"]["assets"]["items"] "content-type": "application/json",
except json.JSONDecodeError: },
raise ValueError(f"Failed to load episodes list: {r.text}") ).json()
return [Title(
id_=self.title,
type_=Title.Types.TV,
name=x.get("showShortTitle"),
year=x.get("year"),
season=x.get("seasonNo"),
episode=x.get("episodeNo"),
episode_name=x.get("title"),
original_lang=x["langObjs"][0]["iso3code"],
source=self.ALIASES[0],
service_data=x
) for x in res]
def get_tracks(self, title): try:
if title.service_data.get("parentalRating", 0) > 2: playback = r['success']['page']['spaces']['player']['widget_wrappers'][0]['widget']['data']['player_config'][
body = json.dumps({ 'media_asset']['primary']
"devices": [{ # self.log.info(playback)
"id": self.device_id, except:
"name": "Chrome Browser on Windows", #self.log.info(r['success']['page']['spaces']['player']['widget_wrappers'][0]['widget']['data']['player_config']['media_asset'])
"consentProvided": True self.log.info(f'Error: {str(r["error"]["error_message"])}')
}] self.log.exit(f' - {str(r["error"]["error_message"])}')
})
self.session.post( if playback == {}:
url="https://api.hotstar.com/play/v1/consent/content/{id}?".format(id=title.service_data["contentId"]), #self.log.info(json.dumps(r, indent=4))
headers={ # sendvtLog('Error: Wanted format is not available!')
"Accept": "*/*", self.log.exit(" - Wanted playback set is unavailable for this title!")
"Content-Type": "application/json",
"hotstarauth": self.hotstar_auth,
"X-HS-UserToken": self.token,
"X-HS-Platform": self.config["device"]["platform"]["name"],
"X-HS-AppVersion": self.config["device"]["platform"]["version"],
"X-HS-Request-Id": str(uuid.uuid4()),
"X-Country-Code": self.region
},
data=body
).json()
akamai_cdn=True
count = 1
while akamai_cdn:
r = self.session.post(
url=self.config["endpoints"]["manifest"].format(id=title.service_data["contentId"]),
params={
# TODO: Perhaps set up desired-config to actual desired playback set values?
"desired-config": "|".join([
"audio_channel:stereo",
"container:fmp4",
"dynamic_range:sdr",
"encryption:widevine",
"ladder:tv",
"package:dash",
"resolution:fhd",
"video_codec:h264"
]),
"device-id": self.device_id,
"type": "paid",
},
headers={
"Accept": "*/*",
"hotstarauth": self.hotstar_auth,
"x-hs-usertoken": self.token,
"x-hs-request-id": self.device_id,
"x-country-code": self.region
},
json={
"os_name": "Windows",
"os_version": "10",
"app_name": "web",
"app_version": "7.34.1",
"platform": "Chrome",
"platform_version": "99.0.4844.82",
"client_capabilities": {
"ads": ["non_ssai"],
"audio_channel": ["stereo"],
"dvr": ["short"],
"package": ["dash", "hls"],
"dynamic_range": ["sdr"],
"video_codec": ["h264"],
"encryption": ["widevine"],
"ladder": ["tv"],
"container": ["fmp4", "ts"],
"resolution": ["hd"]
},
"drm_parameters": {
"widevine_security_level": ["SW_SECURE_DECODE", "SW_SECURE_CRYPTO"],
"hdcp_version": ["HDCP_V2_2", "HDCP_V2_1", "HDCP_V2", "HDCP_V1"]
},
"resolution": "auto",
"type": "paid",
}
)
try:
playback_sets = r.json()["data"]["playback_sets"]
except json.JSONDecodeError:
raise ValueError(f"Manifest fetch failed: {r.text}")
# transform tagsCombination into `tags` key-value dictionary for easier usage return playback
playback_sets = [dict(
**x,
tags=dict(y.split(":") for y in x["tags_combination"].lower().split(";"))
) for x in playback_sets]
playback_set = next(( def get_tracks(self, title):
x for x in playback_sets if self.hdrdv:
if x["tags"].get("encryption") == "widevine" or x["tags"].get("encryption") == "plain" # widevine, fairplay, playready tracks = Tracks()
if x["tags"].get("package") == "dash" # dash, hls
if x["tags"].get("container") == "fmp4br" # fmp4, fmp4br, ts
if x["tags"].get("ladder") == "tv" # tv, phone
if x["tags"].get("video_codec").endswith(self.vcodec.lower()) # dvh265, h265, h264 - vp9?
# user defined, may not be available in the tags list:
if x["tags"].get("resolution") in [self.quality, None] # max is fine, -q can choose lower if wanted
if x["tags"].get("dynamic_range") in [self.range.lower(), None] # dv, hdr10, sdr - hdr10+?
if x["tags"].get("audio_codec") in [self.acodec.lower(), None] # ec3, aac - atmos?
if x["tags"].get("audio_channel") in [{"5.1": "dolby51", "2.0": "stereo", "atmos": "atmos"}[self.channels], None]
), None)
if not playback_set:
playback_set = next((
x for x in playback_sets
if x["tags"].get("encryption") == "widevine" or x["tags"].get("encryption") == "plain" # widevine, fairplay, playready
if x["tags"].get("package") == "dash" # dash, hls
if x["tags"].get("ladder") == "tv" # tv, phone
if x["tags"].get("resolution") in [self.quality, None]
), None)
if not playback_set:
raise ValueError("Wanted playback set is unavailable for this title...")
if "licence_url" in playback_set: self.license_api = playback_set["licence_url"]
if playback_set['token_algorithm'] == 'airtel-qwilt-vod' or playback_set['token_algorithm'] == 'AKAMAI-HMAC':
self.log.info(f'Gotcha!')
akamai_cdn = False
else:
self.log.info(f'Finding MPD... {count}')
count += 1
r = Request(playback_set["playback_url"]) session_hdr = self.session
r.add_header("user-agent", "Hotstar;in.startv.hotstar/3.3.0 (Android/8.1.0)") session_dv = self.session
data = urlopen(r).read()
playback_hdr = self.get_playback(title.service_data["contentId"], range='hdr10')
playback_dv = self.get_playback(title.service_data["contentId"], range='dv')
mpd_url = playback_set["playback_url"] #.replace(".hotstar.com", ".akamaized.net") mpd_url_hdr = playback_hdr['content_url'].split('?')[0]
mpd_url_dv = playback_dv['content_url'].split('?')[0]
self.session.headers.update({
"Cookie": self.hdntl
})
tracks = Tracks.from_mpd( if 'widevine' in playback_hdr['playback_tags']:
url=mpd_url, self.license_api = playback_hdr["license_url"]
data=data,
session=self.session,
source=self.ALIASES[0]
)
for track in tracks:
track.needs_proxy = True
return tracks
def get_chapters(self, title): if 'vod-cf' in mpd_url_hdr:
return [] data_hdr = session_hdr.get(playback_hdr['content_url'])
cookies_hdr = data_hdr.cookies.get_dict()
cookies_hdr_ = f"hdntl={cookies_hdr['hdntl']}; CloudFront-Key-Pair-Id={cookies_hdr['CloudFront-Key-Pair-Id']}; CloudFront-Policy={cookies_hdr['CloudFront-Policy']}; CloudFront-Signature={cookies_hdr['CloudFront-Signature']}"
session_hdr.headers.update({'cookie': cookies_hdr_})
else:
session_hdr.proxies.update(
{'all': 'http://150.230.141.229:3128'})
data_hdr = session_hdr.get(playback_hdr['content_url'])
cookies_hdr = data_hdr.cookies.get_dict()
cookies_hdr_ = f"hdntl={cookies_hdr['hdntl']}"
session_hdr.headers.update({'cookie': cookies_hdr_})
def certificate(self, **_): self.log.debug(f"Cookies HDR -> {cookies_hdr_}")
return None # will use common privacy cert tracks_hdr = Tracks.from_mpd(
url=mpd_url_hdr,
data=data_hdr.text,
session=session_hdr,
source=self.ALIASES[0],
)
for track in tracks_hdr.videos:
if not track.hdr10:
track.hdr10 = True
def license(self, challenge, **_): if 'vod-cf' in mpd_url_dv:
return self.session.post( data_dv = session_dv.get(playback_dv['content_url'])
url=self.license_api, cookies_dv = data_dv.cookies.get_dict()
data=challenge # expects bytes cookies_dv_ = f"hdntl={cookies_dv['hdntl']}; CloudFront-Key-Pair-Id={cookies_dv['CloudFront-Key-Pair-Id']}; CloudFront-Policy={cookies_dv['CloudFront-Policy']}; CloudFront-Signature={cookies_dv['CloudFront-Signature']}"
).content session_dv.headers.update({'cookie': cookies_dv_})
else:
session_dv.proxies.update(
{'all': 'http://150.230.141.229:3128'})
data_dv = session_dv.get(playback_dv['content_url'])
cookies_dv = data_dv.cookies.get_dict()
cookies_dv_ = f"hdntl={cookies_dv['hdntl']}"
session_dv.headers.update({'cookie': cookies_dv_})
# Service specific functions self.log.debug(f"Cookies DV -> {cookies_dv_}")
def configure(self): tracks_dv = Tracks.from_mpd(
self.session.headers.update({ url=mpd_url_dv,
"Origin": "https://www.hotstar.com", data=data_dv.text,
"Referer": f'"https://www.hotstar.com/{self.region}"' session=session_dv,
}) source=self.ALIASES[0],
self.log.info("Logging into Hotstar") )
self.log.info(f'Setting region to "{self.region}"') tracks.add(tracks_hdr, warn_only=True)
self.hotstar_auth = self.get_akamai() tracks.add(tracks_dv, warn_only=True)
self.log.info(f" + Calculated HotstarAuth: {self.hotstar_auth}") for track in tracks:
try: track.needs_proxy = True
if self.cookies: return tracks
hdntl_cookies = [cookie for cookie in self.session.cookies if cookie.name == 'hdntl'] else:
self.hdntl = f"hdntl={hdntl_cookies[-1].value}" range = 'sdr'
self.device_id = self.session.cookies.get("deviceId") if self.range == 'HDR10':
self.log.info(f" + Using Device ID: {self.device_id}") range = 'hdr10'
except: elif self.range == 'DV':
self.device_id = str(uuid.uuid4()) range = 'dv'
self.log.info(f" + Created Device ID: {self.device_id}")
self.token = self.get_token()
self.log.info(" + Obtained tokens")
@staticmethod playback = self.get_playback(title.service_data["contentId"], range)
def get_akamai(): self.log.debug(playback)
enc_key = b"\x05\xfc\x1a\x01\xca\xc9\x4b\xc4\x12\xfc\x53\x12\x07\x75\xf9\xee"
st = int(time.time())
exp = st + 6000
res = f"st={st}~exp={exp}~acl=/*"
res += "~hmac=" + hmac.new(enc_key, res.encode(), hashlib.sha256).hexdigest()
return res
def get_token(self): if 'widevine' in playback['playback_tags']:
token_cache_path = self.get_cache("token_{profile}.json".format(profile=self.profile)) self.license_api = playback["license_url"]
if os.path.isfile(token_cache_path):
with open(token_cache_path, encoding="utf-8") as fd:
token = json.load(fd)
if token.get("exp", 0) > int(time.time()):
# not expired, lets use
self.log.info(" + Using cached auth tokens...")
return token["uid"]
else:
# expired, refresh
self.log.info(" + Refreshing and using cached auth tokens...")
return self.save_token(self.refresh(token["uid"], token["sub"]["deviceId"]), token_cache_path)
# get new token
if self.cookies:
token = self.session.cookies.get("sessionUserUP", None, 'www.hotstar.com', '/' + self.region)
else:
raise self.log.exit(f" - Please add cookies")
# token = self.login()
return self.save_token(token, token_cache_path)
@staticmethod if 'vod-cf' in playback['content_url']:
def save_token(token, to): mpd_url = playback['content_url'].split('?')[0]
# Decode the JWT data component #self.log.debug(self.session.cookies.get_dict())
data = json.loads(base64.b64decode(token.split(".")[1] + "===").decode("utf-8")) datax = self.session.get(playback['content_url'])
data["uid"] = token data = datax.text
data["sub"] = json.loads(data["sub"]) cookies = datax.cookies.get_dict()
cookies_ = f"hdntl={cookies['hdntl']}; CloudFront-Key-Pair-Id={cookies['CloudFront-Key-Pair-Id']}; CloudFront-Policy={cookies['CloudFront-Policy']}; CloudFront-Signature={cookies['CloudFront-Signature']}"
self.log.debug(f"Manifest Header -> Cookie: {cookies_}")
self.session.headers.update({'cookie': cookies_})
else:
mpd_url = playback['content_url']
r = Request(playback["content_url"])
r.add_header(
"user-agent",
"Disney+;in.startv.hotstar.dplus.tv/23.08.14.4.2915 (Android/13)",
)
r1 = urlopen(r)
cookie = ""
cookies = r1.info().get_all("Set-Cookie")
if cookies is not None:
for cookiee in cookies:
cookie += cookiee.split(";")[0] + ";"
self.log.debug(cookie)
self.session.headers = {
"cookie": cookie,
"user-agent": "Disney+;in.startv.hotstar.dplus.tv/23.08.14.4.2915 (Android/13)",
}
data = r1.read()
#self.log.info(data)
if ".m3u8" in mpd_url:
data = data.decode("utf-8")
os.makedirs(os.path.dirname(to), exist_ok=True) self.log.debug(mpd_url)
with open(to, "w", encoding="utf-8") as fd:
json.dump(data, fd)
return token try:
tracks = Tracks.from_mpd(
url=mpd_url,
data=data,
session=self.session,
source=self.ALIASES[0],
)
except:
tracks = Tracks.from_m3u8(
master=m3u8.loads(data, uri=mpd_url),
source=self.ALIASES[0]
)
def refresh(self, user_id_token, device_id): for track in tracks:
json_data = { track.needs_proxy = True
'deeplink_url': f'/{self.region}?client_capabilities=%7B%22ads%22%3A%5B%22non_ssai%22%5D%2C%22audio_channel%22%3A%5B%22stereo%22%5D%2C%22container%22%3A%5B%22fmp4%22%2C%22ts%22%5D%2C%22dvr%22%3A%5B%22short%22%5D%2C%22dynamic_range%22%3A%5B%22sdr%22%5D%2C%22encryption%22%3A%5B%22widevine%22%2C%22plain%22%5D%2C%22ladder%22%3A%5B%22web%22%2C%22tv%22%2C%22phone%22%5D%2C%22package%22%3A%5B%22dash%22%2C%22hls%22%5D%2C%22resolution%22%3A%5B%22sd%22%2C%22hd%22%5D%2C%22video_codec%22%3A%5B%22h264%22%5D%2C%22true_resolution%22%3A%5B%22sd%22%2C%22hd%22%2C%22fhd%22%5D%7D&drm_parameters=%7B%22hdcp_version%22%3A%5B%22HDCP_V2_2%22%5D%2C%22widevine_security_level%22%3A%5B%22SW_SECURE_DECODE%22%5D%2C%22playready_security_level%22%3A%5B%5D%7D',
'app_launch_count': 1,
}
r = self.session.post(
url=self.config["endpoints"]["refresh"],
headers={
'x-hs-usertoken': user_id_token,
'X-HS-Platform': self.config["device"]["platform"]["name"],
'X-Country-Code': self.region,
'X-HS-Accept-language': 'eng',
'X-Request-Id': str(uuid.uuid4()),
'x-hs-device-id': device_id,
'X-HS-Client-Targeting': f'ad_id:{device_id};user_lat:false',
'x-hs-request-id': str(uuid.uuid4()),
'X-HS-Client': 'platform:web;app_version:23.06.23.3;browser:Firefox;schema_version:0.0.911',
'Origin': 'https://www.hotstar.com',
'Referer': f'https://www.hotstar.com/{self.region}',
},
json=json_data
)
for cookie in self.cookies:
if cookie.name == 'sessionUserUP' and cookie.path == f"/{self.region}" and cookie.domain == 'www.hotstar.com':
cookie.value = r.headers["x-hs-usertoken"]
for x in self.ALIASES:
cookie_file = os.path.join(directories.cookies, x.lower(), f"{self.profile}.txt")
if not os.path.isfile(cookie_file):
cookie_file = os.path.join(directories.cookies, x, f"{self.profile}.txt")
if os.path.isfile(cookie_file):
self.cookies.save(cookie_file, ignore_discard=True, ignore_expires=True)
break
return r.headers["x-hs-usertoken"]
def login(self): for track in tracks.videos:
""" if self.range == "HDR10":
Log in to HOTSTAR and return a JWT User Identity token. if not track.hdr10:
:returns: JWT User Identity token. track.hdr10 = True
""" track.language = self.lang
if self.credentials.username == "username" and self.credentials.password == "password":
logincode_url = f"https://api.hotstar.com/{self.region}/aadhar/v2/firetv/{self.region}/users/logincode/" for track in tracks.audios:
logincode_headers = { if track.language == "und":
"Content-Length": "0", track.language = self.lang
"User-Agent": "Hotstar;in.startv.hotstar/3.3.0 (Android/8.1.0)"
} return tracks
logincode = self.session.post(
url = logincode_url, def get_chapters(self, title):
headers = logincode_headers return []
).json()["description"]["code"]
print(f"Go to tv.hotstar.com and put {logincode}") def certificate(self, **_):
logincode_choice = input('Did you put as informed above? (y/n): ') return None # will use common privacy cert
if logincode_choice.lower() == 'y':
res = self.session.get( def license(self, challenge, **_):
url = logincode_url+logincode, return self.session.post(
headers = logincode_headers url=self.license_api,
) data=challenge # expects bytes
else: ).content
self.log.exit(" - Exited.")
raise # Service specific functions
else:
res = self.session.post( def configure(self):
url=self.config["endpoints"]["login"], self.session.headers.update({
json={ "Origin": "https://www.hotstar.com",
"isProfileRequired": "false", "Referer": f"https://www.hotstar.com/{self.region}"
"userData": { })
"deviceId": self.device_id, self.log.info("Logging into Hotstar")
"password": self.credentials.password, self.hotstar_auth = self.get_akamai()
"username": self.credentials.username, self.log.info(f" + Calculated HotstarAuth: {self.hotstar_auth}")
"usertype": "email" if self.cookies:
}, self.device_id = self.session.cookies.get("deviceId")
"verification": {} self.log.info(f" + Using Device ID: {self.device_id}")
}, else:
headers={ self.device_id = str(uuid.uuid4())
"hotstarauth": self.hotstar_auth, self.log.info(f" + Created Device ID: {self.device_id}")
"content-type": "application/json" self.token = self.get_token()
} self.log.info(" + Obtained tokens")
)
try: @staticmethod
data = res.json() def get_akamai():
except json.JSONDecodeError: enc_key = b"\x05\xfc\x1a\x01\xca\xc9\x4b\xc4\x12\xfc\x53\x12\x07\x75\xf9\xee"
self.log.exit(f" - Failed to get auth token, response was not JSON: {res.text}") st = int(time.time())
raise exp = st + 12000
if "errorCode" in data: res = f"st={st}~exp={exp}~acl=/*"
self.log.exit(f" - Login failed: {data['description']} [{data['errorCode']}]") res += "~hmac=" + hmac.new(enc_key, res.encode(), hashlib.sha256).hexdigest()
raise return res
return data["description"]["userIdentity"]
def get_token(self):
token_cache_path = self.get_cache("token_{profile}.json".format(profile=self.profile))
if os.path.isfile(token_cache_path):
with open(token_cache_path, encoding="utf-8") as fd:
token = json.load(fd)
if token.get("exp", 0) > int(time.time()):
# not expired, lets use
self.log.info(" + Using cached auth tokens...")
return token["uid"]
else:
# expired, refresh
self.log.info(" + Refreshing and using cached auth tokens...")
return self.save_token(self.refresh(token["uid"], token["sub"]["deviceId"]), token_cache_path)
# get new token
if self.cookies:
token = self.session.cookies.get("sessionUserUP", None, 'www.hotstar.com', '/' + self.region)
else:
raise self.log.exit(f" - Please add cookies")
# token = self.login()
return self.save_token(token, token_cache_path)
@staticmethod
def save_token(token, to):
# Decode the JWT data component
data = json.loads(base64.b64decode(token.split(".")[1] + "===").decode("utf-8"))
data["uid"] = token
data["sub"] = json.loads(data["sub"])
os.makedirs(os.path.dirname(to), exist_ok=True)
with open(to, mode="w", encoding="utf-8") as f:
f.write(json.dumps(data, indent=4))
return token
def refresh(self, user_id_token, device_id):
json_data = {
'deeplink_url': f'/{self.region}?client_capabilities=%7B%22ads%22%3A%5B%22non_ssai%22%5D%2C%22audio_channel%22%3A%5B%22stereo%22%5D%2C%22container%22%3A%5B%22fmp4%22%2C%22ts%22%5D%2C%22dvr%22%3A%5B%22short%22%5D%2C%22dynamic_range%22%3A%5B%22sdr%22%5D%2C%22encryption%22%3A%5B%22widevine%22%2C%22plain%22%5D%2C%22ladder%22%3A%5B%22web%22%2C%22tv%22%2C%22phone%22%5D%2C%22package%22%3A%5B%22dash%22%2C%22hls%22%5D%2C%22resolution%22%3A%5B%22sd%22%2C%22hd%22%5D%2C%22video_codec%22%3A%5B%22h264%22%5D%2C%22true_resolution%22%3A%5B%22sd%22%2C%22hd%22%2C%22fhd%22%5D%7D&drm_parameters=%7B%22hdcp_version%22%3A%5B%22HDCP_V2_2%22%5D%2C%22widevine_security_level%22%3A%5B%22SW_SECURE_DECODE%22%5D%2C%22playready_security_level%22%3A%5B%5D%7D',
'app_launch_count': 1,
}
r = self.session.post(
url=self.config["endpoints"]["refresh"],
headers={
'x-hs-usertoken': user_id_token,
'X-HS-Platform': self.config["device"]["platform"]["name"],
'X-Country-Code': self.region,
'X-HS-Accept-language': 'eng',
'X-Request-Id': str(uuid.uuid4()),
'x-hs-device-id': device_id,
'X-HS-Client-Targeting': f'ad_id:{device_id};user_lat:false',
'X-HS-Client': 'platform:web;app_version:23.06.23.3;browser:Firefox;schema_version:0.0.911',
},
json=json_data
)
#self.log.info(r.json())
for cookie in self.cookies:
if cookie.name == 'sessionUserUP' and cookie.path == f"/{self.region}" and cookie.domain == 'www.hotstar.com':
cookie.value = r.headers["x-hs-usertoken"]
for x in self.ALIASES:
cookie_file = os.path.join(directories.cookies, x.lower(), f"{self.profile}.txt")
if not os.path.isfile(cookie_file):
cookie_file = os.path.join(directories.cookies, x, f"{self.profile}.txt")
if os.path.isfile(cookie_file):
self.cookies.save(cookie_file, ignore_discard=True, ignore_expires=True)
break
return r.headers["x-hs-usertoken"]
def login(self):
"""
Log in to HOTSTAR and return a JWT User Identity token.
:returns: JWT User Identity token.
"""
if self.credentials.username == "username" and self.credentials.password == "password":
logincode_url = f"https://api.hotstar.com/{self.region}/aadhar/v2/firetv/{self.region}/users/logincode/"
logincode_headers = {
"Content-Length": "0",
"User-Agent": "Hotstar;in.startv.hotstar/3.3.0 (Android/8.1.0)"
}
logincode = self.session.post(
url=logincode_url,
headers=logincode_headers
).json()["description"]["code"]
print(f"Go to tv.hotstar.com and put {logincode}")
logincode_choice = input('Did you put as informed above? (y/n): ')
if logincode_choice.lower() == 'y':
res = self.session.get(
url=logincode_url + logincode,
headers=logincode_headers
)
else:
self.log.exit(" - Exited.")
raise
else:
res = self.session.post(
url=self.config["endpoints"]["login"],
json={
"isProfileRequired": "false",
"userData": {
"deviceId": self.device_id,
"password": self.credentials.password,
"username": self.credentials.username,
"usertype": "email"
},
"verification": {}
},
headers={
"hotstarauth": self.hotstar_auth,
"content-type": "application/json"
}
)
try:
data = res.json()
except json.JSONDecodeError:
self.log.exit(f" - Failed to get auth token, response was not JSON: {res.text}")
raise
if "errorCode" in data:
self.log.exit(f" - Login failed: {data['description']} [{data['errorCode']}]")
raise
return data["description"]["userIdentity"]

View File

@ -0,0 +1,456 @@
import base64
import hashlib
import hmac
import json
import os
import time
import uuid
import re
import requests
from datetime import datetime
from urllib.parse import urlparse, parse_qs
from urllib.request import urlopen, Request
import http.cookiejar as cookiejar
import click
from vinetrimmer.objects import Title, Tracks
from vinetrimmer.services.BaseService import BaseService
from vinetrimmer.config import config, directories
class Hotstar(BaseService):
"""
Service code for Star India's Hotstar (aka Disney+ Hotstar) streaming service (https://hotstar.com).
\b
Authorization: Credentials
Security: UHD@L3, doesn't seem to care about releases.
\b
Tips: - The library of contents can be viewed without logging in at https://hotstar.com
- The homepage hosts domestic programming; Disney+ content is at https://hotstar.com/in/disneyplus
"""
ALIASES = ["HS", "hotstar"]
#GEOFENCE = ["in"]
TITLE_RE = r"^(?:https?://(?:www\.)?hotstar\.com/[a-z0-9/-]+/)(?P<id>\d+)"
@staticmethod
@click.command(name="Hotstar", short_help="https://hotstar.com")
@click.argument("title", type=str, required=False)
@click.option("-q", "--quality", default="hd",
type=click.Choice(["4k", "fhd", "hd", "sd"], case_sensitive=False),
help="Manifest quality to request.")
@click.option("-c", "--channels", default="5.1", type=click.Choice(["5.1", "2.0", "atmos"], case_sensitive=False),
help="Audio Codec")
@click.option("-rg", "--region", default="in", type=click.Choice(["in", "id", "th"], case_sensitive=False),
help="Account region")
@click.pass_context
def cli(ctx, **kwargs):
return Hotstar(ctx, **kwargs)
def __init__(self, ctx, title, quality, channels, region):
super().__init__(ctx)
self.parse_title(ctx, title)
self.quality = quality
self.channels = channels
self.region = region.lower()
assert ctx.parent is not None
self.vcodec = ctx.parent.params["vcodec"]
self.acodec = ctx.parent.params["acodec"] or "EC3"
self.range = ctx.parent.params["range_"]
self.profile = ctx.obj.profile
self.device_id = None
self.hotstar_auth = None
self.token = None
self.license_api = None
self.configure()
def get_titles(self):
headers = {
"Accept": "*/*",
"Accept-Language": "en-GB,en;q=0.5",
"hotstarauth": self.hotstar_auth,
"X-HS-UserToken": self.token,
"X-HS-Platform": self.config["device"]["platform"]["name"],
"X-HS-AppVersion": self.config["device"]["platform"]["version"],
"X-Country-Code": self.region,
"x-platform-code": "PCTV"
}
try:
r = self.session.get(
url=self.config["endpoints"]["movie_title"],
headers=headers,
params={"contentId": self.title}
)
try:
res = r.json()["body"]["results"]["item"]
except json.JSONDecodeError:
raise ValueError(f"Failed to load title manifest: {res.text}")
except:
r = self.session.get(
url=self.config["endpoints"]["tv_title"],
headers=headers,
params={"contentId": self.title}
)
try:
res = r.json()["body"]["results"]["item"]
except json.JSONDecodeError:
raise ValueError(f"Failed to load title manifest: {res.text}")
if res["assetType"] == "MOVIE":
return Title(
id_=self.title,
type_=Title.Types.MOVIE,
name=res["title"],
year=res["year"],
original_lang=res["langObjs"][0]["iso3code"],
source=self.ALIASES[0],
service_data=res,
)
else:
r = self.session.get(
url=self.config["endpoints"]["tv_episodes"],
headers=headers,
params={
"eid": res["id"],
"etid": "2",
"tao": "0",
"tas": "1000"
}
)
try:
res = r.json()["body"]["results"]["assets"]["items"]
except json.JSONDecodeError:
raise ValueError(f"Failed to load episodes list: {r.text}")
return [Title(
id_=self.title,
type_=Title.Types.TV,
name=x.get("showShortTitle"),
year=x.get("year"),
season=x.get("seasonNo"),
episode=x.get("episodeNo"),
episode_name=x.get("title"),
original_lang=x["langObjs"][0]["iso3code"],
source=self.ALIASES[0],
service_data=x
) for x in res]
def get_tracks(self, title):
if title.service_data.get("parentalRating", 0) > 2:
body = json.dumps({
"devices": [{
"id": self.device_id,
"name": "Chrome Browser on Windows",
"consentProvided": True
}]
})
self.session.post(
url="https://api.hotstar.com/play/v1/consent/content/{id}?".format(id=title.service_data["contentId"]),
headers={
"Accept": "*/*",
"Content-Type": "application/json",
"hotstarauth": self.hotstar_auth,
"X-HS-UserToken": self.token,
"X-HS-Platform": self.config["device"]["platform"]["name"],
"X-HS-AppVersion": self.config["device"]["platform"]["version"],
"X-HS-Request-Id": str(uuid.uuid4()),
"X-Country-Code": self.region
},
data=body
).json()
akamai_cdn=True
count = 1
while akamai_cdn:
r = self.session.post(
url=self.config["endpoints"]["manifest"].format(id=title.service_data["contentId"]),
params={
# TODO: Perhaps set up desired-config to actual desired playback set values?
"desired-config": "|".join([
"audio_channel:stereo",
"container:fmp4",
"dynamic_range:sdr",
"encryption:widevine",
"ladder:tv",
"package:dash",
"resolution:fhd",
"video_codec:h264"
]),
"device-id": self.device_id,
"type": "paid",
},
headers={
"Accept": "*/*",
"hotstarauth": self.hotstar_auth,
"x-hs-usertoken": self.token,
"x-hs-request-id": self.device_id,
"x-country-code": self.region
},
json={
"os_name": "Windows",
"os_version": "10",
"app_name": "web",
"app_version": "7.34.1",
"platform": "Chrome",
"platform_version": "99.0.4844.82",
"client_capabilities": {
"ads": ["non_ssai"],
"audio_channel": ["stereo"],
"dvr": ["short"],
"package": ["dash", "hls"],
"dynamic_range": ["sdr"],
"video_codec": ["h264"],
"encryption": ["widevine"],
"ladder": ["tv"],
"container": ["fmp4", "ts"],
"resolution": ["hd"]
},
"drm_parameters": {
"widevine_security_level": ["SW_SECURE_DECODE", "SW_SECURE_CRYPTO"],
"hdcp_version": ["HDCP_V2_2", "HDCP_V2_1", "HDCP_V2", "HDCP_V1"]
},
"resolution": "auto",
"type": "paid",
}
)
try:
playback_sets = r.json()["data"]["playback_sets"]
except json.JSONDecodeError:
raise ValueError(f"Manifest fetch failed: {r.text}")
# transform tagsCombination into `tags` key-value dictionary for easier usage
playback_sets = [dict(
**x,
tags=dict(y.split(":") for y in x["tags_combination"].lower().split(";"))
) for x in playback_sets]
#self.log.debug(playback_sets)
playback_set = next((
x for x in playback_sets
if x["tags"].get("encryption") == "widevine" or x["tags"].get("encryption") == "plain" # widevine, fairplay, playready
if x["tags"].get("package") == "dash" # dash, hls
if x["tags"].get("container") == "fmp4br" # fmp4, fmp4br, ts
if x["tags"].get("ladder") == "tv" # tv, phone
if x["tags"].get("video_codec").endswith(self.vcodec.lower()) # dvh265, h265, h264 - vp9?
# user defined, may not be available in the tags list:
if x["tags"].get("resolution") in [self.quality, None] # max is fine, -q can choose lower if wanted
if x["tags"].get("dynamic_range") in [self.range.lower(), None] # dv, hdr10, sdr - hdr10+?
if x["tags"].get("audio_codec") in [self.acodec.lower(), None] # ec3, aac - atmos?
if x["tags"].get("audio_channel") in [{"5.1": "dolby51", "2.0": "stereo", "atmos": "atmos"}[self.channels], None]
), None)
if not playback_set:
playback_set = next((
x for x in playback_sets
if x["tags"].get("encryption") == "widevine" or x["tags"].get("encryption") == "plain" # widevine, fairplay, playready
if x["tags"].get("package") == "dash" # dash, hls
if x["tags"].get("ladder") == "tv" # tv, phone
if x["tags"].get("resolution") in [self.quality, None]
), None)
if not playback_set:
raise ValueError("Wanted playback set is unavailable for this title...")
if "licence_url" in playback_set: self.license_api = playback_set["licence_url"]
if playback_set['token_algorithm'] == 'airtel-qwilt-vod' or playback_set['token_algorithm'] == 'AKAMAI-HMAC':
self.log.info(f'Gotcha!')
akamai_cdn = False
else:
self.log.info(f'Finding MPD... {count}')
count += 1
r = Request(playback_set["playback_url"])
r.add_header("user-agent", "Hotstar;in.startv.hotstar/3.3.0 (Android/8.1.0)")
data = urlopen(r).read()
mpd_url = playback_set["playback_url"] # .replace(".hotstar.com", ".akamaized.net")
self.log.debug(mpd_url)
try:
self.session.headers.update({
"Cookie": self.hdntl,
})
except:
pass
tracks = Tracks.from_mpd(
url=mpd_url,
data=data,
session=self.session,
source=self.ALIASES[0]
)
for track in tracks:
track.needs_proxy = True
return tracks
def get_chapters(self, title):
return []
def certificate(self, **_):
return None # will use common privacy cert
def license(self, challenge, **_):
return self.session.post(
url=self.license_api,
data=challenge # expects bytes
).content
# Service specific functions
def configure(self):
self.session.headers.update({
"Origin": "https://www.hotstar.com",
"Referer": f'"https://www.hotstar.com/{self.region}"'
})
self.log.info("Logging into Hotstar")
self.log.info(f'Setting region to "{self.region}"')
self.hotstar_auth = self.get_akamai()
self.log.info(f" + Calculated HotstarAuth: {self.hotstar_auth}")
try:
if self.cookies:
hdntl_cookies = [cookie for cookie in self.session.cookies if cookie.name == 'hdntl']
self.hdntl = f"hdntl={hdntl_cookies[-1].value}"
self.device_id = self.session.cookies.get("deviceId")
self.log.info(f" + Using Device ID: {self.device_id}")
except:
self.device_id = str(uuid.uuid4())
self.log.info(f" + Created Device ID: {self.device_id}")
self.session.headers.update({
"dnt": "1"
})
self.token = self.get_token()
self.log.info(" + Obtained tokens")
@staticmethod
def get_akamai():
enc_key = b"\x05\xfc\x1a\x01\xca\xc9\x4b\xc4\x12\xfc\x53\x12\x07\x75\xf9\xee"
st = int(time.time())
exp = st + 6000
res = f"st={st}~exp={exp}~acl=/*"
res += "~hmac=" + hmac.new(enc_key, res.encode(), hashlib.sha256).hexdigest()
return res
def get_token(self):
token_cache_path = self.get_cache("token_{profile}.json".format(profile=self.profile))
if os.path.isfile(token_cache_path):
with open(token_cache_path, encoding="utf-8") as fd:
token = json.load(fd)
if token.get("exp", 0) > int(time.time()):
# not expired, lets use
self.log.info(" + Using cached auth tokens...")
return token["uid"]
else:
# expired, refresh
self.log.info(" + Refreshing and using cached auth tokens...")
return self.save_token(self.refresh(token["uid"], token["sub"]["deviceId"]), token_cache_path)
# get new token
if self.cookies:
token = self.session.cookies.get("sessionUserUP", None, 'www.hotstar.com', '/' + self.region)
else:
raise self.log.exit(f" - Please add cookies")
# token = self.login()
return self.save_token(token, token_cache_path)
@staticmethod
def save_token(token, to):
# Decode the JWT data component
data = json.loads(base64.b64decode(token.split(".")[1] + "===").decode("utf-8"))
data["uid"] = token
data["sub"] = json.loads(data["sub"])
os.makedirs(os.path.dirname(to), exist_ok=True)
with open(to, "w", encoding="utf-8") as fd:
json.dump(data, fd)
return token
def refresh(self, user_id_token, device_id):
json_data = {
'deeplink_url': f'/{self.region}?client_capabilities=%7B%22ads%22%3A%5B%22non_ssai%22%5D%2C%22audio_channel%22%3A%5B%22stereo%22%5D%2C%22container%22%3A%5B%22fmp4%22%2C%22ts%22%5D%2C%22dvr%22%3A%5B%22short%22%5D%2C%22dynamic_range%22%3A%5B%22sdr%22%5D%2C%22encryption%22%3A%5B%22widevine%22%2C%22plain%22%5D%2C%22ladder%22%3A%5B%22web%22%2C%22tv%22%2C%22phone%22%5D%2C%22package%22%3A%5B%22dash%22%2C%22hls%22%5D%2C%22resolution%22%3A%5B%22sd%22%2C%22hd%22%5D%2C%22video_codec%22%3A%5B%22h264%22%5D%2C%22true_resolution%22%3A%5B%22sd%22%2C%22hd%22%2C%22fhd%22%5D%7D&drm_parameters=%7B%22hdcp_version%22%3A%5B%22HDCP_V2_2%22%5D%2C%22widevine_security_level%22%3A%5B%22SW_SECURE_DECODE%22%5D%2C%22playready_security_level%22%3A%5B%5D%7D',
'app_launch_count': 1,
}
r = self.session.post(
url=self.config["endpoints"]["refresh"],
headers={
'x-hs-usertoken': user_id_token,
'X-HS-Platform': self.config["device"]["platform"]["name"],
'X-Country-Code': self.region,
'X-HS-Accept-language': 'eng',
'X-Request-Id': str(uuid.uuid4()),
'x-hs-device-id': device_id,
'X-HS-Client-Targeting': f'ad_id:{device_id};user_lat:false',
'x-hs-request-id': str(uuid.uuid4()),
'X-HS-Client': 'platform:web;app_version:23.06.23.3;browser:Firefox;schema_version:0.0.911',
'Origin': 'https://www.hotstar.com',
'Referer': f'https://www.hotstar.com/{self.region}',
},
json=json_data
)
for cookie in self.cookies:
if cookie.name == 'sessionUserUP' and cookie.path == f"/{self.region}" and cookie.domain == 'www.hotstar.com':
cookie.value = r.headers["x-hs-usertoken"]
for x in self.ALIASES:
cookie_file = os.path.join(directories.cookies, x.lower(), f"{self.profile}.txt")
if not os.path.isfile(cookie_file):
cookie_file = os.path.join(directories.cookies, x, f"{self.profile}.txt")
if os.path.isfile(cookie_file):
self.cookies.save(cookie_file, ignore_discard=True, ignore_expires=True)
break
return r.headers["x-hs-usertoken"]
def login(self):
"""
Log in to HOTSTAR and return a JWT User Identity token.
:returns: JWT User Identity token.
"""
if self.credentials.username == "username" and self.credentials.password == "password":
logincode_url = f"https://api.hotstar.com/{self.region}/aadhar/v2/firetv/{self.region}/users/logincode/"
logincode_headers = {
"Content-Length": "0",
"User-Agent": "Hotstar;in.startv.hotstar/3.3.0 (Android/8.1.0)"
}
logincode = self.session.post(
url = logincode_url,
headers = logincode_headers
).json()["description"]["code"]
print(f"Go to tv.hotstar.com and put {logincode}")
logincode_choice = input('Did you put as informed above? (y/n): ')
if logincode_choice.lower() == 'y':
res = self.session.get(
url = logincode_url+logincode,
headers = logincode_headers
)
else:
self.log.exit(" - Exited.")
raise
else:
res = self.session.post(
url=self.config["endpoints"]["login"],
json={
"isProfileRequired": "false",
"userData": {
"deviceId": self.device_id,
"password": self.credentials.password,
"username": self.credentials.username,
"usertype": "email"
},
"verification": {}
},
headers={
"hotstarauth": self.hotstar_auth,
"content-type": "application/json"
}
)
try:
data = res.json()
except json.JSONDecodeError:
self.log.exit(f" - Failed to get auth token, response was not JSON: {res.text}")
raise
if "errorCode" in data:
self.log.exit(f" - Login failed: {data['description']} [{data['errorCode']}]")
raise
return data["description"]["userIdentity"]

View File

@ -239,7 +239,7 @@ async def saldl(uri, out, headers=None, proxy=None):
print() print()
async def m3u8dl(uri: str, out: str, track): async def m3u8dl(uri, out, track, headers=None, proxy=None):
executable = shutil.which("N_m3u8DL-RE") or shutil.which("m3u8DL") or "/usr/bin/N_m3u8DL-RE" executable = shutil.which("N_m3u8DL-RE") or shutil.which("m3u8DL") or "/usr/bin/N_m3u8DL-RE"
if not executable: if not executable:
raise EnvironmentError("N_m3u8DL-RE executable not found...") raise EnvironmentError("N_m3u8DL-RE executable not found...")
@ -256,11 +256,17 @@ async def m3u8dl(uri: str, out: str, track):
"--thread-count", "96", "--thread-count", "96",
"--download-retry-count", "8", "--download-retry-count", "8",
"--ffmpeg-binary-path", ffmpeg_binary, "--ffmpeg-binary-path", ffmpeg_binary,
"--binary-merge", "--binary-merge"
] ]
if headers and track.source == "HS":
arguments.extend(["--header", f'"Cookie:{headers["cookie"].replace(" ", "")}"'])
#for k,v in headers.items():
if proxy:
arguments.extend(["--custom-proxy", proxy])
if not ("linux" in platform): if not ("linux" in platform):
arguments.append("--http-request-timeout") arguments.extend(["--http-request-timeout", "8"])
arguments.append("8")
if track.__class__.__name__ == "VideoTrack": if track.__class__.__name__ == "VideoTrack":
if track.height: if track.height:
arguments.extend([ arguments.extend([
@ -294,7 +300,11 @@ async def m3u8dl(uri: str, out: str, track):
raise ValueError(f"{track.__class__.__name__} not supported yet!") raise ValueError(f"{track.__class__.__name__} not supported yet!")
try: try:
p = subprocess.run(arguments, check=True) arg_str = " ".join(arguments)
#print(arg_str)
p = subprocess.run(arg_str, check=True)
#os.system(arg_str)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
raise ValueError("N_m3u8DL-RE failed too many times, aborting") raise ValueError("N_m3u8DL-RE failed too many times, aborting")
print()
# Removed above call using subprocess due to it failing to correctly pass --header value. The problem might be with spaces within the header string, I think? I know it's not recommended to use os.system but didn't have a choice