mirror of
https://github.com/devine-dl/devine.git
synced 2025-04-29 17:49:44 +00:00
215 lines
6.3 KiB
Python
215 lines
6.3 KiB
Python
from typing import Iterator, Optional, Union
|
|
from uuid import UUID
|
|
|
|
from requests import Session
|
|
|
|
from devine.core import __version__
|
|
from devine.core.vault import Vault
|
|
|
|
|
|
class API(Vault):
|
|
"""Key Vault using a simple RESTful HTTP API call."""
|
|
|
|
def __init__(self, name: str, uri: str, token: str):
|
|
super().__init__(name)
|
|
self.uri = uri.rstrip("/")
|
|
self.session = Session()
|
|
self.session.headers.update({
|
|
"User-Agent": f"Devine v{__version__}"
|
|
})
|
|
self.session.headers.update({
|
|
"Authorization": f"Bearer {token}"
|
|
})
|
|
|
|
def get_key(self, kid: Union[UUID, str], service: str) -> Optional[str]:
|
|
if isinstance(kid, UUID):
|
|
kid = kid.hex
|
|
|
|
data = self.session.get(
|
|
url=f"{self.uri}/{service.lower()}/{kid}",
|
|
headers={
|
|
"Accept": "application/json"
|
|
}
|
|
).json()
|
|
|
|
code = int(data.get("code", 0))
|
|
message = data.get("message")
|
|
error = {
|
|
0: None,
|
|
1: Exceptions.AuthRejected,
|
|
2: Exceptions.TooManyRequests,
|
|
3: Exceptions.ServiceTagInvalid,
|
|
4: Exceptions.KeyIdInvalid
|
|
}.get(code, ValueError)
|
|
|
|
if error:
|
|
raise error(f"{message} ({code})")
|
|
|
|
content_key = data.get("content_key")
|
|
if not content_key:
|
|
return None
|
|
|
|
if not isinstance(content_key, str):
|
|
raise ValueError(f"Expected {content_key} to be {str}, was {type(content_key)}")
|
|
|
|
return content_key
|
|
|
|
def get_keys(self, service: str) -> Iterator[tuple[str, str]]:
|
|
page = 1
|
|
|
|
while True:
|
|
data = self.session.get(
|
|
url=f"{self.uri}/{service.lower()}",
|
|
params={
|
|
"page": page,
|
|
"total": 10
|
|
},
|
|
headers={
|
|
"Accept": "application/json"
|
|
}
|
|
).json()
|
|
|
|
code = int(data.get("code", 0))
|
|
message = data.get("message")
|
|
error = {
|
|
0: None,
|
|
1: Exceptions.AuthRejected,
|
|
2: Exceptions.TooManyRequests,
|
|
3: Exceptions.PageInvalid,
|
|
4: Exceptions.ServiceTagInvalid,
|
|
}.get(code, ValueError)
|
|
|
|
if error:
|
|
raise error(f"{message} ({code})")
|
|
|
|
content_keys = data.get("content_keys")
|
|
if content_keys:
|
|
if not isinstance(content_keys, dict):
|
|
raise ValueError(f"Expected {content_keys} to be {dict}, was {type(content_keys)}")
|
|
|
|
for key_id, key in content_keys.items():
|
|
yield key_id, key
|
|
|
|
pages = int(data["pages"])
|
|
if pages <= page:
|
|
break
|
|
|
|
page += 1
|
|
|
|
def add_key(self, service: str, kid: Union[UUID, str], key: str) -> bool:
|
|
if isinstance(kid, UUID):
|
|
kid = kid.hex
|
|
|
|
data = self.session.post(
|
|
url=f"{self.uri}/{service.lower()}/{kid}",
|
|
json={
|
|
"content_key": key
|
|
},
|
|
headers={
|
|
"Accept": "application/json"
|
|
}
|
|
).json()
|
|
|
|
code = int(data.get("code", 0))
|
|
message = data.get("message")
|
|
error = {
|
|
0: None,
|
|
1: Exceptions.AuthRejected,
|
|
2: Exceptions.TooManyRequests,
|
|
3: Exceptions.ServiceTagInvalid,
|
|
4: Exceptions.KeyIdInvalid,
|
|
5: Exceptions.ContentKeyInvalid
|
|
}.get(code, ValueError)
|
|
|
|
if error:
|
|
raise error(f"{message} ({code})")
|
|
|
|
# the kid:key was new to the vault (optional)
|
|
added = bool(data.get("added"))
|
|
# the key for kid was changed/updated (optional)
|
|
updated = bool(data.get("updated"))
|
|
|
|
return added or updated
|
|
|
|
def add_keys(self, service: str, kid_keys: dict[Union[UUID, str], str]) -> int:
|
|
data = self.session.post(
|
|
url=f"{self.uri}/{service.lower()}",
|
|
json={
|
|
"content_keys": {
|
|
str(kid).replace("-", ""): key
|
|
for kid, key in kid_keys.items()
|
|
}
|
|
},
|
|
headers={
|
|
"Accept": "application/json"
|
|
}
|
|
).json()
|
|
|
|
code = int(data.get("code", 0))
|
|
message = data.get("message")
|
|
error = {
|
|
0: None,
|
|
1: Exceptions.AuthRejected,
|
|
2: Exceptions.TooManyRequests,
|
|
3: Exceptions.ServiceTagInvalid,
|
|
4: Exceptions.KeyIdInvalid,
|
|
5: Exceptions.ContentKeyInvalid
|
|
}.get(code, ValueError)
|
|
|
|
if error:
|
|
raise error(f"{message} ({code})")
|
|
|
|
# each kid:key that was new to the vault (optional)
|
|
added = int(data.get("added"))
|
|
# each key for a kid that was changed/updated (optional)
|
|
updated = int(data.get("updated"))
|
|
|
|
return added + updated
|
|
|
|
def get_services(self) -> Iterator[str]:
|
|
data = self.session.post(
|
|
url=self.uri,
|
|
headers={
|
|
"Accept": "application/json"
|
|
}
|
|
).json()
|
|
|
|
code = int(data.get("code", 0))
|
|
message = data.get("message")
|
|
error = {
|
|
0: None,
|
|
1: Exceptions.AuthRejected,
|
|
2: Exceptions.TooManyRequests,
|
|
}.get(code, ValueError)
|
|
|
|
if error:
|
|
raise error(f"{message} ({code})")
|
|
|
|
service_list = data.get("service_list", [])
|
|
|
|
if not isinstance(service_list, list):
|
|
raise ValueError(f"Expected {service_list} to be {list}, was {type(service_list)}")
|
|
|
|
for service in service_list:
|
|
yield service
|
|
|
|
|
|
class Exceptions:
|
|
class AuthRejected(Exception):
|
|
"""Authentication Error Occurred, is your token valid? Do you have permission to make this call?"""
|
|
|
|
class TooManyRequests(Exception):
|
|
"""Rate Limited; Sent too many requests in a given amount of time."""
|
|
|
|
class PageInvalid(Exception):
|
|
"""Requested page does not exist."""
|
|
|
|
class ServiceTagInvalid(Exception):
|
|
"""The Service Tag is invalid."""
|
|
|
|
class KeyIdInvalid(Exception):
|
|
"""The Key ID is invalid."""
|
|
|
|
class ContentKeyInvalid(Exception):
|
|
"""The Content Key is invalid."""
|