# Source code for steps.api.cloud_objects.cloud_user

from __future__ import annotations

from string import ascii_letters, digits
from secrets import choice
from sys import _getframe

from ..utils import Api
from ..given import send_complete_request

from .cloud_base import ACloudObject, TokenType

from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from ..utils import CloudContext

from .cloud_token import JobToken, LoginToken, ApiKey

# Punctuation characters allowed in generated passwords.
SYMBOLS = '*.!@#$%^&(){}[]:;<>,.?/~_+-=|\\'

# Full pool of characters a generated password is drawn from.
ALPHABET = ascii_letters + digits + SYMBOLS

# Characters that are rewritten when a name is turned into an e-mail local
# part: separators become dots, accented letters lose their accent.
CHAR_REPLACE_MAP = {
    " ": ".",
    "-": ".",
    "'": ".",
    "é": "e",
    "è": "e",
    "à": "a",
    "ï": "i",
    "î": 'i',
    "ô": "o"
}
def _sanitize_str_for_email(my_str: str) -> str:
    """Return ``my_str`` lower-cased with e-mail-unfriendly characters replaced.

    Every key of ``CHAR_REPLACE_MAP`` found in the lower-cased string is
    replaced by its mapped value.

    :param my_str: Text to sanitize (for instance a first or last name)
    :return: The sanitized, lower-case string
    """
    my_str = my_str.lower()
    for char_to_replace, substitute_char in CHAR_REPLACE_MAP.items():
        my_str = my_str.replace(char_to_replace, substitute_char)
    return my_str
class User(ACloudObject):
    """User class representing a user in the cloud system.

    This class handles all user-related operations including user creation,
    authentication, token management, and CRUD operations.

    Instances also offer mapping-style access (``keys()``, ``user[key]``,
    ``user[key] = value``) over their attributes, their tokens and a private
    store of free-form variables.

    :param first_name: User's first name
    :param last_name: User's last name
    :param user_id: User's ID (defaults to -1 for new users)
    :param email: User's email address
    :param password: User's password
    :param platform_ids: List of platform IDs the user has access to
        (``None``, the default, gives the user a fresh empty list)
    :param is_admin: Whether the user has admin privileges
    :param priority: User's priority level (default: 2)
    :param company_id: User's company ID
    :param kwargs: Extra keyword arguments; only ``id`` is read by the
        constructor, where it takes precedence over ``user_id``
    """

    # Names of the attributes sent in the body of the creation request.
    EMAIL_KEY = "email"
    PASSWORD_KEY = "password"
    FIRST_NAME_KEY = "first_name"
    LAST_NAME_KEY = "last_name"
    PLATFORM_IDS_KEY = "platform_ids"
    IS_ADMIN_KEY = "is_admin"
    COMPANY_ID_KEY = "company_id"
    PRIORITY_KEY = "priority"

    @classmethod
    def get_object_name(cls) -> str:
        """Return the name of this kind of cloud object (``"user"``)."""
        return "user"

    @classmethod
    def get_endpoint(cls) -> str:
        """Return the endpoint path used for users (``"/api/users"``)."""
        return "/api/users"

    @classmethod
    def get_api(cls, context: CloudContext) -> Api:
        """Return the ``"account"`` API taken from ``context.apis``."""
        return context.apis["account"]

    @classmethod
    def get_token_method(cls) -> str:
        """Return the token type used for user requests (login token)."""
        return TokenType.Login

    def get_token(self, token_type: TokenType) -> str:
        """Get a token of the specified type for this user.

        For ``TokenType.Job`` the first job token is returned, so an
        ``IndexError`` is raised when the user has none; for
        ``TokenType.API_Key`` an ``AttributeError`` is raised while no API key
        is attached.

        :param token_type: Type of token to retrieve
        :return: Token string for the requested token type
        :raises NotImplementedError: If the token type is not supported
        """
        if token_type == TokenType.Login:
            return self.login_token.token
        if token_type == TokenType.Job:
            return self.job_tokens[0].token
        if token_type == TokenType.API_Key:
            return self.api_key.token
        raise NotImplementedError(f"get_token not implemented for {token_type}")

    def __init__(self,
                 first_name: str = "",
                 last_name: str = "",
                 user_id: int = -1,
                 email: str = "",
                 password: str = "",
                 platform_ids: list[str] | None = None,
                 is_admin: bool = False,
                 priority: int = 2,
                 company_id: int | None = None,
                 **kwargs):
        self.first_name: str = first_name
        self.last_name: str = last_name
        self.email: str = email
        self.password: str = password
        # An "id" keyword (e.g. from an API payload) wins over user_id.
        self.user_id: int = kwargs.get("id", user_id)
        self.is_admin: bool = is_admin
        self.priority: int = priority
        self.company_id: int | None = company_id
        # A new list per instance: a shared `[]` default would leak platform
        # IDs from one user to every other user built without the argument.
        self.platform_ids: list[str] = [] if platform_ids is None else platform_ids

        self.job_tokens: list[JobToken] = []
        self.login_token: LoginToken = LoginToken()
        self.api_key: ApiKey | None = None

        # Free-form values stored through ``user[key] = value``.
        self._variables = {}

    def keys(self) -> list[str]:
        """Return every key usable with ``user[key]``.

        The list is made of the instance attribute names, the names stored in
        the free-form variables, and the virtual token keys.
        """
        return [*self.__dict__,
                *self._variables,
                "job_token", "job_token_id", "login_token"]

    def __getitem__(self, key: str) -> object:
        """Return the value stored under ``key``.

        Token keys are resolved first, then instance attributes, then the
        free-form variables.

        :raises AssertionError: If a job token or API key is requested but
            none exists
        :raises KeyError: If ``key`` is unknown
        """
        if key == "job_token":
            assert self.job_tokens, "No existing job token"
            return self.job_tokens[0].token
        if key == "job_token_id":
            assert self.job_tokens, "No existing job token"
            return self.job_tokens[0].token_id
        if key == "login_token":
            return self.login_token.token
        if key == "api_key":
            assert self.api_key is not None, "No existing API key"
            return self.api_key.token
        if key in self.__dict__:
            return self.__dict__[key]
        return self._variables[key]

    def __setitem__(self, key: str, value) -> None:
        """Store ``value`` under ``key``.

        Job-token keys create an empty first job token when the user has
        none. Keys that are neither token keys nor existing attributes go to
        the free-form variables.

        :raises AssertionError: If ``key`` is ``"api_key"`` but no API key
            is attached to the user
        """
        if key == "job_token":
            if not self.job_tokens:
                self.job_tokens.append(JobToken())
            self.job_tokens[0].token = value
        elif key == "job_token_id":
            if not self.job_tokens:
                self.job_tokens.append(JobToken())
            self.job_tokens[0].token_id = value
        elif key == "login_token":
            self.login_token.token = value
        elif key == "api_key":
            assert self.api_key is not None, "No existing API key"
            self.api_key.token = value
        elif key in self.__dict__:
            self.__dict__[key] = value
        else:
            self._variables[key] = value

    def generate_data(self) -> None:
        """Generate email and password data for this user.

        Creates a standardized email based on first and last name, and
        generates a random 20-character password holding at least four
        lower-case letters, four upper-case letters, four digits and four
        symbols.
        """
        self.email = f"qa.{_sanitize_str_for_email(self.first_name)}" + \
            f".{_sanitize_str_for_email(self.last_name)}@quandela.com"

        while True:
            password = ''.join(choice(ALPHABET) for _ in range(20))
            # The symbol count is checked against SYMBOLS: a membership test
            # against ALPHABET is true for every character and checks nothing.
            if (sum(c.islower() for c in password) >= 4
                    and sum(c.isupper() for c in password) >= 4
                    and sum(c.isdigit() for c in password) >= 4
                    and sum(c in SYMBOLS for c in password) >= 4):
                break
        self.password = password

    def _get_error_message(self, method_name: str, context: CloudContext) -> str:
        """Generate an error message for failed API operations.

        :param method_name: Name of the method that failed
        :param context: Cloud context containing request/response data
        :return: Formatted error message
        """
        return f"Cannot {method_name} user {self.email}: "\
            + f"{context.request.get_pretty_response()}"

    def _raise_on_failure(self, method_name: str, context: CloudContext) -> None:
        """Raise ``RuntimeError`` unless the last response has status 200.

        :param method_name: Name of the operation, used in the error message
        :param context: Cloud context holding the request that was just sent
        :raises RuntimeError: If the response status code is not 200
        """
        if int(context.request.response.status_code) != 200:
            raise RuntimeError(self._get_error_message(method_name, context))

    def create(self, context: CloudContext, user: User) -> None:
        """Create this user in the cloud.

        :param context: Cloud context for API configuration
        :param user: User providing authentication for the creation
        :raises RuntimeError: If user creation fails
        :raises AssertionError: If trying to create an admin user
        """
        method_name = _getframe().f_code.co_name
        assert not self.is_admin, f"Can't {method_name} admin user"

        context.request = self.get_create_request(context, user)
        body_keys = [User.FIRST_NAME_KEY, User.LAST_NAME_KEY,
                     User.EMAIL_KEY, User.PASSWORD_KEY,
                     User.PLATFORM_IDS_KEY, User.IS_ADMIN_KEY,
                     User.COMPANY_ID_KEY, User.PRIORITY_KEY]
        # Attributes left at None are omitted from the request body.
        body = {key: self.__dict__[key] for key in body_keys
                if self.__dict__[key] is not None}
        # NOTE(review): the priority sent is always 2, whatever self.priority
        # holds - confirm whether this is an API constraint or a leftover.
        body["priority"] = 2
        context.request.body = body

        send_complete_request(context)
        self._raise_on_failure(method_name, context)

        response = context.request.response.json()
        self.update_from_dict(response)

    def read(self, context: CloudContext, user: User | None = None) -> dict:
        """Read this user's information from the cloud.

        :param context: Cloud context for API configuration
        :param user: User providing authentication (defaults to self)
        :return: Dictionary with user data from response
        :raises RuntimeError: If reading user data fails
        :raises AssertionError: If the user has not been created yet
        """
        if user is None:
            user = self
        method_name = _getframe().f_code.co_name
        assert self.user_id != -1, f"User was never created, can't {method_name} it"

        context.request = self.get_read_request(context, user)
        send_complete_request(context)
        self._raise_on_failure(method_name, context)

        response = context.request.response.json()
        self.update_from_dict(response)
        return response

    def update(self, context: CloudContext, user, update_dict: dict) -> None:
        """Update this user's information in the cloud.

        :param context: Cloud context for API configuration
        :param user: User providing authentication (``None`` means self)
        :param update_dict: Dictionary with fields to update
        :raises RuntimeError: If updating user data fails
        :raises AssertionError: If the user has not been created yet
        """
        if user is None:
            user = self
        method_name = _getframe().f_code.co_name
        assert self.user_id != -1, f"User was never created, can't {method_name} it"

        context.request = self.get_update_request(context, user)
        # The payload belongs to the request (as in create()); setting it on
        # the context itself would leave the request without a body.
        context.request.body = update_dict

        send_complete_request(context)
        self._raise_on_failure(method_name, context)

    def delete(self, context: CloudContext, user: User) -> None:
        """Delete this user from the cloud.

        :param context: Cloud context for API configuration
        :param user: User providing authentication
        :raises RuntimeError: If deleting the user fails
        :raises AssertionError: If the user has not been created or is an admin
        """
        method_name = _getframe().f_code.co_name
        assert self.user_id != -1, f"User was never created, can't {method_name} it"
        assert not self.is_admin, f"Can't {method_name} admin user"

        context.request = self.get_delete_request(context, user)
        send_complete_request(context)
        self._raise_on_failure(method_name, context)

    def log(self, context: CloudContext) -> None:
        """Login the user and obtain a login token.

        :param context: Cloud context for API configuration
        :raises AssertionError: If the user is not an admin and hasn't been created
        """
        method_name = _getframe().f_code.co_name
        assert self.is_admin or self.user_id != -1, \
            f"User was never created, can't {method_name} it"
        self.login_token.create(context, self)

    def create_job_token(self, context: CloudContext, token_dict: dict) -> None:
        """Create a job token for this user and append it to ``job_tokens``.

        :param context: Cloud context for API configuration
        :param token_dict: Dictionary with token parameters
        :raises AssertionError: If the user hasn't been created
        """
        method_name = _getframe().f_code.co_name
        assert self.user_id != -1, f"User was never created, can't {method_name} it"

        token = JobToken(**token_dict)
        token.create(context, self)
        self.job_tokens.append(token)

    def create_api_key(self, context: CloudContext, api_key_dict: dict) -> None:
        """Create an API key for this user and store it as ``api_key``.

        :param context: Cloud context for API configuration
        :param api_key_dict: Dictionary with API key parameters
        :raises AssertionError: If the user hasn't been created
        """
        method_name = _getframe().f_code.co_name
        assert self.user_id != -1, f"User was never created, can't {method_name} it"

        api_key = ApiKey(**api_key_dict)
        api_key.create(context, self)
        self.api_key = api_key

    @classmethod
    def list_all(cls, context: CloudContext, admin: User) -> list[User]:
        """List all users in the cloud.

        :param context: Cloud context for API configuration
        :param admin: Admin user providing authentication
        :return: List of User objects, one per element of the JSON response
        :raises RuntimeError: If listing users fails
        :raises AssertionError: If the authenticating user is not an admin
        """
        assert admin.is_admin, "Can only list users as an admin"

        context.request = cls.get_list_all_request(context, admin)
        send_complete_request(context)
        if int(context.request.response.status_code) != 200:
            raise RuntimeError(
                f"Cannot list users: {context.request.get_pretty_response()}")

        return [cls(**elem) for elem in context.request.response.json()]

    def __repr__(self) -> str:
        """Represent the user by its email address."""
        return self.email