mirror of
				https://github.com/devine-dl/devine.git
				synced 2025-11-04 03:44:49 +00:00 
			
		
		
		
	feat(search): New Search command, Service method, SearchResult Class
This commit is contained in:
		
							parent
							
								
									10a01b0b47
								
							
						
					
					
						commit
						77e663ebee
					
				
							
								
								
									
										166
									
								
								devine/commands/search.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										166
									
								
								devine/commands/search.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,166 @@
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
 | 
			
		||||
import logging
 | 
			
		||||
import re
 | 
			
		||||
import sys
 | 
			
		||||
from typing import Any, Optional
 | 
			
		||||
 | 
			
		||||
import click
 | 
			
		||||
import yaml
 | 
			
		||||
from rich.padding import Padding
 | 
			
		||||
from rich.rule import Rule
 | 
			
		||||
from rich.tree import Tree
 | 
			
		||||
 | 
			
		||||
from devine.commands.dl import dl
 | 
			
		||||
from devine.core.config import config
 | 
			
		||||
from devine.core.console import console
 | 
			
		||||
from devine.core.constants import context_settings
 | 
			
		||||
from devine.core.proxies import Basic, Hola, NordVPN
 | 
			
		||||
from devine.core.service import Service
 | 
			
		||||
from devine.core.services import Services
 | 
			
		||||
from devine.core.utilities import get_binary_path
 | 
			
		||||
from devine.core.utils.click_types import ContextData
 | 
			
		||||
from devine.core.utils.collections import merge_dict
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@click.command(
 | 
			
		||||
    short_help="Search for titles from a Service.",
 | 
			
		||||
    cls=Services,
 | 
			
		||||
    context_settings=dict(
 | 
			
		||||
        **context_settings,
 | 
			
		||||
        token_normalize_func=Services.get_tag
 | 
			
		||||
    ))
 | 
			
		||||
@click.option("-p", "--profile", type=str, default=None,
 | 
			
		||||
              help="Profile to use for Credentials and Cookies (if available).")
 | 
			
		||||
@click.option("--proxy", type=str, default=None,
 | 
			
		||||
              help="Proxy URI to use. If a 2-letter country is provided, it will try get a proxy from the config.")
 | 
			
		||||
@click.option("--no-proxy", is_flag=True, default=False,
 | 
			
		||||
              help="Force disable all proxy use.")
 | 
			
		||||
@click.pass_context
 | 
			
		||||
def search(
 | 
			
		||||
    ctx: click.Context,
 | 
			
		||||
    no_proxy: bool,
 | 
			
		||||
    profile: Optional[str] = None,
 | 
			
		||||
    proxy: Optional[str] = None
 | 
			
		||||
):
 | 
			
		||||
    if not ctx.invoked_subcommand:
 | 
			
		||||
        raise ValueError("A subcommand to invoke was not specified, the main code cannot continue.")
 | 
			
		||||
 | 
			
		||||
    log = logging.getLogger("search")
 | 
			
		||||
 | 
			
		||||
    service = Services.get_tag(ctx.invoked_subcommand)
 | 
			
		||||
    profile = profile
 | 
			
		||||
 | 
			
		||||
    if profile:
 | 
			
		||||
        log.info(f"Using profile: '{profile}'")
 | 
			
		||||
 | 
			
		||||
    with console.status("Loading Service Config...", spinner="dots"):
 | 
			
		||||
        service_config_path = Services.get_path(service) / config.filenames.config
 | 
			
		||||
        if service_config_path.exists():
 | 
			
		||||
            service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8"))
 | 
			
		||||
            log.info("Service Config loaded")
 | 
			
		||||
        else:
 | 
			
		||||
            service_config = {}
 | 
			
		||||
        merge_dict(config.services.get(service), service_config)
 | 
			
		||||
 | 
			
		||||
    proxy_providers = []
 | 
			
		||||
    if no_proxy:
 | 
			
		||||
        ctx.params["proxy"] = None
 | 
			
		||||
    else:
 | 
			
		||||
        with console.status("Loading Proxy Providers...", spinner="dots"):
 | 
			
		||||
            if config.proxy_providers.get("basic"):
 | 
			
		||||
                proxy_providers.append(Basic(**config.proxy_providers["basic"]))
 | 
			
		||||
            if config.proxy_providers.get("nordvpn"):
 | 
			
		||||
                proxy_providers.append(NordVPN(**config.proxy_providers["nordvpn"]))
 | 
			
		||||
            if get_binary_path("hola-proxy"):
 | 
			
		||||
                proxy_providers.append(Hola())
 | 
			
		||||
            for proxy_provider in proxy_providers:
 | 
			
		||||
                log.info(f"Loaded {proxy_provider.__class__.__name__}: {proxy_provider}")
 | 
			
		||||
 | 
			
		||||
        if proxy:
 | 
			
		||||
            requested_provider = None
 | 
			
		||||
            if re.match(r"^[a-z]+:.+$", proxy, re.IGNORECASE):
 | 
			
		||||
                # requesting proxy from a specific proxy provider
 | 
			
		||||
                requested_provider, proxy = proxy.split(":", maxsplit=1)
 | 
			
		||||
            if re.match(r"^[a-z]{2}(?:\d+)?$", proxy, re.IGNORECASE):
 | 
			
		||||
                proxy = proxy.lower()
 | 
			
		||||
                with console.status(f"Getting a Proxy to {proxy}...", spinner="dots"):
 | 
			
		||||
                    if requested_provider:
 | 
			
		||||
                        proxy_provider = next((
 | 
			
		||||
                            x
 | 
			
		||||
                            for x in proxy_providers
 | 
			
		||||
                            if x.__class__.__name__.lower() == requested_provider
 | 
			
		||||
                        ), None)
 | 
			
		||||
                        if not proxy_provider:
 | 
			
		||||
                            log.error(f"The proxy provider '{requested_provider}' was not recognised.")
 | 
			
		||||
                            sys.exit(1)
 | 
			
		||||
                        proxy_uri = proxy_provider.get_proxy(proxy)
 | 
			
		||||
                        if not proxy_uri:
 | 
			
		||||
                            log.error(f"The proxy provider {requested_provider} had no proxy for {proxy}")
 | 
			
		||||
                            sys.exit(1)
 | 
			
		||||
                        proxy = ctx.params["proxy"] = proxy_uri
 | 
			
		||||
                        log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy}")
 | 
			
		||||
                    else:
 | 
			
		||||
                        for proxy_provider in proxy_providers:
 | 
			
		||||
                            proxy_uri = proxy_provider.get_proxy(proxy)
 | 
			
		||||
                            if proxy_uri:
 | 
			
		||||
                                proxy = ctx.params["proxy"] = proxy_uri
 | 
			
		||||
                                log.info(f"Using {proxy_provider.__class__.__name__} Proxy: {proxy}")
 | 
			
		||||
                                break
 | 
			
		||||
            else:
 | 
			
		||||
                log.info(f"Using explicit Proxy: {proxy}")
 | 
			
		||||
 | 
			
		||||
    ctx.obj = ContextData(
 | 
			
		||||
        config=service_config,
 | 
			
		||||
        cdm=None,
 | 
			
		||||
        proxy_providers=proxy_providers,
 | 
			
		||||
        profile=profile
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@search.result_callback()
 | 
			
		||||
def result(service: Service, profile: Optional[str] = None, **_: Any) -> None:
 | 
			
		||||
    log = logging.getLogger("search")
 | 
			
		||||
 | 
			
		||||
    service_tag = service.__class__.__name__
 | 
			
		||||
 | 
			
		||||
    with console.status("Authenticating with Service...", spinner="dots"):
 | 
			
		||||
        cookies = dl.get_cookie_jar(service_tag, profile)
 | 
			
		||||
        credential = dl.get_credentials(service_tag, profile)
 | 
			
		||||
        service.authenticate(cookies, credential)
 | 
			
		||||
        if cookies or credential:
 | 
			
		||||
            log.info("Authenticated with Service")
 | 
			
		||||
 | 
			
		||||
    search_results = Tree("Search Results", hide_root=True)
 | 
			
		||||
    with console.status("Searching...", spinner="dots"):
 | 
			
		||||
        for result in service.search():
 | 
			
		||||
            result_text = f"[bold text]{result.title}[/]"
 | 
			
		||||
            if result.url:
 | 
			
		||||
                result_text = f"[link={result.url}]{result_text}[/link]"
 | 
			
		||||
            if result.label:
 | 
			
		||||
                result_text += f"  [pink]{result.label}[/]"
 | 
			
		||||
            if result.description:
 | 
			
		||||
                result_text += f"\n[text2]{result.description}[/]"
 | 
			
		||||
            result_text += f"\n[bright_black]id: {result.id}[/]"
 | 
			
		||||
            search_results.add(result_text + "\n")
 | 
			
		||||
 | 
			
		||||
    # update cookies
 | 
			
		||||
    cookie_file = dl.get_cookie_path(service_tag, profile)
 | 
			
		||||
    if cookie_file:
 | 
			
		||||
        dl.save_cookies(cookie_file, service.session.cookies)
 | 
			
		||||
 | 
			
		||||
    console.print(Padding(
 | 
			
		||||
        Rule(f"[rule.text]{len(search_results.children)} Search Results"),
 | 
			
		||||
        (1, 2)
 | 
			
		||||
    ))
 | 
			
		||||
 | 
			
		||||
    if search_results.children:
 | 
			
		||||
        console.print(Padding(
 | 
			
		||||
            search_results,
 | 
			
		||||
            (0, 5)
 | 
			
		||||
        ))
 | 
			
		||||
    else:
 | 
			
		||||
        console.print(Padding(
 | 
			
		||||
            "[bold text]No matches[/]\n[bright_black]Please check spelling and search again....[/]",
 | 
			
		||||
            (0, 5)
 | 
			
		||||
        ))
 | 
			
		||||
							
								
								
									
										44
									
								
								devine/core/search_result.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										44
									
								
								devine/core/search_result.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,44 @@
 | 
			
		||||
from typing import Optional, Union
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SearchResult:
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        id_: Union[str, int],
 | 
			
		||||
        title: str,
 | 
			
		||||
        description: Optional[str] = None,
 | 
			
		||||
        label: Optional[str] = None,
 | 
			
		||||
        url: Optional[str] = None
 | 
			
		||||
    ):
 | 
			
		||||
        """
 | 
			
		||||
        A Search Result for any support Title Type.
 | 
			
		||||
 | 
			
		||||
        Parameters:
 | 
			
		||||
            id_: The search result's Title ID.
 | 
			
		||||
            title: The primary display text, e.g., the Title's Name.
 | 
			
		||||
            description: The secondary display text, e.g., the Title's Description or
 | 
			
		||||
                further title information.
 | 
			
		||||
            label: The tertiary display text. This will typically be used to display
 | 
			
		||||
                an informative label or tag to the result. E.g., "unavailable", the
 | 
			
		||||
                title's price tag, region, etc.
 | 
			
		||||
            url: A hyperlink to the search result or title's page.
 | 
			
		||||
        """
 | 
			
		||||
        if not isinstance(id_, (str, int)):
 | 
			
		||||
            raise TypeError(f"Expected id_ to be a {str} or {int}, not {type(id_)}")
 | 
			
		||||
        if not isinstance(title, str):
 | 
			
		||||
            raise TypeError(f"Expected title to be a {str}, not {type(title)}")
 | 
			
		||||
        if not isinstance(description, (str, type(None))):
 | 
			
		||||
            raise TypeError(f"Expected description to be a {str}, not {type(description)}")
 | 
			
		||||
        if not isinstance(label, (str, type(None))):
 | 
			
		||||
            raise TypeError(f"Expected label to be a {str}, not {type(label)}")
 | 
			
		||||
        if not isinstance(url, (str, type(None))):
 | 
			
		||||
            raise TypeError(f"Expected url to be a {str}, not {type(url)}")
 | 
			
		||||
 | 
			
		||||
        self.id = id_
 | 
			
		||||
        self.title = title
 | 
			
		||||
        self.description = description
 | 
			
		||||
        self.label = label
 | 
			
		||||
        self.url = url
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
__all__ = ("SearchResult",)
 | 
			
		||||
@ -1,6 +1,7 @@
 | 
			
		||||
import base64
 | 
			
		||||
import logging
 | 
			
		||||
from abc import ABCMeta, abstractmethod
 | 
			
		||||
from collections.abc import Generator
 | 
			
		||||
from http.cookiejar import CookieJar
 | 
			
		||||
from typing import Optional, Union
 | 
			
		||||
from urllib.parse import urlparse
 | 
			
		||||
@ -16,6 +17,7 @@ from devine.core.config import config
 | 
			
		||||
from devine.core.console import console
 | 
			
		||||
from devine.core.constants import AnyTrack
 | 
			
		||||
from devine.core.credential import Credential
 | 
			
		||||
from devine.core.search_result import SearchResult
 | 
			
		||||
from devine.core.titles import Title_T, Titles_T
 | 
			
		||||
from devine.core.tracks import Chapters, Tracks
 | 
			
		||||
from devine.core.utilities import get_ip_info
 | 
			
		||||
@ -123,6 +125,17 @@ class Service(metaclass=ABCMeta):
 | 
			
		||||
                raise TypeError(f"Expected cookies to be a {CookieJar}, not {cookies!r}.")
 | 
			
		||||
            self.session.cookies.update(cookies)
 | 
			
		||||
 | 
			
		||||
    def search(self) -> Generator[SearchResult, None, None]:
 | 
			
		||||
        """
 | 
			
		||||
        Search by query for titles from the Service.
 | 
			
		||||
 | 
			
		||||
        The query must be taken as a CLI argument by the Service class.
 | 
			
		||||
        Ideally just re-use the title ID argument (i.e. self.title).
 | 
			
		||||
 | 
			
		||||
        Search results will be displayed in the order yielded.
 | 
			
		||||
        """
 | 
			
		||||
        raise NotImplementedError(f"Search functionality has not been implemented by {self.__class__.__name__}")
 | 
			
		||||
 | 
			
		||||
    def get_widevine_service_certificate(self, *, challenge: bytes, title: Title_T, track: AnyTrack) \
 | 
			
		||||
            -> Union[bytes, str]:
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user