fix(CBC): Update authentication flow, title API
This commit is contained in:
parent
50f2062a1f
commit
1fe698966f
@ -30,7 +30,7 @@ class CBC(Service):
|
||||
Authorization: Credentials
|
||||
Robustness:
|
||||
AES-128: 1080p, DDP5.1
|
||||
Widevine: 720p, DDP5.1
|
||||
Widevine L3: 720p, DDP5.1
|
||||
|
||||
\b
|
||||
Tips:
|
||||
@ -40,6 +40,7 @@ class CBC(Service):
|
||||
|
||||
\b
|
||||
Notes:
|
||||
- DRM encrypted titles max out at 720p.
|
||||
- CCExtrator v0.94 will likely fail to extract subtitles. It's recommended to downgrade to v0.93.
|
||||
- Some audio tracks contain invalid data, causing warning messages from mkvmerge during muxing
|
||||
These can be ignored.
|
||||
@ -57,10 +58,10 @@ class CBC(Service):
|
||||
return CBC(ctx, **kwargs)
|
||||
|
||||
def __init__(self, ctx: Context, title: str):
|
||||
self.title = title
|
||||
self.title: str = title
|
||||
super().__init__(ctx)
|
||||
|
||||
self.base_url = self.config["endpoints"]["base_url"]
|
||||
self.base_url: str = self.config["endpoints"]["base_url"]
|
||||
|
||||
def search(self) -> Generator[SearchResult, None, None]:
|
||||
params = {
|
||||
@ -69,7 +70,7 @@ class CBC(Service):
|
||||
"pageSize": "20",
|
||||
"term": self.title,
|
||||
}
|
||||
response = self._request("GET", "/ott/catalog/v1/gem/search", params=params)
|
||||
response: dict = self._request("GET", "/ott/catalog/v1/gem/search", params=params)
|
||||
|
||||
for result in response.get("result", []):
|
||||
yield SearchResult(
|
||||
@ -85,87 +86,83 @@ class CBC(Service):
|
||||
if not credential:
|
||||
raise EnvironmentError("Service requires Credentials for Authentication.")
|
||||
|
||||
login = self.cache.get(f"login_{credential.sha1}")
|
||||
tokens = self.cache.get(f"tokens_{credential.sha1}")
|
||||
tokens: Optional[Any] = self.cache.get(f"tokens_{credential.sha1}")
|
||||
|
||||
if login and not login.expired:
|
||||
# cached
|
||||
self.log.info(" + Using cached login tokens")
|
||||
auth_token = login.data["access_token"]
|
||||
|
||||
elif login and login.expired:
|
||||
payload = {
|
||||
"email": credential.username,
|
||||
"password": credential.password,
|
||||
"refresh_token": login.data["refresh_token"],
|
||||
}
|
||||
|
||||
params = {"apikey": self.config["endpoints"]["api_key"]}
|
||||
auth = self._request(
|
||||
"POST", "https://api.loginradius.com/identity/v2/auth/login",
|
||||
payload=payload,
|
||||
params=params,
|
||||
)
|
||||
|
||||
login.set(auth, expiration=auth["expires_in"])
|
||||
auth_token = login.data["access_token"]
|
||||
|
||||
self.log.info(" + Refreshed login tokens")
|
||||
|
||||
else:
|
||||
payload = {
|
||||
"email": credential.username,
|
||||
"password": credential.password,
|
||||
}
|
||||
params = {"apikey": self.config["endpoints"]["api_key"]}
|
||||
auth = self._request(
|
||||
"POST", "https://api.loginradius.com/identity/v2/auth/login",
|
||||
payload=payload,
|
||||
params=params,
|
||||
)
|
||||
|
||||
login.set(auth, expiration=auth["expires_in"])
|
||||
auth_token = login.data["access_token"]
|
||||
|
||||
self.log.info(" + Acquired fresh login tokens")
|
||||
"""
|
||||
All grant types for future reference:
|
||||
PASSWORD("password"),
|
||||
ACCESS_TOKEN("access_token"),
|
||||
REFRESH_TOKEN("refresh_token"),
|
||||
CLIENT_CREDENTIALS("client_credentials"),
|
||||
AUTHORIZATION_CODE("authorization_code"),
|
||||
CODE("code");
|
||||
"""
|
||||
|
||||
if tokens and not tokens.expired:
|
||||
# cached
|
||||
self.log.info(" + Using cached access tokens")
|
||||
access_token = tokens.data["accessToken"]
|
||||
self.log.info(" + Using cached tokens")
|
||||
auth_token: str = tokens.data["access_token"]
|
||||
|
||||
elif tokens and tokens.expired:
|
||||
# expired, refresh
|
||||
self.log.info("Refreshing cached tokens...")
|
||||
auth_url, scopes = self.settings()
|
||||
params = {
|
||||
"client_id": self.config["client"]["id"],
|
||||
"grant_type": "refresh_token",
|
||||
"refresh_token": tokens.data["refresh_token"],
|
||||
"scope": scopes,
|
||||
}
|
||||
|
||||
access: dict = self._request("POST", auth_url, params=params)
|
||||
|
||||
# Shorten expiration by one hour to account for clock skew
|
||||
tokens.set(access, expiration=int(access["expires_in"]) - 3600)
|
||||
auth_token: str = access["access_token"]
|
||||
|
||||
else:
|
||||
access = self.access_token(auth_token)
|
||||
tokens.set(access, expiration=access["accessTokenExpiresIn"])
|
||||
access_token = access["accessToken"]
|
||||
self.log.info(" + Acquired fresh access tokens")
|
||||
# new
|
||||
self.log.info("Requesting new tokens...")
|
||||
auth_url, scopes = self.settings()
|
||||
params = {
|
||||
"client_id": self.config["client"]["id"],
|
||||
"grant_type": "password",
|
||||
"username": credential.username,
|
||||
"password": credential.password,
|
||||
"scope": scopes,
|
||||
}
|
||||
|
||||
claims_token = self.claims_token(access_token)
|
||||
access: dict = self._request("POST", auth_url, params=params)
|
||||
|
||||
# Shorten expiration by one hour to account for clock skew
|
||||
tokens.set(access, expiration=int(access["expires_in"]) - 3600)
|
||||
auth_token: str = access["access_token"]
|
||||
|
||||
claims_token: str = self.claims_token(auth_token)
|
||||
self.session.headers.update({"x-claims-token": claims_token})
|
||||
|
||||
def get_titles(self) -> Union[Movies, Series]:
|
||||
title_re = r"^(?:https?://(?:www.)?gem.cbc.ca/)?(?P<id>[a-zA-Z0-9_-]+)"
|
||||
title_re: str = r"^(?:https?://(?:www.)?gem.cbc.ca/)?(?P<id>[a-zA-Z0-9_-]+)"
|
||||
try:
|
||||
title_id = re.match(title_re, self.title).group("id")
|
||||
title_id: str = re.match(title_re, self.title).group("id")
|
||||
except Exception:
|
||||
raise ValueError("- Could not parse ID from title")
|
||||
|
||||
data = self._request("GET", "/ott/cbc-api/v2/shows/{}".format(title_id))
|
||||
label = data.get("seasons", [])[0].get("title")
|
||||
params = {"device": "web"}
|
||||
data: dict = self._request("GET", "/ott/catalog/v2/gem/show/{}".format(title_id), params=params)
|
||||
label: str = data.get("contentType", "").lower()
|
||||
|
||||
if label.lower() in ("film", "movie"):
|
||||
movie = self._movie(data)
|
||||
return Movies(movie)
|
||||
if label in ("film", "movie", "standalone"):
|
||||
movies: list[Movie] = self._movie(data)
|
||||
return Movies(movies)
|
||||
|
||||
else:
|
||||
episodes = self._show(data)
|
||||
episodes: list[Episode] = self._show(data)
|
||||
return Series(episodes)
|
||||
|
||||
def get_tracks(self, title: Union[Movie, Episode]) -> Tracks:
|
||||
media_id = title.data["playSession"].get("mediaId")
|
||||
index = self._request(
|
||||
"GET", "/media/meta/v1/index.ashx",
|
||||
params={"appCode": "gem", "idMedia": media_id, "output": "jsonObject"}
|
||||
index: dict = self._request(
|
||||
"GET", "/media/meta/v1/index.ashx", params={"appCode": "gem", "idMedia": title.id, "output": "jsonObject"}
|
||||
)
|
||||
|
||||
title.data["extra"] = {
|
||||
@ -173,22 +170,22 @@ class CBC(Service):
|
||||
"credits": index["Metas"].get("CreditStartTime"),
|
||||
}
|
||||
|
||||
self.drm = index["Metas"].get("isDrmActive") == "true"
|
||||
self.drm: bool = index["Metas"].get("isDrmActive") == "true"
|
||||
if self.drm:
|
||||
tech = next(tech["name"] for tech in index["availableTechs"] if "widevine" in tech["drm"])
|
||||
tech: str = next(tech["name"] for tech in index["availableTechs"] if "widevine" in tech["drm"])
|
||||
else:
|
||||
tech = next(tech["name"] for tech in index["availableTechs"] if not tech["drm"])
|
||||
tech: str = next(tech["name"] for tech in index["availableTechs"] if not tech["drm"])
|
||||
|
||||
response = self._request(
|
||||
"GET", self.config["endpoints"]["validation"].format("android", media_id, "smart-tv", tech)
|
||||
response: dict = self._request(
|
||||
"GET", self.config["endpoints"]["validation"].format("android", title.id, "smart-tv", tech)
|
||||
)
|
||||
|
||||
manifest = response.get("url")
|
||||
self.license = next((x["value"] for x in response["params"] if "widevineLicenseUrl" in x["name"]), None)
|
||||
self.token = next((x["value"] for x in response["params"] if "widevineAuthToken" in x["name"]), None)
|
||||
|
||||
stream_type = HLS if tech == "hls" else DASH
|
||||
tracks = stream_type.from_url(manifest, self.session).to_tracks(language=index.get("Language", "en"))
|
||||
stream_type: Union[HLS, DASH] = HLS if tech == "hls" else DASH
|
||||
tracks: Tracks = stream_type.from_url(manifest, self.session).to_tracks(language=title.language)
|
||||
|
||||
if stream_type == DASH:
|
||||
for track in tracks.audio:
|
||||
@ -196,10 +193,13 @@ class CBC(Service):
|
||||
if label is not None and "descriptive" in label.text.lower():
|
||||
track.descriptive = True
|
||||
|
||||
for track in tracks:
|
||||
track.language = title.language
|
||||
|
||||
return tracks
|
||||
|
||||
def get_chapters(self, title: Union[Movie, Episode]) -> Chapters:
|
||||
extra = title.data["extra"]
|
||||
extra: dict = title.data["extra"]
|
||||
|
||||
chapters = []
|
||||
if extra.get("chapters"):
|
||||
@ -226,73 +226,80 @@ class CBC(Service):
|
||||
|
||||
# Service specific
|
||||
|
||||
def _show(self, data: dict) -> Episode:
|
||||
episodes = [episode for season in data["seasons"] for episode in season["assets"] if not episode["isTrailer"]]
|
||||
def _show(self, data: dict) -> list[Episode]:
|
||||
lineups: list = next((x["lineups"] for x in data["content"] if x.get("title", "").lower() == "episodes"), None)
|
||||
if not lineups:
|
||||
self.log.warning("No episodes found for: {}".format(data.get("title")))
|
||||
return
|
||||
|
||||
return Series(
|
||||
[
|
||||
titles = []
|
||||
for season in lineups:
|
||||
for episode in season["items"]:
|
||||
if episode.get("mediaType", "").lower() == "episode":
|
||||
parts = episode.get("title", "").split(".", 1)
|
||||
episode_name = parts[1].strip() if len(parts) > 1 else parts[0].strip()
|
||||
titles.append(
|
||||
Episode(
|
||||
id_=episode["id"],
|
||||
id_=episode["idMedia"],
|
||||
service=self.__class__,
|
||||
title=data.get("title"),
|
||||
season=int(episode.get("season", 0)),
|
||||
number=int(episode.get("episode", 0)),
|
||||
name=episode.get("title"),
|
||||
season=int(season.get("seasonNumber", 0)),
|
||||
number=int(episode.get("episodeNumber", 0)),
|
||||
name=episode_name,
|
||||
year=episode.get("metadata", {}).get("productionYear"),
|
||||
language=data["structuredMetadata"].get("inLanguage", "en-CA"),
|
||||
data=episode,
|
||||
)
|
||||
for episode in episodes
|
||||
]
|
||||
)
|
||||
|
||||
def _movie(self, data: dict) -> Movie:
|
||||
movies = [movie for season in data["seasons"] for movie in season["assets"] if not movie["isTrailer"]]
|
||||
return titles
|
||||
|
||||
return [
|
||||
def _movie(self, data: dict) -> list[Movie]:
|
||||
unwanted: tuple = ("episodes", "trailers", "extras")
|
||||
lineups: list = next((x["lineups"] for x in data["content"] if x.get("title", "").lower() not in unwanted), None)
|
||||
if not lineups:
|
||||
self.log.warning("No movies found for: {}".format(data.get("title")))
|
||||
return
|
||||
|
||||
titles = []
|
||||
for season in lineups:
|
||||
for movie in season["items"]:
|
||||
if movie.get("mediaType", "").lower() == "episode":
|
||||
parts = movie.get("title", "").split(".", 1)
|
||||
movie_name = parts[1].strip() if len(parts) > 1 else parts[0].strip()
|
||||
titles.append(
|
||||
Movie(
|
||||
id_=movie.get("id"),
|
||||
id_=movie.get("idMedia"),
|
||||
service=self.__class__,
|
||||
name=data.get("title"),
|
||||
name=movie_name,
|
||||
year=movie.get("metadata", {}).get("productionYear"),
|
||||
language=data["structuredMetadata"].get("inLanguage", "en-CA"),
|
||||
data=movie,
|
||||
)
|
||||
for movie in movies
|
||||
]
|
||||
|
||||
def access_token(self, token: str) -> str:
|
||||
params = {
|
||||
"access_token": token,
|
||||
"apikey": self.config["endpoints"]["api_key"],
|
||||
"jwtapp": "jwt",
|
||||
}
|
||||
|
||||
headers = {"content-type": "application/json"}
|
||||
resp = self._request(
|
||||
"GET", "https://cloud-api.loginradius.com/sso/jwt/api/token",
|
||||
headers=headers,
|
||||
params=params
|
||||
)
|
||||
|
||||
payload = {"jwt": resp.get("signature")}
|
||||
headers = {"content-type": "application/json", "ott-device-type": "web"}
|
||||
auth = self._request("POST", "/ott/cbc-api/v2/token", headers=headers, payload=payload)
|
||||
return titles
|
||||
|
||||
return auth
|
||||
def settings(self) -> tuple:
|
||||
settings = self._request("GET", "/ott/catalog/v1/gem/settings", params={"device": "web"})
|
||||
auth_url: str = settings["identityManagement"]["ropc"]["url"]
|
||||
scopes: str = settings["identityManagement"]["ropc"]["scopes"]
|
||||
return auth_url, scopes
|
||||
|
||||
def claims_token(self, token: str) -> str:
|
||||
headers = {
|
||||
"content-type": "application/json",
|
||||
"ott-device-type": "web",
|
||||
"ott-access-token": token,
|
||||
"Authorization": "Bearer " + token,
|
||||
}
|
||||
response = self._request("GET", "/ott/cbc-api/v2/profile", headers=headers)
|
||||
params = {"device": "web"}
|
||||
response: dict = self._request(
|
||||
"GET", "/ott/subscription/v2/gem/Subscriber/profile", headers=headers, params=params
|
||||
)
|
||||
return response["claimsToken"]
|
||||
|
||||
def _request(
|
||||
self, method: str, api: str, params: dict = None, headers: dict = None, payload: dict = None
|
||||
) -> Any[dict | str]:
|
||||
url = urljoin(self.base_url, api)
|
||||
headers = headers or self.session.headers
|
||||
def _request(self, method: str, api: str, **kwargs: Any) -> Any[dict | str]:
|
||||
url: str = urljoin(self.base_url, api)
|
||||
|
||||
prep = self.session.prepare_request(Request(method, url, params=params, headers=headers, json=payload))
|
||||
prep: Request = self.session.prepare_request(Request(method, url, **kwargs))
|
||||
response = self.session.send(prep)
|
||||
if response.status_code not in (200, 426):
|
||||
raise ConnectionError(f"{response.status_code} - {response.text}")
|
||||
|
@ -2,3 +2,6 @@ endpoints:
|
||||
base_url: "https://services.radio-canada.ca"
|
||||
validation: "/media/validation/v2?appCode=gem&&deviceType={}&idMedia={}&manifestType={}&output=json&tech={}"
|
||||
api_key: "3f4beddd-2061-49b0-ae80-6f1f2ed65b37"
|
||||
|
||||
client:
|
||||
id: "fc05b0ee-3865-4400-a3cc-3da82c330c23"
|
Loading…
x
Reference in New Issue
Block a user