Source code for steps.api.cloud_objects.cloud_token

from __future__ import annotations

from ..utils import Request, Api, POST
from ..given import send_complete_request

from .cloud_base import ACloudObject, TokenType

from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from ..utils import CloudContext
    from .cloud_user import User


[docs] class JobToken(ACloudObject): """Job token class for job-specific authentication. This class handles the creation and management of tokens used for job operations. :param label: Label for the token :param duration: Token duration in seconds :param token_id: Token ID (defaults to -1 for new tokens) :param user_id: ID of the user the token belongs to :param token: Token string :param kwargs: Additional attributes to set on the token """ @classmethod
[docs] def get_object_name(cls) -> str: return "token"
@classmethod
[docs] def get_endpoint(cls) -> str: return "/api/auth/tokens"
@classmethod
[docs] def get_api(cls, context: CloudContext) -> Api: return context.apis["cloud"]
@classmethod
[docs] def get_token_method(cls) -> str: return TokenType.Login
[docs] DURATION_KEY = "duration"
[docs] LABEL_KEY = "label"
def __init__(self, label: str = None, duration: int = 99999, token_id: int = -1, user_id: int = -1, token: str = "", **kwargs):
[docs] self.label: str = label
[docs] self.token_id: int = kwargs.get("id", token_id)
[docs] self.user_id: int = user_id
[docs] self.duration: int = duration
[docs] self.token: str = token
[docs] def create(self, context: CloudContext, user: User) -> None: """Create this job token in the cloud. :param context: Cloud context for API configuration :param user: User providing authentication and owning the token :raises AssertionError: If the user doesn't have a login token :raises RuntimeError: If job token creation fails """ assert user.login_token, "Cannot create job token without being log" context.request = self.get_create_request(context, user) body = {key: self.__dict__[key] for key in [JobToken.LABEL_KEY, JobToken.DURATION_KEY]} context.request.body = body send_complete_request(context) if int(context.request.response.status_code) != 200: raise RuntimeError( f"Cannot create job token: {context.request.get_pretty_response()}") response = context.request.response.json() self.update_from_dict(response) self.user_id = user.user_id
[docs] def read(self, context: CloudContext, user: User) -> None: """Read this job token's information from the cloud. :param context: Cloud context for API configuration :param user: User providing authentication :return: Dictionary with token data from response :raises AssertionError: If the user doesn't have job tokens or the token wasn't created :raises RuntimeError: If reading token data fails """ assert user.job_tokens, "Cannot get job token without being log" assert self.token_id != -1, "Job token was never created, can't get it" context.request = self.get_read_request(context, user) send_complete_request(context) if int(context.request.response.status_code) != 200: raise RuntimeError( f"Cannot get job token: {context.request.get_pretty_response()}") response = context.request.response.json() self.update_from_dict(response) return response
[docs] def delete(self, context: CloudContext, user: User) -> None: """Delete this job token from the cloud. :param context: Cloud context for API configuration :param user: User providing authentication :raises AssertionError: If the token wasn't created :raises RuntimeError: If deleting the token fails """ assert self.token_id != -1, "Job token was never created, can't delete it" context.request = Request(api=self.get_api(context)) context.request.set_authorization_header(user.get_token(self.get_token_method())) context.request.set_json_content_type() context.request.set_url(self.get_endpoint()+"/delete") context.request.set_method(POST) context.request.body = {"tokens": [self.token]} send_complete_request(context) if int(context.request.response.status_code) != 200: raise RuntimeError( f"Cannot delete job token: {context.request.get_pretty_response()}")
@classmethod
[docs] def list_all(cls, context: CloudContext, admin: User) -> list[JobToken]: """List all job tokens in the cloud. :param context: Cloud context for API configuration :param admin: Admin user providing authentication :return: List of JobToken objects :raises AssertionError: If the user is not an admin :raises RuntimeError: If listing tokens fails """ assert admin.is_admin, "Can only list job_tokens as an admin" context.request = JobToken.get_list_all_request(context, admin) send_complete_request(context) if int(context.request.response.status_code) != 200: raise RuntimeError( f"Cannot list users: {context.request.get_pretty_response()}") return [JobToken(**elem) for elem in context.request.response.json()]
def __repr__(self) -> str: return f"JobToken(id={self.token_id})"
[docs] class LoginToken: """Login token class for user authentication. This class manages the tokens used for user login and general API access. """
[docs] ENDPOINT = "/api/login"
def __init__(self) -> None: """Initialize a new LoginToken object. Login tokens are used for general API access and authentication. """
[docs] self.token: str = None
[docs] def create(self, context: CloudContext, user=None) -> None: """Create a login token for the user. :param context: Cloud context for API configuration :param user: User to create the login token for :raises RuntimeError: If login token creation fails """ context.request = Request(api=context.apis["account"]) context.request.set_form_content_type() # Use string constants directly instead of importing User class body = {key: user.__dict__[key] for key in ["email", "password"]} # Using string constants directly body["mode"] = "admin" if user.is_admin else "user" context.request.body = body context.request.set_method(POST) context.request.set_url(LoginToken.ENDPOINT) send_complete_request(context) if int(context.request.response.status_code) != 200: raise RuntimeError( f"Cannot create login token: {context.request.get_pretty_response()}") response = context.request.response.json() self.token = response["token"]
[docs] class ApiKey(ACloudObject): """API key class for API authentication. This class handles the creation and management of API keys for worker access. """ @classmethod
[docs] def get_object_name(cls) -> str: return "api_key"
@classmethod
[docs] def get_endpoint(cls) -> str: return "/api/api_keys"
@classmethod
[docs] def get_api(cls, context: CloudContext) -> Api: return context.apis["cloud"]
@classmethod
[docs] def get_token_method(cls) -> str: return TokenType.Login
[docs] DURATION_KEY = "duration"
[docs] LABEL_KEY = "label"
def __init__(self, label: str = None, duration: int = 99999, api_key: str = "", api_key_id: int = -1, **_): """Initialize a new ApiKey object. :param label: Label for the API key :param duration: API key duration in seconds :param api_key: API key string :param api_key_id: API key ID (defaults to -1 for new API keys) :param _: Additional attributes to set on the API key """
[docs] self.api_key: str = api_key
[docs] self.api_key_id: int = api_key_id
[docs] self.label: str = label
[docs] self.duration: int = duration
@property
[docs] def token(self) -> str: """Get the token string for this API key. :return: API key string """ return self.api_key
[docs] def create(self, context: CloudContext, admin=None) -> None: """Create this API key in the cloud. :param context: Cloud context for API configuration :param admin: Admin user providing authentication :raises AssertionError: If the user is not an admin :raises RuntimeError: If API key creation fails """ assert admin.is_admin, "Can only create api key as an admin" context.request = self.get_create_request(context, admin) body = {key: admin.__dict__[key] for key in [ApiKey.DURATION_KEY, ApiKey.LABEL_KEY]} context.request.body = body send_complete_request(context) if int(context.request.response.status_code) != 200: raise RuntimeError( f"Cannot create api_key: {context.request.get_pretty_response()}") response = context.request.response.json() self.get_update_request(response)