from __future__ import annotations
from .utils import CloudContextHandler
from .cloud_objects import User, JobToken, Job, Platform
[docs]
class Cloud:
"""Cloud class for managing cloud resources during testing.
This class provides functionality for initializing test resources,
managing users, platforms and jobs for QA automation.
:param users_path: Path to the CSV file containing user information
:param config: Configuration dictionary with environment variables
:param context: CloudContextHandler instance for making API requests
"""
def __init__(self, users_path: str, config: dict, context: CloudContextHandler):
[docs]
self.environnement = config
[docs]
self.admin = User(
email=self.environnement.get("ADMIN_EMAIL"),
password=self.environnement.get("ADMIN_PASSWORD"),
user_id=-1,
is_admin=True)
self.load_users(users_path, context)
[docs]
def admin_initialise(self, context: CloudContextHandler):
"""Initialize the admin user with necessary tokens.
Logs in as admin, finds admin's user ID, removes existing job tokens,
and creates new tokens for test execution.
:param context: CloudContextHandler instance for making API requests
"""
# log as admin
self.admin.log(context)
# search for admin user id
for user in User.list_all(context, self.admin):
if user.email == self.admin.email:
self.admin.user_id = user.user_id
break
# update admin info
self.admin.read(context, self.admin)
# delete all existing job token admin
for job_token in JobToken.list_all(context, self.admin):
if job_token.user_id == self.admin.user_id:
job_token.delete(context, self.admin)
# create a job token
self.admin.create_job_token(context, {"label": "qa"})
if not context.light_version:
# TODO: need to delete all existing api_key admin
# create an api key
self.admin.create_api_key(context, {"label": "qa"})
else:
self.admin.api_key = self.admin.job_tokens[0]
[docs]
def initialise(self, context: CloudContextHandler):
"""Initialize the complete test environment.
Sets up admin, clears previous test data, creates test platforms and users,
and creates a simple job for testing.
:param context: CloudContextHandler instance for making API requests
"""
self.admin_initialise(context)
Platform.ADMIN = self.admin
self.clear(context)
self.create_platform(context)
self.create_users(context)
job = Job()
job.create(context, self.users[0])
self.users[0]["simple_qa_job_id"] = job.job_id
[docs]
def load_users(self, users_path: str, context: CloudContextHandler):
"""Load user information from a CSV file.
Reads user data from the CSV file, creates User objects with generated
email and password data, and adds them to the users list.
:param users_path: Path to the CSV file containing user information
:param context: CloudContextHandler instance for context information
"""
with open(users_path, 'r') as user_csv:
self.users: list[User] = []
for line in user_csv.readlines():
line = line.split(';')
company_id = None
if not context.light_version:
company_id = self.environnement.get("COMPANY_ID", 1)
user = User(first_name=line[0].strip(), last_name=line[1].strip(), company_id=company_id)
user.generate_data()
self.users.append(user)
[docs]
def delete_users(self, context):
"""Delete test users from the cloud system.
Finds and deletes all users from the cloud system that match
the test users loaded from the CSV file.
:param context: CloudContextHandler instance for making API requests
"""
cloud_users = User.list_all(context, self.admin)
for cloud_user in cloud_users:
for user in self.users:
if cloud_user.email == user.email:
cloud_user.delete(context, self.admin)
break
[docs]
def create_users(self, context):
"""Create test users in the cloud system.
Creates users in the cloud, assigns them to the default platform,
logs them in, creates job tokens, and assigns tokens to the platform.
:param context: CloudContextHandler instance with users_nb_to_create setting
"""
for i in range(context.users_nb_to_create):
self.users[i].create(context, self.admin)
self.default_platform.assign_user(context, self.users[i])
self.users[i].log(context)
self.users[i].create_job_token(context, {"label": "qa"})
self.default_platform.assign_token(context, self.users[i].job_tokens[0])
[docs]
def clear(self, context: CloudContextHandler):
"""Clear existing test resources from the cloud system.
Removes all test users and platforms to ensure a clean environment.
:param context: CloudContextHandler instance for making API requests
"""
self.delete_users(context)
self.delete_platforms(context)