from __future__ import annotations

from typing import TYPE_CHECKING

from ..utils import Request, Api, POST
from ..given import send_complete_request
from .cloud_base import ACloudObject, TokenType

if TYPE_CHECKING:
    # Imported for annotations only; kept out of the runtime import graph.
    from ..utils import CloudContext
    from .cloud_user import User
class JobToken(ACloudObject):
    """Job token class for job-specific authentication.

    This class handles the creation and management of tokens used for job
    operations.

    :param label: Label for the token
    :param duration: Token duration in seconds
    :param token_id: Token ID (defaults to -1 for new tokens)
    :param user_id: ID of the user the token belongs to
    :param token: Token string
    :param kwargs: Extra keyword arguments. Only ``id`` is read; when present
        it takes precedence over ``token_id``. Everything else is ignored.
    """

    @classmethod
    def get_object_name(cls) -> str:
        """Return the object name used for this kind of cloud object."""
        return "token"

    @classmethod
    def get_endpoint(cls) -> str:
        """Return the route prefix of the job token endpoints."""
        return "/api/auth/tokens"

    @classmethod
    def get_api(cls, context: CloudContext) -> Api:
        """Return the ``"cloud"`` API registered in ``context.apis``."""
        return context.apis["cloud"]

    @classmethod
    def get_token_method(cls) -> str:
        """Return the token type used to authenticate job token requests."""
        return TokenType.Login

    # Name of the instance attribute sent as the token lifetime in create().
    # NOTE(review): create() also relies on ``LABEL_KEY``, which is not
    # defined in this class -- presumably inherited from ACloudObject; confirm.
    DURATION_KEY = "duration"

    def __init__(self,
                 label: str | None = None,
                 duration: int = 99999,
                 token_id: int = -1,
                 user_id: int = -1,
                 token: str = "",
                 **kwargs) -> None:
        self.label: str | None = label
        # list_all() builds instances from raw response dicts; an "id" entry
        # in those dicts wins over the explicit ``token_id`` argument.
        self.token_id: int = kwargs.get("id", token_id)
        self.user_id: int = user_id
        self.duration: int = duration
        self.token: str = token

    def create(self, context: CloudContext, user: User) -> None:
        """Create this job token in the cloud.

        On success the object is refreshed from the JSON response and
        ``user_id`` is set to ``user.user_id``.

        :param context: Cloud context for API configuration
        :param user: User providing authentication and owning the token
        :raises AssertionError: If the user doesn't have a login token
        :raises RuntimeError: If job token creation fails
        """
        # Explicit raise instead of ``assert`` so the check survives ``-O``.
        if not user.login_token:
            raise AssertionError("Cannot create job token without being log")
        context.request = self.get_create_request(context, user)
        # Only the label and the duration are sent to the server.
        context.request.body = {
            key: self.__dict__[key]
            for key in (JobToken.LABEL_KEY, JobToken.DURATION_KEY)
        }
        send_complete_request(context)
        if int(context.request.response.status_code) != 200:
            raise RuntimeError(
                f"Cannot create job token: {context.request.get_pretty_response()}")
        self.update_from_dict(context.request.response.json())
        self.user_id = user.user_id

    def read(self, context: CloudContext, user: User) -> dict:
        """Read this job token's information from the cloud.

        The object is refreshed from the JSON response before it is returned.

        :param context: Cloud context for API configuration
        :param user: User providing authentication
        :return: Dictionary with token data from response
        :raises AssertionError: If the user doesn't have job tokens or the
            token wasn't created
        :raises RuntimeError: If reading token data fails
        """
        # NOTE(review): this guard checks ``user.job_tokens`` while
        # get_token_method() names the login token -- confirm this is intended.
        if not user.job_tokens:
            raise AssertionError("Cannot get job token without being log")
        if self.token_id == -1:
            raise AssertionError("Job token was never created, can't get it")
        context.request = self.get_read_request(context, user)
        send_complete_request(context)
        if int(context.request.response.status_code) != 200:
            raise RuntimeError(
                f"Cannot get job token: {context.request.get_pretty_response()}")
        response = context.request.response.json()
        self.update_from_dict(response)
        return response

    def delete(self, context: CloudContext, user: User) -> None:
        """Delete this job token from the cloud.

        Sends a POST to ``<endpoint>/delete`` whose body lists this token's
        string value.

        :param context: Cloud context for API configuration
        :param user: User providing authentication
        :raises AssertionError: If the token wasn't created
        :raises RuntimeError: If deleting the token fails
        """
        if self.token_id == -1:
            raise AssertionError("Job token was never created, can't delete it")
        context.request = Request(api=self.get_api(context))
        context.request.set_authorization_header(
            user.get_token(self.get_token_method()))
        context.request.set_json_content_type()
        context.request.set_url(self.get_endpoint() + "/delete")
        context.request.set_method(POST)
        # The token is identified by its string value here, not by token_id.
        context.request.body = {"tokens": [self.token]}
        send_complete_request(context)
        if int(context.request.response.status_code) != 200:
            raise RuntimeError(
                f"Cannot delete job token: {context.request.get_pretty_response()}")

    @classmethod
    def list_all(cls, context: CloudContext, admin: User) -> list[JobToken]:
        """List all job tokens in the cloud.

        :param context: Cloud context for API configuration
        :param admin: Admin user providing authentication
        :return: One object per element of the JSON response
        :raises AssertionError: If the user is not an admin
        :raises RuntimeError: If listing tokens fails
        """
        if not admin.is_admin:
            raise AssertionError("Can only list job_tokens as an admin")
        context.request = cls.get_list_all_request(context, admin)
        send_complete_request(context)
        if int(context.request.response.status_code) != 200:
            raise RuntimeError(
                f"Cannot list job tokens: {context.request.get_pretty_response()}")
        return [cls(**elem) for elem in context.request.response.json()]

    def __repr__(self) -> str:
        return f"JobToken(id={self.token_id})"
class LoginToken:
    """Login token class for user authentication.

    This class manages the tokens used for user login and general API access.
    """

    # Route the login request is sent to on the "account" API.
    ENDPOINT = "/api/login"

    def __init__(self) -> None:
        """Initialize a new LoginToken object.

        Login tokens are used for general API access and authentication.
        The token string stays empty until :meth:`create` succeeds.
        """
        self.token: str = ""

    def create(self, context: CloudContext, user: User | None = None) -> None:
        """Create a login token for the user.

        Posts the user's ``email`` and ``password`` together with a ``mode``
        field (``"admin"`` or ``"user"``, chosen from ``user.is_admin``) as a
        form request, then stores the ``"token"`` entry of the JSON response
        in ``self.token``.

        :param context: Cloud context for API configuration
        :param user: User to create the login token for (required despite
            the ``None`` default)
        :raises ValueError: If no user is given
        :raises RuntimeError: If login token creation fails
        """
        # Fail before any request is built rather than on ``None.__dict__``.
        if user is None:
            raise ValueError("Cannot create login token without a user")
        context.request = Request(api=context.apis["account"])
        context.request.set_form_content_type()
        # Attribute names are spelled out as literals so the User class does
        # not have to be imported at runtime.
        credentials = vars(user)
        body = {key: credentials[key] for key in ("email", "password")}
        body["mode"] = "admin" if user.is_admin else "user"
        context.request.body = body
        context.request.set_method(POST)
        context.request.set_url(LoginToken.ENDPOINT)
        send_complete_request(context)
        if int(context.request.response.status_code) != 200:
            raise RuntimeError(
                f"Cannot create login token: {context.request.get_pretty_response()}")
        self.token = context.request.response.json()["token"]
class ApiKey(ACloudObject):
    """API key class for API authentication.

    This class handles the creation and management of API keys for worker
    access.
    """

    @classmethod
    def get_object_name(cls) -> str:
        """Return the object name used for this kind of cloud object."""
        return "api_key"

    @classmethod
    def get_endpoint(cls) -> str:
        """Return the route prefix of the API key endpoints."""
        return "/api/api_keys"

    @classmethod
    def get_api(cls, context: CloudContext) -> Api:
        """Return the ``"cloud"`` API registered in ``context.apis``."""
        return context.apis["cloud"]

    @classmethod
    def get_token_method(cls) -> str:
        """Return the token type named for API key requests."""
        return TokenType.Login

    # Name of the instance attribute sent as the key lifetime in create().
    # NOTE(review): create() also relies on ``LABEL_KEY``, which is not
    # defined in this class -- presumably inherited from ACloudObject; confirm.
    DURATION_KEY = "duration"

    def __init__(self,
                 label: str | None = None,
                 duration: int = 99999,
                 api_key: str = "",
                 api_key_id: int = -1,
                 **_) -> None:
        """Initialize a new ApiKey object.

        :param label: Label for the API key
        :param duration: API key duration in seconds
        :param api_key: API key string
        :param api_key_id: API key ID (defaults to -1 for new API keys)
        :param _: Extra keyword arguments; accepted and ignored
        """
        self.api_key: str = api_key
        self.api_key_id: int = api_key_id
        self.label: str | None = label
        self.duration: int = duration

    @property
    def token(self) -> str:
        """Get the token string for this API key.

        :return: API key string
        """
        return self.api_key

    def create(self, context: CloudContext, admin: User | None = None) -> None:
        """Create this API key in the cloud.

        Sends this key's ``duration`` and ``label`` to the server and, on
        success, refreshes the object from the JSON response.

        :param context: Cloud context for API configuration
        :param admin: Admin user providing authentication (required despite
            the ``None`` default)
        :raises AssertionError: If the user is missing or is not an admin
        :raises RuntimeError: If API key creation fails
        """
        # Explicit raise instead of ``assert`` so the check survives ``-O``;
        # a missing admin is reported the same way as a non-admin user.
        if admin is None or not admin.is_admin:
            raise AssertionError("Can only create api key as an admin")
        context.request = self.get_create_request(context, admin)
        # The payload describes the key being created, so the values come
        # from this object and not from the authenticating admin.
        context.request.body = {
            key: self.__dict__[key]
            for key in (ApiKey.DURATION_KEY, ApiKey.LABEL_KEY)
        }
        send_complete_request(context)
        if int(context.request.response.status_code) != 200:
            raise RuntimeError(
                f"Cannot create api_key: {context.request.get_pretty_response()}")
        response = context.request.response.json()
        # NOTE(review): update_from_dict is the inherited helper that
        # JobToken.create uses for this step; its definition is not visible
        # here -- confirm it handles the API key response fields.
        self.update_from_dict(response)