from __future__ import annotations
from string import ascii_letters, digits
from secrets import choice
from sys import _getframe
from ..utils import Api
from ..given import send_complete_request
from .cloud_base import ACloudObject, TokenType
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..utils import CloudContext
from .cloud_token import JobToken, LoginToken, ApiKey
[docs]
SYMBOLS = '*.!@#$%^&(){}[]:;<>,.?/~_+-=|\\'
[docs]
ALPHABET = ascii_letters + digits + SYMBOLS
[docs]
CHAR_REPLACE_MAP = {
" ": ".",
"-": ".",
"'": ".",
"é": "e",
"è": "e",
"à": "a",
"ï": "i",
"î": 'i',
"ô": "o"
}
def _sanitize_str_for_email(my_str: str) -> None:
my_str = my_str.lower()
for char_to_replace, substitute_char in CHAR_REPLACE_MAP.items():
my_str = my_str.replace(char_to_replace, substitute_char)
return my_str
[docs]
class User(ACloudObject):
"""User class representing a user in the cloud system.
This class handles all user-related operations including user creation,
authentication, token management, and CRUD operations.
:param first_name: User's first name
:param last_name: User's last name
:param user_id: User's ID (defaults to -1 for new users)
:param email: User's email address
:param password: User's password
:param platform_ids: List of platform IDs the user has access to
:param is_admin: Whether the user has admin privileges
:param priority: User's priority level (default: 2)
:param company_id: User's company ID
:param kwargs: Additional attributes to set on the user
"""
[docs]
PASSWORD_KEY = "password"
[docs]
FIRST_NAME_KEY = "first_name"
[docs]
LAST_NAME_KEY = "last_name"
[docs]
IS_ADMIN_KEY = "is_admin"
[docs]
COMPANY_ID_KEY = "company_id"
[docs]
PRIORITY_KEY = "priority"
@classmethod
[docs]
def get_object_name(cls) -> str:
return "user"
@classmethod
[docs]
def get_endpoint(cls) -> str:
return "/api/users"
@classmethod
[docs]
def get_api(cls, context: CloudContext) -> Api:
return context.apis["account"]
@classmethod
[docs]
def get_token_method(cls) -> str:
return TokenType.Login
[docs]
def get_token(self, token_type: TokenType) -> None:
"""Get a token of the specified type for this user.
:param token_type: Type of token to retrieve
:return: Token string for the requested token type
:raises NotImplementedError: If the token type is not supported
"""
if token_type == TokenType.Login:
return self.login_token.token
elif token_type == TokenType.Job:
return self.job_tokens[0].token
if token_type == TokenType.API_Key:
return self.api_key.token
else:
raise NotImplementedError(f"get_token not implemented for {token_type}")
def __init__(self,
first_name: str = "",
last_name: str = "",
user_id: int = -1,
email: str = "",
password: str = "",
platform_ids: list[str] = [],
is_admin: bool = False,
priority: int = 2,
company_id: int = None,
**kwargs):
[docs]
self.first_name: str = first_name
[docs]
self.last_name: str = last_name
[docs]
self.email: str = email
[docs]
self.password: str = password
[docs]
self.user_id: int = kwargs.get("id", user_id)
[docs]
self.is_admin: bool = is_admin
[docs]
self.priority: int = priority
[docs]
self.company_id: int = company_id
[docs]
self.job_tokens: list[JobToken] = []
[docs]
self.login_token: LoginToken = LoginToken()
[docs]
self.api_key: ApiKey = None
self._variables = {}
[docs]
def keys(self) -> list[str]:
return list(self.__dict__.keys())\
+ list(self._variables.keys())\
+ ["job_token", "job_token_id", "login_token"]
def __getitem__(self, key: str) -> None:
if key == "job_token":
assert self.job_tokens, "No existing job token"
return self.job_tokens[0].token
elif key == "job_token_id":
assert self.job_tokens, "No existing job token"
return self.job_tokens[0].token_id
elif key == "login_token":
return self.login_token.token
elif key == "api_key":
return self.api_key.token
elif key in self.__dict__:
return self.__dict__[key]
return self._variables[key]
def __setitem__(self, key: str, value) -> None:
if key == "job_token":
if not self.job_tokens:
self.job_tokens.append(JobToken())
self.job_tokens[0].token = value
elif key == "job_token_id":
if not self.job_tokens:
self.job_tokens.append(JobToken())
self.job_tokens[0].token_id = value
elif key == "login_token":
self.login_token.token = value
elif key == "api_key":
self.api_key.token = value
elif key in self.__dict__:
self.__dict__[key] = value
else:
self._variables[key] = value
[docs]
def generate_data(self) -> None:
"""Generate email and password data for this user.
Creates a standardized email based on first and last name,
and generates a secure random password that meets complexity requirements.
"""
self.email = f"qa.{_sanitize_str_for_email(self.first_name)}" + \
f".{_sanitize_str_for_email(self.last_name)}@quandela.com"
while True:
password = ''.join(choice(ALPHABET) for _ in range(20))
if (sum(c.islower() for c in password) >= 4
and sum(c.isupper() for c in password) >= 4
and sum(c.isdigit() for c in password) >= 4 and sum(c in ALPHABET for c in password)):
break
self.password = password
def _get_error_message(self, method_name: str, context: CloudContext) -> str:
"""Generate an error message for failed API operations.
:param method_name: Name of the method that failed
:param context: Cloud context containing request/response data
:return: Formatted error message
"""
return f"Cannot {method_name} user {self.email}: "\
+ f"{context.request.get_pretty_response()}"
[docs]
def create(self, context: CloudContext, user: User) -> None:
"""Create this user in the cloud.
:param context: Cloud context for API configuration
:param user: User providing authentication for the creation
:raises RuntimeError: If user creation fails
:raises AssertionError: If trying to create an admin user
"""
method_name = _getframe().f_code.co_name
assert not self.is_admin, f"Can't {method_name} user user"
context.request = self.get_create_request(context, user)
body = {key: self.__dict__[key] for key in
[User.FIRST_NAME_KEY,
User.LAST_NAME_KEY,
User.EMAIL_KEY,
User.PASSWORD_KEY,
User.PLATFORM_IDS_KEY,
User.IS_ADMIN_KEY,
User.COMPANY_ID_KEY,
User.PRIORITY_KEY] if self.__dict__[key] is not None}
body["priority"] = 2
context.request.body = body
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(self._get_error_message(method_name, context))
response = context.request.response.json()
self.update_from_dict(response)
[docs]
def read(self, context: CloudContext, user=None) -> dict[str]:
"""Read this user's information from the cloud.
:param context: Cloud context for API configuration
:param user: User providing authentication (defaults to self)
:return: Dictionary with user data from response
:raises RuntimeError: If reading user data fails
:raises AssertionError: If the user has not been created yet
"""
if user is None:
user = self
method_name = _getframe().f_code.co_name
assert self.user_id != -1, f"User was never created, can't {method_name} it"
context.request = self.get_read_request(context, user)
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(self._get_error_message(method_name, context))
response = context.request.response.json()
self.update_from_dict(response)
return response
[docs]
def update(self, context: CloudContext, user, update_dict: dict[str]) -> None:
"""Update this user's information in the cloud.
:param context: Cloud context for API configuration
:param user: User providing authentication (defaults to self)
:param update_dict: Dictionary with fields to update
:raises RuntimeError: If updating user data fails
:raises AssertionError: If the user has not been created yet
"""
if user is None:
user = self
method_name = _getframe().f_code.co_name
assert self.user_id != -1, f"User was never created, can't {method_name} it"
context.request = self.get_update_request(context, user)
context.body = update_dict
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(self._get_error_message(method_name, context))
[docs]
def delete(self, context: CloudContext, user: User) -> None:
"""Delete this user from the cloud.
:param context: Cloud context for API configuration
:param user: User providing authentication
:raises RuntimeError: If deleting the user fails
:raises AssertionError: If the user has not been created or is an admin
"""
method_name = _getframe().f_code.co_name
assert self.user_id != -1, f"User was never created, can't {method_name} it"
assert not self.is_admin, f"Can only {method_name} user as an admin"
context.request = self.get_delete_request(context, user)
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(self._get_error_message(method_name, context))
[docs]
def log(self, context: CloudContext) -> None:
"""Login the user and obtain a login token.
:param context: Cloud context for API configuration
:raises AssertionError: If the user is not an admin and hasn't been created
"""
method_name = _getframe().f_code.co_name
assert self.is_admin or self.user_id != -1, f"User was never created, can't {method_name} it"
self.login_token.create(context, self)
[docs]
def create_job_token(self, context: CloudContext, token_dict: dict[str]) -> None:
"""Create a job token for this user.
:param context: Cloud context for API configuration
:param token_dict: Dictionary with token parameters
:raises AssertionError: If the user hasn't been created
"""
method_name = _getframe().f_code.co_name
assert self.user_id != -1, f"User was never created, can't {method_name} it"
token = JobToken(**token_dict)
token.create(context, self)
self.job_tokens.append(token)
[docs]
def create_api_key(self, context: CloudContext, api_key_dict: dict[str]) -> None:
"""Create an API key for this user.
:param context: Cloud context for API configuration
:param api_key_dict: Dictionary with API key parameters
:raises AssertionError: If the user hasn't been created
"""
method_name = _getframe().f_code.co_name
assert self.user_id != -1, f"User was never created, can't {method_name} it"
api_key = ApiKey(**api_key_dict)
api_key.create(context, self)
self.api_key = api_key
@classmethod
[docs]
def list_all(cls, context: CloudContext, admin: User) -> list[User]:
"""List all users in the cloud.
:param context: Cloud context for API configuration
:param admin: Admin user providing authentication
:return: List of User objects
:raises RuntimeError: If listing users fails
:raises AssertionError: If the authenticating user is not an admin
"""
assert admin.is_admin, "Can only list users as an admin"
context.request = User.get_list_all_request(context, admin)
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(
f"Cannot list users: {context.request.get_pretty_response()}")
return [User(**elem) for elem in context.request.response.json()]
def __repr__(self) -> str:
return self.email