From f7428845e8006366d2deea8696700c0c812e534e Mon Sep 17 00:00:00 2001 From: stabbedbybrick <125766685+stabbedbybrick@users.noreply.github.com> Date: Sat, 29 Jun 2024 14:58:48 +0200 Subject: [PATCH] refactor(ITV): Change authentication from credentials to cookies - ITV has started to add telemetry to login requests to combat bot usage, so we now use cookies to authenticate. - The cookie should hopefully only be needed once per account. Any subsequent session will use cached tokens. --- services/ITV/__init__.py | 67 ++++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/services/ITV/__init__.py b/services/ITV/__init__.py index e084a89..84a0f35 100644 --- a/services/ITV/__init__.py +++ b/services/ITV/__init__.py @@ -3,7 +3,7 @@ from __future__ import annotations import hashlib import json import re -import time +import sys from collections.abc import Generator from http.cookiejar import MozillaCookieJar from typing import Any, Optional, Union @@ -25,7 +25,7 @@ class ITV(Service): \b Author: stabbedbybrick - Authorization: Credentials (Optional for free content | Required for premium content) + Authorization: Cookies (Optional for free content | Required for premium content) Robustness: L1: 1080p L3: 720p @@ -66,14 +66,34 @@ class ITV(Service): self.title = title super().__init__(ctx) + self.profile = ctx.parent.params.get("profile") + if not self.profile: + self.profile = "default" + self.session.headers.update(self.config["headers"]) def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: super().authenticate(cookies, credential) self.authorization = None - if credential is not None: - cache = self.cache.get(f"tokens_{credential.sha1}") + if credential and not cookies: + self.log.error(" - Error: This service requires cookies for authentication.") + sys.exit(1) + + if cookies is not None: + self.log.info(f"\n + Cookies for '{self.profile}' profile found, authenticating...") + itv_session = next((cookie.value for cookie in cookies if cookie.name == "Itv.Session"), None) + if not itv_session: + self.log.error(" - Error: Session cookie not found. Cookies may be invalid.") + sys.exit(1) + + itv_session = json.loads(itv_session) + refresh_token = itv_session["tokens"]["content"].get("refresh_token") + if not refresh_token: + self.log.error(" - Error: Access tokens not found. Try refreshing your cookies.") + sys.exit(1) + + cache = self.cache.get(f"tokens_{self.profile}") headers = { "Host": "auth.prd.user.itv.com", @@ -85,38 +105,19 @@ class ITV(Service): "Referer": "https://www.itv.com/", } - if cache: - self.log.info(" + Using cached Tokens...") - r = self.session.get( - self.config["endpoints"]["refresh"], - headers=headers, - params={"refresh": cache.data["refresh_token"]}, - ) - if r.status_code != 200: - raise ConnectionError(f"Failed to refresh tokens: {r.text}") + params = {"refresh": cache.data["refresh_token"]} if cache else {"refresh": refresh_token} - tokens = r.json() - else: - r = self.session.post( - self.config["endpoints"]["login"], - headers=headers, - data=json.dumps( - { - "username": credential.username, - "password": credential.password, - "scope": "content", - "grant_type": "password", - "nonce": f"cerberus-auth-request-{int(time.time())}", - } - ), - ) - if r.status_code != 200: - raise ConnectionError(f"Failed to authenticate: {r.text}") - - tokens = r.json() - self.log.info(" + Acquired Tokens...") + r = self.session.get( + self.config["endpoints"]["refresh"], + headers=headers, + params=params, + ) + if r.status_code != 200: + raise ConnectionError(f"Failed to refresh tokens: {r.text}") + tokens = r.json() cache.set(tokens) + self.log.info(" + Tokens refreshed and placed in cache\n") self.authorization = tokens["access_token"]