refactor(ITV): Change authentication from credentials to cookies

- ITV has started to add telemetry to login requests to combat bot usage, so we now use cookies to authenticate.
- The cookie should hopefully only be needed once per account. Any subsequent session will use cached tokens.
This commit is contained in:
stabbedbybrick 2024-06-29 14:58:48 +02:00
parent 601a6a55d4
commit f7428845e8

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import hashlib
import json
import re
import time
import sys
from collections.abc import Generator
from http.cookiejar import MozillaCookieJar
from typing import Any, Optional, Union
@ -25,7 +25,7 @@ class ITV(Service):
\b
Author: stabbedbybrick
Authorization: Credentials (Optional for free content | Required for premium content)
Authorization: Cookies (Optional for free content | Required for premium content)
Robustness:
L1: 1080p
L3: 720p
@ -66,14 +66,34 @@ class ITV(Service):
self.title = title
super().__init__(ctx)
self.profile = ctx.parent.params.get("profile")
if not self.profile:
self.profile = "default"
self.session.headers.update(self.config["headers"])
def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None:
super().authenticate(cookies, credential)
self.authorization = None
if credential is not None:
cache = self.cache.get(f"tokens_{credential.sha1}")
if credential and not cookies:
self.log.error(" - Error: This service requires cookies for authentication.")
sys.exit(1)
if cookies is not None:
self.log.info(f"\n + Cookies for '{self.profile}' profile found, authenticating...")
itv_session = next((cookie.value for cookie in cookies if cookie.name == "Itv.Session"), None)
if not itv_session:
self.log.error(" - Error: Session cookie not found. Cookies may be invalid.")
sys.exit(1)
itv_session = json.loads(itv_session)
refresh_token = itv_session["tokens"]["content"].get("refresh_token")
if not refresh_token:
self.log.error(" - Error: Access tokens not found. Try refreshing your cookies.")
sys.exit(1)
cache = self.cache.get(f"tokens_{self.profile}")
headers = {
"Host": "auth.prd.user.itv.com",
@ -85,38 +105,19 @@ class ITV(Service):
"Referer": "https://www.itv.com/",
}
if cache:
self.log.info(" + Using cached Tokens...")
r = self.session.get(
self.config["endpoints"]["refresh"],
headers=headers,
params={"refresh": cache.data["refresh_token"]},
)
if r.status_code != 200:
raise ConnectionError(f"Failed to refresh tokens: {r.text}")
params = {"refresh": cache.data["refresh_token"]} if cache else {"refresh": refresh_token}
tokens = r.json()
else:
r = self.session.post(
self.config["endpoints"]["login"],
headers=headers,
data=json.dumps(
{
"username": credential.username,
"password": credential.password,
"scope": "content",
"grant_type": "password",
"nonce": f"cerberus-auth-request-{int(time.time())}",
}
),
)
if r.status_code != 200:
raise ConnectionError(f"Failed to authenticate: {r.text}")
tokens = r.json()
self.log.info(" + Acquired Tokens...")
r = self.session.get(
self.config["endpoints"]["refresh"],
headers=headers,
params=params,
)
if r.status_code != 200:
raise ConnectionError(f"Failed to refresh tokens: {r.text}")
tokens = r.json()
cache.set(tokens)
self.log.info(" + Tokens refreshed and placed in cache\n")
self.authorization = tokens["access_token"]