2025-03-18 00:17:27 +05:30
from __future__ import annotations
2025-03-21 00:39:48 +05:30
from sys import platform
2025-03-18 00:17:27 +05:30
import base64
import hashlib
import json
import os
import re
import time
from collections import defaultdict
from pathlib import Path
from urllib . parse import urlencode , quote
from typing import Union
from uuid import uuid4
import click
import jsonpickle
import requests
from click import Context
from langcodes import Language
from tldextract import tldextract
from click . core import ParameterSource
from vinetrimmer . objects import TextTrack , Title , Tracks
from vinetrimmer . objects . tracks import MenuTrack
from vinetrimmer . services . BaseService import BaseService
from vinetrimmer . utils import is_close_match
from vinetrimmer . utils . Logger import Logger
from pywidevine import Device
class Amazon ( BaseService ) :
"""
Service code for Amazon VOD ( https : / / amazon . com ) and Amazon Prime Video ( https : / / primevideo . com ) .
\b
Authorization : Cookies
Security : UHD @L1 / SL3000 FHD @L3 ( ChromeCDM ) / SL2000 SD @L3 , Maintains their own license server like Netflix , be cautious .
\b
Region is chosen automatically based on domain extension found in cookies .
Prime Video specific code will be run if the ASIN is detected to be a prime video variant .
Use ' Amazon Video ASIN Display ' for Tampermonkey addon for ASIN
https : / / greasyfork . org / en / scripts / 381997 - amazon - video - asin - display
vt dl - - list - z uk - q 1080 Amazon B09SLGYLK8
"""
ALIASES = [ " AMZN " , " amazon " ]
TITLE_RE = r " ^(?:https?://(?:www \ .)?(?P<domain>amazon \ .(?P<region>com|co \ .uk|de|co \ .jp)|primevideo \ .com)(?:/.+)?/)?(?P<id>[A-Z0-9] { 10,}|amzn1 \ .dv \ .gti \ .[a-f0-9-]+) " # noqa: E501
REGION_TLD_MAP = {
" au " : " com.au " ,
" br " : " com.br " ,
" jp " : " co.jp " ,
" mx " : " com.mx " ,
" tr " : " com.tr " ,
" gb " : " co.uk " ,
" us " : " com " ,
}
VIDEO_RANGE_MAP = {
" SDR " : " None " ,
" HDR10 " : " Hdr10 " ,
" DV " : " DolbyVision " ,
}
@staticmethod
@click.command ( name = " Amazon " , short_help = " https://amazon.com, https://primevideo.com " , help = __doc__ )
@click.argument ( " title " , type = str , required = False )
@click.option ( " -b " , " --bitrate " , default = " CBR " ,
type = click . Choice ( [ " CVBR " , " CBR " , " CVBR+CBR " ] , case_sensitive = False ) ,
help = " Video Bitrate Mode to download in. CVBR=Constrained Variable Bitrate, CBR=Constant Bitrate. " )
@click.option ( " -c " , " --cdn " , default = None , type = str ,
help = " CDN to download from, defaults to the CDN with the highest weight set by Amazon. " )
# UHD, HD, SD. UHD only returns HEVC, ever, even for <=HD only content
2025-03-21 00:39:48 +05:30
@click.option ( " -vq " , " --vquality " , default = " HD " ,
2025-03-18 00:17:27 +05:30
type = click . Choice ( [ " SD " , " HD " , " UHD " ] , case_sensitive = False ) ,
help = " Manifest quality to request. " )
@click.option ( " -s " , " --single " , is_flag = True , default = False ,
help = " Force single episode/season instead of getting series ASIN. " )
@click.option ( " -am " , " --amanifest " , default = " H265 " ,
type = click . Choice ( [ " CVBR " , " CBR " , " H265 " ] , case_sensitive = False ) ,
help = " Manifest to use for audio. Defaults to H265 if the video manifest is missing 640k audio. " )
2025-03-21 00:39:48 +05:30
@click.option ( " -aq " , " --aquality " , default = " HD " ,
2025-03-18 00:17:27 +05:30
type = click . Choice ( [ " SD " , " HD " , " UHD " ] , case_sensitive = False ) ,
help = " Manifest quality to request for audio. Defaults to the same as --quality. " )
@click.option ( " -ism " , " --ism " , is_flag = True , default = False ,
help = " Set manifest override to SmoothStreaming. Defaults to DASH w/o this flag. " )
@click.pass_context
def cli ( ctx , * * kwargs ) :
return Amazon ( ctx , * * kwargs )
def __init__ ( self , ctx , title , bitrate : str , cdn : str , vquality : str , single : bool , amanifest : str , aquality : str , ism : bool ) :
m = self . parse_title ( ctx , title )
self . bitrate = bitrate
self . bitrate_source = ctx . get_parameter_source ( " bitrate " )
self . cdn = cdn
self . vquality = vquality
self . vquality_source = ctx . get_parameter_source ( " vquality " )
self . single = single
self . amanifest = amanifest
self . aquality = aquality
self . ism = ism
super ( ) . __init__ ( ctx )
assert ctx . parent is not None
self . vcodec = ctx . parent . params [ " vcodec " ] or " H264 "
self . range = ctx . parent . params [ " range_ " ] or " SDR "
self . chapters_only = ctx . parent . params [ " chapters_only " ]
self . atmos = ctx . parent . params [ " atmos " ]
self . quality = ctx . parent . params . get ( " quality " ) or 1080
self . cdm = ctx . obj . cdm
self . profile = ctx . obj . profile
2025-03-21 00:39:48 +05:30
if " linux " in platform :
import yaml
#Read YAML file
with open ( " /content/vinetrimmer/config/Services/amazon.yml " , ' r ' ) as stream :
self . config = yaml . safe_load ( stream )
2025-03-18 00:17:27 +05:30
self . region : dict [ str , str ] = { }
self . endpoints : dict [ str , str ] = { }
self . device : dict [ str , str ] = { }
self . pv = False
self . device_token = None
self . device_id : None
self . customer_id = None
self . client_id = " f22dbddb-ef2c-48c5-8876-bed0d47594fd " # browser client id
if self . vquality_source != ParameterSource . COMMANDLINE :
if 0 < self . quality < = 576 and self . range == " SDR " :
self . log . info ( " + Setting manifest quality to SD " )
self . vquality = " SD "
if self . quality > 1080 :
self . log . info ( " + Setting manifest quality to UHD to be able to get 2160p video track " )
self . vquality = " UHD "
self . vcodec = " H265 "
self . vquality = self . vquality or " HD "
if self . bitrate_source != ParameterSource . COMMANDLINE :
if self . vcodec == " H265 " and self . range == " SDR " and self . bitrate != " CVBR+CBR " :
self . bitrate = " CVBR+CBR "
self . log . info ( " + Changed bitrate mode to CVBR+CBR to be able to get H.265 SDR video track " )
if self . vquality == " UHD " and self . range != " SDR " and self . bitrate != " CBR " :
self . bitrate = " CBR "
self . log . info ( f " + Changed bitrate mode to CBR to be able to get highest quality UHD { self . range } video track " )
self . orig_bitrate = self . bitrate
if self . ism :
self . manifestTypeTry = " SmoothStreaming "
self . log . info ( " Setting manifestType to SmoothStreaming (ISM) " )
else :
self . manifestTypeTry = " DASH "
self . log . info ( " Setting manifestType to DASH (MPD) " )
self . configure ( )
# Abstracted functions
def get_titles ( self ) :
res = self . session . get (
url = self . endpoints [ " details " ] ,
params = {
" titleID " : self . title ,
" isElcano " : " 1 " ,
" sections " : [ " Atf " , " Btf " ]
} ,
headers = {
" Accept " : " application/json "
}
)
if not res . ok :
raise self . log . exit ( f " Unable to get title: { res . text } [ { res . status_code } ] " )
data = res . json ( ) [ " widgets " ]
product_details = data . get ( " productDetails " , { } ) . get ( " detail " )
if not product_details :
error = res . json ( ) [ " degradations " ] [ 0 ]
raise self . log . exit ( f " Unable to get title: { error [ ' message ' ] } [ { error [ ' code ' ] } ] " )
titles = [ ]
if data [ " pageContext " ] [ " subPageType " ] == " Movie " :
card = data [ " productDetails " ] [ " detail " ]
titles . append ( Title (
id_ = card [ " catalogId " ] ,
type_ = Title . Types . MOVIE ,
name = product_details [ " title " ] ,
#year=card["releaseYear"],
year = card . get ( " releaseYear " , " " ) ,
# language is obtained afterward
original_lang = None ,
source = self . ALIASES [ 0 ] ,
service_data = card
) )
else :
if ( " titleContent " not in data . keys ( ) ) or ( data [ " titleContent " ] == [ ] ) :
episodes = data [ " episodeList " ] [ " episodes " ]
for episode in episodes :
details = episode [ " detail " ]
titles . append (
Title (
id_ = details [ " catalogId " ] ,
type_ = Title . Types . TV ,
name = product_details [ " parentTitle " ] ,
season = data [ " productDetails " ] [ " detail " ] [ " seasonNumber " ] ,
episode = episode [ " self " ] [ " sequenceNumber " ] ,
episode_name = details [ " title " ] ,
# language is obtained afterward
original_lang = None ,
source = self . ALIASES [ 0 ] ,
service_data = details ,
)
)
if len ( titles ) == 25 :
page_count = 1
pagination_data = data . get ( ' episodeList ' , { } ) . get ( ' actions ' , { } ) . get ( ' pagination ' , [ ] )
token = next ( ( quote ( item . get ( ' token ' ) ) for item in pagination_data if item . get ( ' tokenType ' ) == ' NextPage ' ) , None )
while True :
page_count + = 1
res = self . session . get (
url = self . endpoints [ " getDetailWidgets " ] ,
params = {
" titleID " : self . title ,
" isTvodOnRow " : " 1 " ,
" widgets " : f ' [ {{ " widgetType " : " EpisodeList " , " widgetToken " : " { token } " }} ] '
} ,
headers = {
" Accept " : " application/json "
}
) . json ( )
episodeList = res [ ' widgets ' ] . get ( ' episodeList ' , { } )
for item in episodeList . get ( ' episodes ' , [ ] ) :
episode = int ( item . get ( ' self ' , { } ) . get ( ' sequenceNumber ' , { } ) )
titles . append ( Title (
id_ = item [ " detail " ] [ " catalogId " ] ,
type_ = Title . Types . TV ,
name = product_details [ " parentTitle " ] ,
season = product_details [ " seasonNumber " ] ,
episode = episode ,
episode_name = item [ " detail " ] [ " title " ] ,
# language is obtained afterward
original_lang = None ,
source = self . ALIASES [ 0 ] ,
service_data = item
) )
pagination_data = res [ ' widgets ' ] . get ( ' episodeList ' , { } ) . get ( ' actions ' , { } ) . get ( ' pagination ' , [ ] )
token = next ( ( quote ( item . get ( ' token ' ) ) for item in pagination_data if item . get ( ' tokenType ' ) == ' NextPage ' ) , None )
if not token :
break
else :
cards = [
x [ " detail " ]
for x in data [ " titleContent " ] [ 0 ] [ " cards " ]
if not self . single or
( self . single and self . title in data [ " self " ] [ " asins " ] ) or ( self . single and self . title in data [ " self " ] [ " compactGTI " ] ) or
( self . single and self . title in x [ " self " ] [ " asins " ] ) or ( self . single and self . title == x [ " detail " ] [ " catalogId " ] )
]
for card in cards :
episode_number = card . get ( " episodeNumber " , 0 )
if episode_number != 0 :
titles . append ( Title (
id_ = card [ " catalogId " ] ,
type_ = Title . Types . TV ,
name = product_details [ " parentTitle " ] ,
season = product_details [ " seasonNumber " ] ,
episode = episode_number ,
episode_name = card [ " title " ] ,
# language is obtained afterward
original_lang = None ,
source = self . ALIASES [ 0 ] ,
service_data = card
) )
if not self . single :
temp_title = self . title
temp_single = self . single
self . single = True
for season in data . get ( ' seasonSelector ' , [ ] ) :
season_link = season [ " seasonLink " ]
match = re . search ( r ' /([a-zA-Z0-9]+) \ /ref= ' , season_link ) #extract other season id using re
if match :
extracted_value = match . group ( 1 )
if data [ " self " ] [ " compactGTI " ] == extracted_value : #skip entered asin season data and grab rest id's
continue
self . title = extracted_value
for title in self . get_titles ( ) :
titles . append ( title )
self . title = temp_title
self . single = temp_single
if titles :
# TODO: Needs playback permission on first title, title needs to be available
original_lang = self . get_original_language ( self . get_manifest (
next ( ( x for x in titles if x . type == Title . Types . MOVIE or x . episode > 0 ) , titles [ 0 ] ) ,
video_codec = self . vcodec ,
bitrate_mode = self . bitrate ,
quality = " UHD " ,
ignore_errors = True
) )
if original_lang :
for title in titles :
title . original_lang = Language . get ( original_lang )
else :
#self.log.warning(" - Unable to obtain the title's original language, setting 'en' default...")
for title in titles :
title . original_lang = Language . get ( " en " )
filtered_titles = [ ]
season_episode_count = defaultdict ( int )
for title in titles :
key = ( title . season , title . episode )
if season_episode_count [ key ] < 1 :
filtered_titles . append ( title )
season_episode_count [ key ] + = 1
titles = filtered_titles
return titles
def get_tracks ( self , title : Title ) - > Tracks :
tracks = Tracks ( )
if self . chapters_only :
return [ ]
manifest , chosen_manifest , tracks = self . get_best_quality ( title )
manifest = self . get_manifest (
title ,
video_codec = self . vcodec ,
bitrate_mode = self . bitrate ,
quality = " UHD " ,
hdr = self . range ,
manifest_type = self . manifestTypeTry ,
ignore_errors = False
)
# Move rightsException termination here so that script can attempt continuing
if " rightsException " in manifest [ " returnedTitleRendition " ] [ " selectedEntitlement " ] :
self . log . error ( " - The profile used does not have the rights to this title. " )
return
self . customer_id = manifest [ " returnedTitleRendition " ] [ " selectedEntitlement " ] [ " grantedByCustomerId " ]
default_url_set = manifest [ " playbackUrls " ] [ " urlSets " ] [ manifest [ " playbackUrls " ] [ " defaultUrlSetId " ] ]
encoding_version = default_url_set [ " urls " ] [ " manifest " ] [ " encodingVersion " ]
self . log . info ( f " + Detected encodingVersion= { encoding_version } " )
chosen_manifest = self . choose_manifest ( manifest , self . cdn )
try :
manifest_url = self . clean_mpd_url ( chosen_manifest [ " avUrlInfoList " ] [ 0 ] [ " url " ] , False )
except :
chosen_manifest = default_url_set [ " urls " ] [ " manifest " ]
manifest_url = self . clean_mpd_url ( chosen_manifest [ " url " ] , optimise = False )
self . log . debug ( manifest_url )
self . log . info ( " + Downloading Manifest " )
try :
tracks = Tracks ( [
x for x in iter ( Tracks . from_mpd (
url = manifest_url ,
session = self . session ,
source = self . ALIASES [ 0 ] ,
) )
] )
except ValueError :
tracks = Tracks ( [
x for x in iter ( Tracks . from_ism (
url = manifest_url ,
session = self . session ,
source = self . ALIASES [ 0 ] ,
) )
] )
except :
raise self . log . exit ( f " Unsupported manifest type: { chosen_manifest [ ' streamingTechnology ' ] } \n { manifest_url } " )
need_separate_audio = ( ( self . aquality or self . vquality ) != self . vquality
or self . amanifest == " CVBR " and ( self . vcodec , self . bitrate ) != ( " H264 " , " CVBR " )
or self . amanifest == " CBR " and ( self . vcodec , self . bitrate ) != ( " H264 " , " CBR " )
or self . amanifest == " H265 " and self . vcodec != " H265 "
or self . amanifest != " H265 " and self . vcodec == " H265 " )
if not need_separate_audio :
audios = defaultdict ( list )
for audio in tracks . audios :
audios [ audio . language ] . append ( audio )
for lang in audios :
if not any ( ( x . bitrate or 0 ) > = 640000 for x in audios [ lang ] ) :
need_separate_audio = True
break
if need_separate_audio and not self . atmos :
manifest_type = self . amanifest or " H265 "
self . log . info ( f " Getting audio from { manifest_type } manifest for potential higher bitrate or better codec " )
audio_manifest = self . get_manifest (
title = title ,
video_codec = " H265 " if manifest_type == " H265 " else " H264 " ,
bitrate_mode = " CVBR " if manifest_type != " CBR " else " CBR " ,
quality = self . aquality or self . vquality ,
2025-03-21 00:39:48 +05:30
manifest_type = " DASH " ,
2025-03-18 00:17:27 +05:30
hdr = None ,
ignore_errors = True
)
if not audio_manifest :
self . log . warning ( f " - Unable to get { manifest_type } audio manifests, skipping " )
elif not ( chosen_audio_manifest := self . choose_manifest ( audio_manifest , self . cdn ) ) :
self . log . warning ( f " - No { manifest_type } audio manifests available, skipping " )
else :
audio_mpd_url = self . clean_mpd_url ( chosen_audio_manifest [ " avUrlInfoList " ] [ 0 ] [ " url " ] , optimise = False )
self . log . debug ( audio_mpd_url )
self . log . info ( " + Downloading HEVC manifest " )
try :
audio_mpd = Tracks ( [
x for x in iter ( Tracks . from_mpd (
url = audio_mpd_url ,
session = self . session ,
source = self . ALIASES [ 0 ] ,
) )
] )
except KeyError :
self . log . warning ( f " - Title has no { self . amanifest } stream, cannot get higher quality audio " )
else :
tracks . add ( audio_mpd . audios , warn_only = True ) # expecting possible dupes, ignore
need_uhd_audio = self . atmos
if not self . amanifest and ( ( self . aquality == " UHD " and self . vquality != " UHD " ) or not self . aquality ) :
audios = defaultdict ( list )
for audio in tracks . audios :
audios [ audio . language ] . append ( audio )
for lang in audios :
if not any ( ( x . bitrate or 0 ) > = 640000 for x in audios [ lang ] ) :
need_uhd_audio = True
break
if need_uhd_audio and ( self . config . get ( " device " ) or { } ) . get ( self . profile , None ) :
self . log . info ( " Getting audio from UHD manifest for potential higher bitrate or better codec " )
temp_device = self . device
temp_device_token = self . device_token
temp_device_id = self . device_id
uhd_audio_manifest = None
if ( self . cdm . device . type == Device . Types . CHROME if " common_privacy_cert " in dir ( self . cdm ) else True ) and self . quality < 2160 :
self . log . info ( f " + Switching to device to get UHD manifest " )
self . register_device ( )
uhd_audio_manifest = self . get_manifest (
title = title ,
video_codec = " H265 " ,
bitrate_mode = " CVBR+CBR " ,
quality = " UHD " ,
hdr = " DV " , # Needed for 576kbps Atmos sometimes
manifest_type = " DASH " ,
ignore_errors = True
)
self . log . debug ( uhd_audio_manifest )
self . device = temp_device
self . device_token = temp_device_token
self . device_id = temp_device_id
if not uhd_audio_manifest :
self . log . warning ( f " - Unable to get UHD manifests, skipping " )
elif not ( chosen_uhd_audio_manifest := self . choose_manifest ( uhd_audio_manifest , self . cdn ) ) :
self . log . warning ( f " - No UHD manifests available, skipping " )
else :
uhd_audio_mpd_url = self . clean_mpd_url ( chosen_uhd_audio_manifest [ " avUrlInfoList " ] [ 0 ] [ " url " ] , optimise = False )
self . log . debug ( uhd_audio_mpd_url )
self . log . info ( " + Downloading UHD manifest " )
try :
uhd_audio_mpd = Tracks ( [
x for x in iter ( Tracks . from_mpd (
url = uhd_audio_mpd_url ,
session = self . session ,
source = self . ALIASES [ 0 ] ,
) )
] )
except ValueError :
uhd_audio_mpd = Tracks ( [
x for x in iter ( Tracks . from_ism (
url = uhd_audio_mpd_url ,
session = self . session ,
source = self . ALIASES [ 0 ] ,
) )
] )
except KeyError :
self . log . warning ( f " - Title has no UHD stream, cannot get higher quality audio " )
# replace the audio tracks with DV manifest version if atmos is present
if any ( x for x in uhd_audio_mpd . audios if x . atmos ) :
2025-03-21 00:39:48 +05:30
print ( " Hello " )
tracks . audios = uhd_audio_mpd . audios
2025-03-18 00:17:27 +05:30
for video in tracks . videos :
try :
video . hdr10 = chosen_manifest [ " hdrFormat " ] == " Hdr10 "
video . dv = chosen_manifest [ " hdrFormat " ] == " DolbyVision "
except :
video . hdr10 = chosen_manifest [ " dynamicRange " ] == " Hdr10 "
video . dv = chosen_manifest [ " dynamicRange " ] == " DolbyVision "
for audio in tracks . audios :
audio . descriptive = audio . extra [ 1 ] . get ( " audioTrackSubtype " ) == " descriptive " or audio . extra [ 1 ] . get ( " AudioTrackSubtype " ) == " descriptive "
# Amazon @lang is just the lang code, no dialect, @audioTrackId has it.
audio_track_id = audio . extra [ 1 ] . get ( " audioTrackId " ) or audio . extra [ 1 ] . get ( " AudioTrackId " )
if audio_track_id :
audio . language = Language . get ( audio_track_id . split ( " _ " ) [ 0 ] ) # e.g. es-419_ec3_blabla
for sub in manifest . get ( " subtitleUrls " , [ ] ) + manifest . get ( " forcedNarratives " , [ ] ) :
tracks . add ( TextTrack (
id_ = sub . get (
" timedTextTrackId " ,
f " { sub [ ' languageCode ' ] } _ { sub [ ' type ' ] } _ { sub [ ' subtype ' ] } _ { sub [ ' index ' ] } "
) ,
source = self . ALIASES [ 0 ] ,
url = os . path . splitext ( sub [ " url " ] ) [ 0 ] + " .srt " , # DFXP -> SRT forcefully seems to work fine
# metadata
codec = " srt " , # sub["format"].lower(),
language = sub [ " languageCode " ] ,
#is_original_lang=title.original_lang and is_close_match(sub["languageCode"], [title.original_lang]),
forced = " forced " in sub [ " displayName " ] ,
sdh = sub [ " type " ] . lower ( ) == " sdh " # TODO: what other sub types? cc? forced?
) , warn_only = True ) # expecting possible dupes, ignore
return tracks
def get_chapters ( self , title : Title ) - > list [ MenuTrack ] :
""" Get chapters from Amazon ' s XRay Scenes API. """
manifest = self . get_manifest (
title ,
video_codec = self . vcodec ,
bitrate_mode = self . bitrate ,
quality = " UHD " ,
manifest_type = self . manifestTypeTry ,
hdr = self . range
)
if " xrayMetadata " in manifest :
xray_params = manifest [ " xrayMetadata " ] [ " parameters " ]
elif self . chapters_only :
xray_params = {
" pageId " : " fullScreen " ,
" pageType " : " xray " ,
" serviceToken " : json . dumps ( {
" consumptionType " : " Streaming " ,
" deviceClass " : " normal " ,
" playbackMode " : " playback " ,
" vcid " : manifest [ " returnedTitleRendition " ] [ " contentId " ] ,
} )
}
else :
return [ ]
xray_params . update ( {
" deviceID " : self . device_id ,
" deviceTypeID " : self . config [ " device_types " ] [ " browser " ] , # must be browser device type
" marketplaceID " : self . region [ " marketplace_id " ] ,
" gascEnabled " : str ( self . pv ) . lower ( ) ,
" decorationScheme " : " none " ,
" version " : " inception-v2 " ,
" uxLocale " : " en-US " ,
" featureScheme " : " XRAY_WEB_2020_V1 "
} )
xray = self . session . get (
url = self . endpoints [ " xray " ] ,
params = xray_params
) . json ( ) . get ( " page " )
if not xray :
return [ ]
widgets = xray [ " sections " ] [ " center " ] [ " widgets " ] [ " widgetList " ]
scenes = next ( ( x for x in widgets if x [ " tabType " ] == " scenesTab " ) , None )
if not scenes :
return [ ]
scenes = scenes [ " widgets " ] [ " widgetList " ] [ 0 ] [ " items " ] [ " itemList " ]
chapters = [ ]
for scene in scenes :
chapter_title = scene [ " textMap " ] [ " PRIMARY " ]
match = re . search ( r " ( \ d+ \ . |)(.+) " , chapter_title )
if match :
chapter_title = match . group ( 2 )
chapters . append ( MenuTrack (
number = int ( scene [ " id " ] . replace ( " /xray/scene/ " , " " ) ) ,
title = chapter_title ,
timecode = scene [ " textMap " ] [ " TERTIARY " ] . replace ( " Starts at " , " " )
) )
return chapters
def certificate ( self , * * _ ) :
return self . config [ " certificate " ]
def license ( self , challenge : Union [ bytes , str ] , title : Title , * * _ ) :
lic_challenge = base64 . b64encode ( challenge ) . decode ( " utf-8 " ) if isinstance ( challenge , bytes ) else base64 . b64encode ( challenge . encode ( " utf-8 " ) ) . decode ( " utf-8 " )
self . log . debug ( f " Challenge - { lic_challenge } " )
try :
lic = self . session . post (
url = self . endpoints [ " licence " ] ,
params = {
" asin " : title . id ,
" consumptionType " : " Streaming " ,
" desiredResources " : " PlayReadyLicense " ,
" deviceTypeID " : self . device [ " device_type " ] ,
" deviceID " : self . device_id ,
" firmware " : 1 ,
" gascEnabled " : str ( self . pv ) . lower ( ) ,
" marketplaceID " : self . region [ " marketplace_id " ] ,
" resourceUsage " : " ImmediateConsumption " ,
" videoMaterialType " : " Feature " ,
" operatingSystemName " : " Linux " if self . vquality == " SD " else " Windows " ,
" operatingSystemVersion " : " unknown " if self . vquality == " SD " else " 10.0 " ,
" customerID " : self . customer_id ,
" deviceDrmOverride " : " Playready " ,
" deviceStreamingTechnologyOverride " : " SmoothStreaming " ,
" deviceVideoQualityOverride " : self . vquality ,
" deviceHdrFormatsOverride " : self . VIDEO_RANGE_MAP . get ( self . range , " None " ) ,
} ,
headers = {
" Accept " : " application/json " ,
" Content-Type " : " application/x-www-form-urlencoded " ,
" Authorization " : f " Bearer { self . device_token } "
} ,
data = {
" playReadyChallenge " : lic_challenge , # expects base64
" includeHdcpTestKeyInLicense " : " true "
}
) . json ( )
if " errorsByResource " in lic :
error_code = lic [ " errorsByResource " ] [ " PlayReadyLicense " ]
self . log . debug ( error_code )
if " errorCode " in error_code :
error_code = error_code [ " errorCode " ]
elif " type " in error_code :
error_code = error_code [ " type " ]
if error_code == " PRS.NoRights.AnonymizerIP " :
raise self . log . exit ( " - Amazon detected a Proxy/VPN and refused to return a license! " )
message = lic [ " errorsByResource " ] [ " PlayReadyLicense " ] [ " message " ]
raise self . log . exit ( f " - Amazon reported an error during the License request: { message } [ { error_code } ] " )
if " error " in lic :
error_code = lic [ " error " ]
if " errorCode " in error_code :
error_code = error_code [ " errorCode " ]
elif " type " in error_code :
error_code = error_code [ " type " ]
if error_code == " PRS.NoRights.AnonymizerIP " :
raise self . log . exit ( " - Amazon detected a Proxy/VPN and refused to return a license! " )
message = lic [ " error " ] [ " message " ]
raise self . log . exit ( f " - Amazon reported an error during the License request: { message } [ { error_code } ] " )
except :
lic = self . session . post (
url = self . endpoints [ " licence " ] ,
params = {
" asin " : title . id ,
" consumptionType " : " Streaming " ,
" desiredResources " : " PlayReadyLicense " ,
" deviceTypeID " : self . device [ " device_type " ] ,
" deviceID " : self . device_id ,
" firmware " : 1 ,
" gascEnabled " : str ( self . pv ) . lower ( ) ,
" marketplaceID " : self . region [ " marketplace_id " ] ,
" resourceUsage " : " ImmediateConsumption " ,
" videoMaterialType " : " Feature " ,
" operatingSystemName " : " Linux " if self . vquality == " SD " else " Windows " ,
" operatingSystemVersion " : " unknown " if self . vquality == " SD " else " 10.0 " ,
" customerID " : self . customer_id ,
" deviceDrmOverride " : " Playready " , #CENC or Playready
" deviceStreamingTechnologyOverride " : " DASH " ,
" deviceVideoQualityOverride " : self . vquality ,
" deviceHdrFormatsOverride " : self . VIDEO_RANGE_MAP . get ( self . range , " None " ) ,
} ,
headers = {
" Accept " : " application/json " ,
" Content-Type " : " application/x-www-form-urlencoded " ,
" Authorization " : f " Bearer { self . device_token } "
} ,
data = {
" playReadyChallenge " : lic_challenge , # expects base64
" includeHdcpTestKeyInLicense " : " true "
}
) . json ( )
if " errorsByResource " in lic :
error_code = lic [ " errorsByResource " ] [ " PlayReadyLicense " ]
self . log . debug ( error_code )
if " errorCode " in error_code :
error_code = error_code [ " errorCode " ]
elif " type " in error_code :
error_code = error_code [ " type " ]
if error_code == " PRS.NoRights.AnonymizerIP " :
raise self . log . exit ( " - Amazon detected a Proxy/VPN and refused to return a license! " )
message = lic [ " errorsByResource " ] [ " PlayReadyLicense " ] [ " message " ]
raise self . log . exit ( f " - Amazon reported an error during the License request: { message } [ { error_code } ] " )
if " error " in lic :
error_code = lic [ " error " ]
if " errorCode " in error_code :
error_code = error_code [ " errorCode " ]
elif " type " in error_code :
error_code = error_code [ " type " ]
if error_code == " PRS.NoRights.AnonymizerIP " :
raise self . log . exit ( " - Amazon detected a Proxy/VPN and refused to return a license! " )
message = lic [ " error " ] [ " message " ]
raise self . log . exit ( f " - Amazon reported an error during the License request: { message } [ { error_code } ] " )
#self.log.debug(lic["playReadyLicense"]["encodedLicenseResponse"])
return base64 . b64decode ( lic [ " playReadyLicense " ] [ " encodedLicenseResponse " ] . encode ( " utf-8 " ) ) . decode ( " utf-8 " ) # Return Xml licence
# Service specific functions
def configure ( self ) - > None :
if len ( self . title ) > 10 :
self . pv = True
self . log . info ( " Getting Account Region " )
self . region = self . get_region ( )
if not self . region :
raise self . log . exit ( " - Failed to get Amazon Account region " )
self . GEOFENCE . append ( self . region [ " code " ] )
self . log . info ( f " + Region: { self . region [ ' code ' ] } " )
# endpoints must be prepared AFTER region data is retrieved
self . endpoints = self . prepare_endpoints ( self . config [ " endpoints " ] , self . region )
self . session . headers . update ( {
" Origin " : f " https:// { self . region [ ' base ' ] } "
} )
self . device = ( self . config . get ( " device " ) or { } ) . get ( self . profile , { } )
if ( self . quality > 1080 or self . range != " SDR " ) and self . vcodec == " H265 " and ( self . cdm . device . type == Device . Types . CHROME if " common_privacy_cert " in dir ( self . cdm ) else True ) :
self . log . info ( f " Using device to get UHD manifests " )
self . register_device ( )
elif not self . device or self . vquality != " UHD " or ( self . cdm . device . type == Device . Types . CHROME if " common_privacy_cert " in dir ( self . cdm ) else False ) :
# falling back to browser-based device ID
if not self . device :
self . log . warning (
" No Device information was provided for %s , using browser device... " ,
self . profile
)
self . device_id = hashlib . sha224 (
( " CustomerID " + self . session . headers [ " User-Agent " ] ) . encode ( " utf-8 " )
) . hexdigest ( )
self . device = { " device_type " : self . config [ " device_types " ] [ " browser " ] }
else :
self . register_device ( )
def register_device ( self ) - > None :
self . device = ( self . config . get ( " device " ) or { } ) . get ( self . profile , { } )
device_cache_path = self . get_cache ( " device_tokens_ {profile} _ {hash} .json " . format (
profile = self . profile ,
hash = hashlib . md5 ( json . dumps ( self . device ) . encode ( ) ) . hexdigest ( ) [ 0 : 6 ]
) )
self . device_token = self . DeviceRegistration (
device = self . device ,
endpoints = self . endpoints ,
log = self . log ,
cache_path = device_cache_path ,
session = self . session
) . bearer
self . device_id = self . device . get ( " device_serial " )
if not self . device_id :
raise self . log . exit ( f " - A device serial is required in the config, perhaps use: { os . urandom ( 8 ) . hex ( ) } " )
def get_region ( self ) - > dict :
domain_region = self . get_domain_region ( )
if not domain_region :
return { }
region = self . config [ " regions " ] . get ( domain_region )
if not region :
raise self . log . exit ( f " - There ' s no region configuration data for the region: { domain_region } " )
region [ " code " ] = domain_region
if self . pv :
res = self . session . get ( " https://www.primevideo.com " ) . text
match = re . search ( r ' ue_furl *= *([ \' " ])fls-(na|eu|fe) \ .amazon \ .[a-z.]+ \ 1 ' , res )
if match :
pv_region = match . group ( 2 ) . lower ( )
else :
raise self . log . exit ( " - Failed to get PrimeVideo region " )
pv_region = { " na " : " atv-ps " } . get ( pv_region , f " atv-ps- { pv_region } " )
region [ " base_manifest " ] = f " { pv_region } .primevideo.com "
region [ " base " ] = " www.primevideo.com "
return region
def get_domain_region ( self ) :
""" Get the region of the cookies from the domain. """
tlds = [ tldextract . extract ( x . domain ) for x in self . cookies if x . domain_specified ]
tld = next ( ( x . suffix for x in tlds if x . domain . lower ( ) in ( " amazon " , " primevideo " ) ) , None )
if tld :
tld = tld . split ( " . " ) [ - 1 ]
return { " com " : " us " , " uk " : " gb " } . get ( tld , tld )
def prepare_endpoint ( self , name : str , uri : str , region : dict ) - > str :
if name in ( " browse " , " playback " , " licence " , " xray " ) :
return f " https:// { ( region [ ' base_manifest ' ] ) } { uri } "
if name in ( " ontv " , " devicelink " , " details " , " getDetailWidgets " ) :
if self . pv :
host = " www.primevideo.com "
else :
host = region [ " base " ]
return f " https:// { host } { uri } "
if name in ( " codepair " , " register " , " token " ) :
return f " https:// { self . config [ ' regions ' ] [ ' us ' ] [ ' base_api ' ] } { uri } "
raise ValueError ( f " Unknown endpoint: { name } " )
def prepare_endpoints ( self , endpoints : dict , region : dict ) - > dict :
return { k : self . prepare_endpoint ( k , v , region ) for k , v in endpoints . items ( ) }
def choose_manifest ( self , manifest : dict , cdn = None ) :
""" Get manifest URL for the title based on CDN weight (or specified CDN). """
if cdn :
cdn = cdn . lower ( )
manifest = next ( ( x for x in manifest [ " audioVideoUrls " ] [ " avCdnUrlSets " ] if x [ " cdn " ] . lower ( ) == cdn ) , { } )
if not manifest :
raise self . log . exit ( f " - There isn ' t any manifests available on the CDN \" { cdn } \" for this title " )
else :
manifest = next ( ( x for x in sorted ( [ x for x in manifest [ " audioVideoUrls " ] [ " avCdnUrlSets " ] ] , key = lambda x : int ( x [ " cdnWeightsRank " ] ) ) ) , { } )
return manifest
def get_manifest (
self ,
title : Title ,
video_codec : str ,
bitrate_mode : str ,
quality : str ,
manifest_type : str = None ,
hdr = None ,
ignore_errors : bool = False
) - > dict :
res = self . session . get (
url = self . endpoints [ " playback " ] ,
params = {
" asin " : title . id ,
" consumptionType " : " Streaming " ,
" desiredResources " : " , " . join ( [
" PlaybackUrls " ,
" AudioVideoUrls " ,
" CatalogMetadata " ,
" ForcedNarratives " ,
" SubtitlePresets " ,
" SubtitleUrls " ,
" TransitionTimecodes " ,
" TrickplayUrls " ,
" CuepointPlaylist " ,
" XRayMetadata " ,
" PlaybackSettings " ,
] ) ,
" deviceID " : self . device_id ,
" deviceTypeID " : self . device [ " device_type " ] ,
" firmware " : 1 ,
" gascEnabled " : str ( self . pv ) . lower ( ) ,
" marketplaceID " : self . region [ " marketplace_id " ] ,
" resourceUsage " : " CacheResources " ,
" videoMaterialType " : " Feature " ,
" playerType " : " html5 " ,
" clientId " : self . client_id ,
* * ( {
" operatingSystemName " : " Linux " if quality == " SD " else " Windows " ,
" operatingSystemVersion " : " unknown " if quality == " SD " else " 10.0 " ,
} if not self . device_token else { } ) ,
" deviceDrmOverride " : " Playready " if manifest_type and ( manifest_type == " SmoothStreaming " ) else " CENC " ,
" deviceStreamingTechnologyOverride " : manifest_type if manifest_type else " DASH " ,
" deviceProtocolOverride " : " Https " ,
" deviceVideoCodecOverride " : video_codec ,
" deviceBitrateAdaptationsOverride " : bitrate_mode . replace ( " + " , " , " ) ,
" deviceVideoQualityOverride " : quality ,
" deviceHdrFormatsOverride " : self . VIDEO_RANGE_MAP . get ( hdr , " None " ) ,
" supportedDRMKeyScheme " : " DUAL_KEY " , # ?
" liveManifestType " : " live,accumulating " , # ?
" titleDecorationScheme " : " primary-content " ,
" subtitleFormat " : " TTMLv2 " ,
" languageFeature " : " MLFv2 " , # ?
" uxLocale " : " en_US " ,
" xrayDeviceClass " : " normal " ,
" xrayPlaybackMode " : " playback " ,
" xrayToken " : " XRAY_WEB_2020_V1 " ,
" playbackSettingsFormatVersion " : " 1.0.0 " ,
" playerAttributes " : json . dumps ( { " frameRate " : " HFR " } ) ,
# possibly old/unused/does nothing:
" audioTrackId " : " all " ,
} ,
headers = {
" Authorization " : f " Bearer { self . device_token } " if self . device_token else None ,
} ,
)
try :
manifest = res . json ( )
except json . JSONDecodeError :
if ignore_errors :
return { }
raise self . log . exit ( " - Amazon didn ' t return JSON data when obtaining the Playback Manifest. " )
if " error " in manifest :
if ignore_errors :
return { }
raise self . log . exit ( " - Amazon reported an error when obtaining the Playback Manifest. " )
# Commented out as we move the rights exception check elsewhere
# if "rightsException" in manifest["returnedTitleRendition"]["selectedEntitlement"]:
# if ignore_errors:
# return {}
# raise self.log.exit(" - The profile used does not have the rights to this title.")
# Below checks ignore NoRights errors
if (
manifest . get ( " errorsByResource " , { } ) . get ( " PlaybackUrls " ) and
manifest [ " errorsByResource " ] [ " PlaybackUrls " ] . get ( " errorCode " ) != " PRS.NoRights.NotOwned "
) :
if ignore_errors :
return { }
error = manifest [ " errorsByResource " ] [ " PlaybackUrls " ]
raise self . log . exit ( f " - Amazon had an error with the Playback Urls: { error [ ' message ' ] } [ { error [ ' errorCode ' ] } ] " )
if (
manifest . get ( " errorsByResource " , { } ) . get ( " AudioVideoUrls " ) and
manifest [ " errorsByResource " ] [ " AudioVideoUrls " ] . get ( " errorCode " ) != " PRS.NoRights.NotOwned "
) :
if ignore_errors :
return { }
error = manifest [ " errorsByResource " ] [ " AudioVideoUrls " ]
raise self . log . exit ( f " - Amazon had an error with the A/V Urls: { error [ ' message ' ] } [ { error [ ' errorCode ' ] } ] " )
#self.log.debug(manifest)
return manifest
@staticmethod
def get_original_language ( manifest ) :
""" Get a title ' s original language from manifest data. """
try :
return next (
x [ " language " ] . replace ( " _ " , " - " )
for x in manifest [ " catalogMetadata " ] [ " playback " ] [ " audioTracks " ]
if x [ " isOriginalLanguage " ]
)
except ( KeyError , StopIteration ) :
pass
if " defaultAudioTrackId " in manifest . get ( " playbackUrls " , { } ) :
try :
return manifest [ " playbackUrls " ] [ " defaultAudioTrackId " ] . split ( " _ " ) [ 0 ]
except IndexError :
pass
try :
return sorted (
manifest [ " audioVideoUrls " ] [ " audioTrackMetadata " ] ,
key = lambda x : x [ " index " ]
) [ 0 ] [ " languageCode " ]
except ( KeyError , IndexError ) :
pass
return None
@staticmethod
def clean_mpd_url ( mpd_url , optimise ) :
""" Clean up an Amazon MPD manifest url. """
if optimise :
return mpd_url . replace ( " ~ " , " " ) + " ?encoding=segmentBase "
if match := re . match ( r " (https?://.*/)d.?/.*~/(.*) " , mpd_url ) :
return " " . join ( match . groups ( ) )
elif match := re . match ( r " (https?://.*/)d.?/.* \ $.*?/(.*) " , mpd_url ) :
return " " . join ( match . groups ( ) )
else :
try :
mpd_url = " " . join (
re . split ( r " (?i)(/) " , mpd_url ) [ : 5 ] + re . split ( r " (?i)(/) " , mpd_url ) [ 9 : ]
)
except IndexError :
self . log . warning ( " Unable to parse manifest URL " )
return mpd_url
raise ValueError ( " Unable to parse manifest URL " )
def get_best_quality ( self , title ) :
"""
Choose the best quality manifest from CBR / CVBR
"""
track_list = [ ]
bitrates = [ self . orig_bitrate ]
if self . vcodec != " H265 " :
bitrates = self . orig_bitrate . split ( ' + ' )
for bitrate in bitrates :
manifest = self . get_manifest (
title ,
video_codec = self . vcodec ,
bitrate_mode = bitrate ,
quality = " UHD " ,
hdr = self . range ,
ignore_errors = False
)
if not manifest :
self . log . warning ( f " Skipping { bitrate } manifest due to error " )
continue
# return three empty objects if a rightsException error exists to correlate to manifest, chosen_manifest, tracks
if " rightsException " in manifest [ " returnedTitleRendition " ] [ " selectedEntitlement " ] :
return None , None , None
self . customer_id = manifest [ " returnedTitleRendition " ] [ " selectedEntitlement " ] [ " grantedByCustomerId " ]
default_url_set = manifest [ " playbackUrls " ] [ " urlSets " ] [ manifest [ " playbackUrls " ] [ " defaultUrlSetId " ] ]
encoding_version = default_url_set [ " urls " ] [ " manifest " ] [ " encodingVersion " ]
self . log . info ( f " + Detected encodingVersion= { encoding_version } " )
#self.log.debug(manifest)
chosen_manifest = self . choose_manifest ( manifest , self . cdn )
try :
mpd_url = self . clean_mpd_url ( chosen_manifest [ " avUrlInfoList " ] [ 0 ] [ " url " ] , optimise = False )
except :
chosen_manifest = default_url_set [ " urls " ] [ " manifest " ]
mpd_url = self . clean_mpd_url ( chosen_manifest [ " url " ] , optimise = False )
self . log . debug ( mpd_url )
self . log . info ( f " + Downloading { bitrate } Manifest " )
self . log . debug ( f " Obtained Manifest Type: { chosen_manifest [ ' streamingTechnology ' ] } " )
try :
tracks = Tracks ( [
x for x in iter ( Tracks . from_mpd (
url = mpd_url ,
session = self . session ,
source = self . ALIASES [ 0 ] ,
) )
] )
except ValueError :
tracks = Tracks ( [
x for x in iter ( Tracks . from_ism (
url = mpd_url ,
session = self . session ,
source = self . ALIASES [ 0 ] ,
) )
] )
except :
raise self . log . exit ( f " Unsupported manifest type: { chosen_manifest [ ' streamingTechnology ' ] } \n { mpd_url } " )
for video in tracks . videos :
video . note = bitrate
max_size = max ( tracks . videos , key = lambda x : int ( x . size or 0 ) ) . size
track_list . append ( {
' bitrate ' : bitrate ,
' max_size ' : max_size ,
' manifest ' : manifest ,
' chosen_manifest ' : chosen_manifest ,
' tracks ' : tracks
} )
best_quality = max ( track_list , key = lambda x : x [ ' max_size ' ] )
if len ( self . bitrate . split ( ' + ' ) ) > 1 :
self . bitrate = best_quality [ ' bitrate ' ]
self . log . info ( " Selected video manifest bitrate: %s " , best_quality [ ' bitrate ' ] )
return best_quality [ ' manifest ' ] , best_quality [ ' chosen_manifest ' ] , best_quality [ ' tracks ' ]
# Service specific classes
class DeviceRegistration :
def __init__ ( self , device : dict , endpoints : dict , cache_path : Path , session : requests . Session , log : Logger ) :
self . session = session
self . device = device
self . endpoints = endpoints
self . cache_path = Path ( cache_path )
self . log = log
self . device = { k : str ( v ) if not isinstance ( v , str ) else v for k , v in self . device . items ( ) }
self . bearer = None
if os . path . isfile ( self . cache_path ) :
with open ( self . cache_path , encoding = " utf-8 " ) as fd :
cache = jsonpickle . decode ( fd . read ( ) )
#self.device["device_serial"] = cache["device_serial"]
#if cache.get("expires_in", 0) > int(time.time()):
# # not expired, lets use
# self.log.info(" + Using cached device bearer")
# self.bearer = cache["access_token"]
#else:
# expired, refresh
self . log . info ( " Refreshing cached device bearer... " )
refreshed_tokens = self . refresh ( self . device , cache [ " refresh_token " ] , cache [ " access_token " ] )
refreshed_tokens [ " refresh_token " ] = cache [ " refresh_token " ]
# expires_in seems to be in minutes, create a unix timestamp and add the minutes in seconds
refreshed_tokens [ " expires_in " ] = int ( time . time ( ) ) + int ( refreshed_tokens [ " expires_in " ] )
with open ( self . cache_path , " w " , encoding = " utf-8 " ) as fd :
fd . write ( jsonpickle . encode ( refreshed_tokens ) )
self . bearer = refreshed_tokens [ " access_token " ]
else :
self . log . info ( " + Registering new device bearer " )
self . bearer = self . register ( self . device )
def register ( self , device : dict ) - > dict :
"""
Register device to the account
: param device : Device data to register
: return : Device bearer tokens
"""
# OnTV csrf
csrf_token = self . get_csrf_token ( )
# Code pair
code_pair = self . get_code_pair ( device )
# Device link
response = self . session . post (
url = self . endpoints [ " devicelink " ] ,
headers = {
" Accept " : " */* " ,
" Accept-Language " : " en-US,en;q=0.9,es-US;q=0.8,es;q=0.7 " , # needed?
" Content-Type " : " application/x-www-form-urlencoded " ,
" Referer " : self . endpoints [ " ontv " ]
} ,
params = urlencode ( {
# any reason it urlencodes here? requests can take a param dict...
" ref_ " : " atv_set_rd_reg " ,
" publicCode " : code_pair [ " public_code " ] , # public code pair
" token " : csrf_token # csrf token
} )
)
if response . status_code != 200 :
raise self . log . exit ( f " Unexpected response with the codeBasedLinking request: { response . text } [ { response . status_code } ] " )
# Register
response = self . session . post (
url = self . endpoints [ " register " ] ,
headers = {
" Content-Type " : " application/json " ,
" Accept-Language " : " en-US "
} ,
json = {
" auth_data " : {
" code_pair " : code_pair
} ,
" registration_data " : device ,
" requested_token_type " : [ " bearer " ] ,
" requested_extensions " : [ " device_info " , " customer_info " ]
} ,
cookies = None # for some reason, may fail if cookies are present. Odd.
)
if response . status_code != 200 :
raise self . log . exit ( f " Unable to register: { response . text } [ { response . status_code } ] " )
bearer = response . json ( ) [ " response " ] [ " success " ] [ " tokens " ] [ " bearer " ]
bearer [ " expires_in " ] = int ( time . time ( ) ) + int ( bearer [ " expires_in " ] )
# Cache bearer
os . makedirs ( os . path . dirname ( self . cache_path ) , exist_ok = True )
with open ( self . cache_path , " w " , encoding = " utf-8 " ) as fd :
fd . write ( jsonpickle . encode ( bearer ) )
return bearer [ " access_token " ]
def refresh ( self , device : dict , refresh_token : str , access_token : str ) - > dict :
"""
json3 = {
' app_name ' : ' ioBroker Alexa2 ' ,
' app_version ' : ' 2.2.556530.0 ' ,
' di.sdk.version ' : ' 6.12.4 ' ,
' source_token ' : refresh_token ,
' package_name ' : ' com.amazon.echo ' ,
' di.hw.version ' : ' iPhone ' ,
' platform ' : ' iOS ' ,
' requested_token_type ' : ' access_token ' ,
' source_token_type ' : ' refresh_token ' ,
' di.os.name ' : ' iOS ' ,
' di.os.version ' : ' 16.6 ' ,
' current_version ' : ' 6.12.4 '
}
json4 = {
' di.os.name ' : ' iOS ' ,
' app_version ' : ' 2.2.223830.0 ' ,
' domain ' : ' . ' + ' api.amazon.com ' ,
' source_token ' : refresh_token ,
' requested_token_type ' : ' auth_cookies ' ,
' source_token_type ' : ' refresh_token ' ,
' di.hw.version ' : ' iPhone ' ,
' di.sdk.version ' : ' 6.10.0 ' ,
' cookies ' : { } ,
' app_name ' : ' Amazon Alexa ' ,
' di.os.version ' : ' 11.4.1 '
}
"""
# https://gitlab.com/keatontaylor/alexapy/-/commit/540b6333d973177bbc98e6ef39b00134f80ef0bb
cookies = {
' at-main ' : access_token ,
}
headers = {
' User-Agent ' : ' AmazonWebView/Amazon Alexa/2.2.223830.0/iOS/11.4.1/iPhone ' ,
' Accept-Language ' : ' en-US ' ,
' Accept-Charset ' : ' utf-8 ' ,
' Connection ' : ' keep-alive ' ,
' Content-Type ' : ' application/x-www-form-urlencoded ' ,
' Accept ' : ' */* '
}
data = {
' di.os.name ' : ' iOS ' ,
' app_version ' : ' 2.2.223830.0 ' ,
' domain ' : ' . ' + ' api.amazon.com ' ,
' source_token ' : refresh_token ,
' requested_token_type ' : ' auth_cookies ' ,
' source_token_type ' : ' refresh_token ' ,
' di.hw.version ' : ' iPhone ' ,
' di.sdk.version ' : ' 6.10.0 ' ,
' app_name ' : ' Amazon Alexa ' ,
' di.os.version ' : ' 11.4.1 '
}
try :
# using the refresh token get the cookies needed for making calls to alexa.amazon.com
response = requests . post ( url = self . endpoints [ " token " ] , headers = headers , cookies = cookies , data = data ) . json ( )
# Extract the cookies from the response
raw_cookies = response [ ' response ' ] [ ' tokens ' ] [ ' cookies ' ] [ ' .amazon.com ' ]
except :
error = response [ ' response ' ] [ " error " ]
self . cache_path . unlink ( missing_ok = True )
raise self . log . exit ( f " Error when refreshing cookies: { error [ ' message ' ] } [ { error [ ' code ' ] } ] " )
# Create a new cookies object to be used with requsts.
cookies = { }
for cookie in raw_cookies :
cookies [ cookie [ ' Name ' ] ] = cookie [ ' Value ' ]
headers = {
' Content-Type ' : ' application/json; charset=utf-8 ' ,
' Accept-Encoding ' : ' gzip, deflate, br ' ,
' Connection ' : ' keep-alive ' ,
' Accept ' : ' application/json; charset=utf-8 ' ,
' User-Agent ' : ' Mozilla/5.0 (iPhone; CPU iPhone OS 14_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 PitanguiBridge/2.2.389238.0-[HARDWARE=iPhone12_3][SOFTWARE=14.3] ' ,
' Accept-Language ' : ' en-US,en-US;q=1.0 ' ,
}
json_data = {
* * device ,
' requested_token_type ' : ' access_token ' ,
' source_token_type ' : ' refresh_token ' ,
" source_token " : refresh_token ,
} # https://github.com/Sandmann79/xbmc/blob/dab17d913ee877d96115e6f799623bca158f3f24/plugin.video.amazon-test/resources/lib/login.py#L593
# make the call and print the response.
response = requests . post ( url = self . endpoints [ " token " ] , headers = headers , cookies = cookies , json = json_data ) . json ( )
if " error " in response :
self . cache_path . unlink ( missing_ok = True ) # Remove the cached device as its tokens have expired
raise self . log . exit (
f " Failed to refresh device token: { response [ ' error_description ' ] } [ { response [ ' error ' ] } ] "
)
self . log . debug ( response )
if response [ " token_type " ] != " bearer " :
raise self . log . exit ( " Unexpected returned refreshed token type " )
return response
def get_csrf_token ( self ) - > str :
"""
On the amazon website , you need a token that is in the html page ,
this token is used to register the device
: return : OnTV Page ' s CSRF Token
"""
res = self . session . get ( self . endpoints [ " ontv " ] )
response = res . text
if ' input type= " hidden " name= " appAction " value= " SIGNIN " ' in response :
raise self . log . exit (
" Cookies are signed out, cannot get ontv CSRF token. "
f " Expecting profile to have cookies for: { self . endpoints [ ' ontv ' ] } "
)
for match in re . finditer ( r " <script type= \" text/template \" >(.+)</script> " , response ) :
prop = json . loads ( match . group ( 1 ) )
prop = prop . get ( " props " , { } ) . get ( " codeEntry " , { } ) . get ( " token " )
if prop :
return prop
raise self . log . exit ( " Unable to get ontv CSRF token \n Navigate to /region/eu/ontv/code?ref_=atv_auth_red_aft, login and save cookies from that page to default.txt " )
def get_code_pair ( self , device : dict ) - > dict :
"""
Getting code pairs based on the device that you are using
: return : public and private code pairs
"""
res = self . session . post (
url = self . endpoints [ " codepair " ] ,
headers = {
" Content-Type " : " application/json " ,
" Accept-Language " : " en-US "
} ,
json = { " code_data " : device }
) . json ( )
if " error " in res :
raise self . log . exit ( f " Unable to get code pair: { res [ ' error_description ' ] } [ { res [ ' error ' ] } ] " )
return res