# Source snapshot metadata: 2024-01-24 20:09:59 +00:00 · originally 215 lines · 6.3 KiB · Python

from typing import Iterator, Optional, Union
from uuid import UUID
from requests import Session
from devine.core import __version__
from devine.core.vault import Vault
class API(Vault):
    """Key Vault using a simple RESTful HTTP API call."""

    def __init__(self, name: str, uri: str, token: str):
        """
        Prepare the HTTP session used for every call to the vault API.

        Parameters:
            name: Vault name, handed to the Vault base class.
            uri: Base URL of the API. Trailing slashes are stripped so paths
                can be appended with a single "/".
            token: Sent as a Bearer token in the Authorization header.
        """
        super().__init__(name)
        self.uri = uri.rstrip("/")
        self.session = Session()
        self.session.headers.update({
            "User-Agent": f"Devine v{__version__}",
            "Authorization": f"Bearer {token}"
        })

    @staticmethod
    def _raise_for_code(data: dict, errors: dict[int, type[Exception]]) -> None:
        """
        Raise the exception matching the response's "code" field, if any.

        A missing code or a code of 0 means success and nothing is raised.
        Codes that are not listed in `errors` raise a ValueError. The
        exception text is always "<message> (<code>)".
        """
        code = int(data.get("code", 0))
        if code == 0:
            return
        error = errors.get(code, ValueError)
        raise error(f"{data.get('message')} ({code})")

    def get_key(self, kid: Union[UUID, str], service: str) -> Optional[str]:
        """
        Fetch the content key stored for a Key ID under a service.

        Parameters:
            kid: Key ID. A UUID is sent as its 32-character hex form, a str
                is sent as-is.
            service: Service tag, lower-cased before being put in the URL.

        Returns the content key, or None if the response carries no (or an
        empty) "content_key".

        Raises:
            Exceptions.AuthRejected, Exceptions.TooManyRequests,
            Exceptions.ServiceTagInvalid, Exceptions.KeyIdInvalid: For
                response codes 1 to 4 respectively.
            ValueError: For any other non-zero code, or if the content key
                is not a string.
        """
        if isinstance(kid, UUID):
            kid = kid.hex

        data = self.session.get(
            url=f"{self.uri}/{service.lower()}/{kid}",
            headers={
                "Accept": "application/json"
            }
        ).json()

        self._raise_for_code(data, {
            1: Exceptions.AuthRejected,
            2: Exceptions.TooManyRequests,
            3: Exceptions.ServiceTagInvalid,
            4: Exceptions.KeyIdInvalid
        })

        content_key = data.get("content_key")
        if not content_key:
            return None

        if not isinstance(content_key, str):
            raise ValueError(f"Expected {content_key} to be {str}, was {type(content_key)}")

        return content_key

    def get_keys(self, service: str) -> Iterator[tuple[str, str]]:
        """
        Iterate every (Key ID, content key) pair stored under a service.

        Pages are requested one at a time, starting at page 1 with "total"
        set to 10, until the page number reaches the "pages" value reported
        by the response.

        Raises:
            Exceptions.AuthRejected, Exceptions.TooManyRequests,
            Exceptions.PageInvalid, Exceptions.ServiceTagInvalid: For
                response codes 1 to 4 respectively.
            ValueError: For any other non-zero code, or if "content_keys"
                is present but not a dict.
            KeyError: If a response has no "pages" field.
        """
        # Note that codes 3 and 4 mean something different here than in the
        # single-key calls.
        errors = {
            1: Exceptions.AuthRejected,
            2: Exceptions.TooManyRequests,
            3: Exceptions.PageInvalid,
            4: Exceptions.ServiceTagInvalid,
        }

        page = 1
        while True:
            data = self.session.get(
                url=f"{self.uri}/{service.lower()}",
                params={
                    "page": page,
                    "total": 10
                },
                headers={
                    "Accept": "application/json"
                }
            ).json()

            self._raise_for_code(data, errors)

            content_keys = data.get("content_keys")
            if content_keys:
                if not isinstance(content_keys, dict):
                    raise ValueError(f"Expected {content_keys} to be {dict}, was {type(content_keys)}")
                yield from content_keys.items()

            pages = int(data["pages"])
            if pages <= page:
                break

            page += 1

    def add_key(self, service: str, kid: Union[UUID, str], key: str) -> bool:
        """
        Store one content key for a Key ID under a service.

        Parameters:
            service: Service tag, lower-cased before being put in the URL.
            kid: Key ID. A UUID is sent as its 32-character hex form, a str
                is sent as-is.
            key: Content key, sent as "content_key" in the JSON body.

        Returns True if the response reports the key as added or updated.

        Raises:
            Exceptions.AuthRejected, Exceptions.TooManyRequests,
            Exceptions.ServiceTagInvalid, Exceptions.KeyIdInvalid,
            Exceptions.ContentKeyInvalid: For response codes 1 to 5
                respectively.
            ValueError: For any other non-zero code.
        """
        if isinstance(kid, UUID):
            kid = kid.hex

        data = self.session.post(
            url=f"{self.uri}/{service.lower()}/{kid}",
            json={
                "content_key": key
            },
            headers={
                "Accept": "application/json"
            }
        ).json()

        self._raise_for_code(data, {
            1: Exceptions.AuthRejected,
            2: Exceptions.TooManyRequests,
            3: Exceptions.ServiceTagInvalid,
            4: Exceptions.KeyIdInvalid,
            5: Exceptions.ContentKeyInvalid
        })

        # the kid:key was new to the vault (optional)
        added = bool(data.get("added"))
        # the key for kid was changed/updated (optional)
        updated = bool(data.get("updated"))

        return added or updated

    def add_keys(self, service: str, kid_keys: dict[Union[UUID, str], str]) -> int:
        """
        Store several content keys under a service in a single request.

        Parameters:
            service: Service tag, lower-cased before being put in the URL.
            kid_keys: Mapping of Key ID to content key. Each Key ID is
                converted to str and has its hyphens removed before sending.

        Returns the number of keys the response reports as added plus the
        number it reports as updated.

        Raises:
            Exceptions.AuthRejected, Exceptions.TooManyRequests,
            Exceptions.ServiceTagInvalid, Exceptions.KeyIdInvalid,
            Exceptions.ContentKeyInvalid: For response codes 1 to 5
                respectively.
            ValueError: For any other non-zero code.
        """
        data = self.session.post(
            url=f"{self.uri}/{service.lower()}",
            json={
                "content_keys": {
                    str(kid).replace("-", ""): key
                    for kid, key in kid_keys.items()
                }
            },
            headers={
                "Accept": "application/json"
            }
        ).json()

        self._raise_for_code(data, {
            1: Exceptions.AuthRejected,
            2: Exceptions.TooManyRequests,
            3: Exceptions.ServiceTagInvalid,
            4: Exceptions.KeyIdInvalid,
            5: Exceptions.ContentKeyInvalid
        })

        # Both counters are optional in the response; a missing or null
        # counter counts as zero instead of crashing on int(None).
        # each kid:key that was new to the vault (optional)
        added = int(data.get("added") or 0)
        # each key for a kid that was changed/updated (optional)
        updated = int(data.get("updated") or 0)

        return added + updated

    def get_services(self) -> Iterator[str]:
        """
        Iterate the entries of the "service_list" field returned by the API root.

        Raises:
            Exceptions.AuthRejected, Exceptions.TooManyRequests: For
                response codes 1 and 2 respectively.
            ValueError: For any other non-zero code, or if "service_list"
                is not a list.
        """
        # NOTE(review): this read-only listing is sent as a POST with no body,
        # while the other read calls use GET - confirm against the API.
        data = self.session.post(
            url=self.uri,
            headers={
                "Accept": "application/json"
            }
        ).json()

        self._raise_for_code(data, {
            1: Exceptions.AuthRejected,
            2: Exceptions.TooManyRequests,
        })

        service_list = data.get("service_list", [])

        if not isinstance(service_list, list):
            raise ValueError(f"Expected {service_list} to be {list}, was {type(service_list)}")

        yield from service_list
class Exceptions:
    """Namespace grouping the errors that map to non-zero API response codes."""

    class APIError(Exception):
        """Common base of every error in this namespace, so callers can catch them all at once."""

    class AuthRejected(APIError):
        """Authentication Error Occurred, is your token valid? Do you have permission to make this call?"""

    class TooManyRequests(APIError):
        """Rate Limited; Sent too many requests in a given amount of time."""

    class PageInvalid(APIError):
        """Requested page does not exist."""

    class ServiceTagInvalid(APIError):
        """The Service Tag is invalid."""

    class KeyIdInvalid(APIError):
        """The Key ID is invalid."""

    class ContentKeyInvalid(APIError):
        """The Content Key is invalid."""