Source code for steps.api.given_cloud

from behave import given

from .utils import Context, try_read_from_variable
from .cloud import Platform, Job, JobToken
from .given import get_table, prepare_request

# use_step_matcher("re") # cucumberautocomplete is not working :'(


@given('I am preparing a request')
[docs] def prepare_cloud_request(context): """Prepare a new request for the cloud API. Shorthand for creating a request to the cloud API type. :param context: The Behave context """ api_type = "cloud" prepare_request(context, api_type)
# User @given("I am logging as user {user_index}")
[docs] def log_as_user(context: Context, user_index: str) -> None: """Log in as a specific test user. :param context: The Behave context :param user_index: Index of the user in the users list """ context.cloud.users[int(user_index)].log(context)
@given("I am logging as admin")
[docs] def log_as_admin(context: Context) -> None: """Log in as the admin user. :param context: The Behave context """ context.cloud.admin.log(context)
@given("I am getting the user {user_index}")
[docs] def get_user(context: Context, user_index: str): """Get information about a specific user. :param context: The Behave context :param user_index: Index of the user in the users list :return: User data from the response """ return context.cloud.users[int(user_index)].read(context, context.cloud.admin)
@given("I am creating a new job token called {token_label} for user {user_index}")
[docs] def create_job_token(context: Context, token_label: str, user_index: str) -> None: """Create a new job token for a specific user. :param context: The Behave context :param token_label: Label for the new token :param user_index: Index of the user to create the token for """ context.cloud.users[int(user_index)].create_job_token(context, {'label': token_label})
# PLATFORMS @given('I am deleting a platform with "platform_id" as {platform_id_method}')
[docs] def delete_platform(context: Context, platform_id_method: str) -> None: """Delete a platform by its ID. :param context: The Behave context :param platform_id_method: Method to get the platform ID """ platform_id = try_read_from_variable(platform_id_method, context) Platform(platform_id=platform_id).delete(context)
@given('I am registering a platform with "platform_name" as {platform_name_method}')
[docs] def register_platform_simplify(context: Context, platform_name_method) -> None: platform_name = try_read_from_variable(platform_name_method, context) platform = Platform(name=platform_name) platform.register(context)
@given('I am getting info of the platform with "platform_name_or_id" as {platform_name_or_id_method}')
[docs] def get_platform(context: Context, platform_name_or_id_method: str) -> Platform: platform_name = try_read_from_variable(platform_name_or_id_method, context) platform = Platform(name=platform_name) platform.read(context) return platform
@given("I am registering a platform setting the payload as follow")
[docs] def register_platform(context: Context) -> None: registration_dict = get_table(context) platform = Platform(**registration_dict) platform.register(context, registration_dict)
@given('I am fetching platform job with "platform_name" as {platform_name_method}')
[docs] def fetch_platform_job(context: Context, platform_name_method: dict = None) -> None: platform_name = try_read_from_variable(platform_name_method, context) platform = Platform(name=platform_name) platform.fetch_job(context)
@given("I am running the job with \"job_id\" as {job_id_method}")
[docs] def run_platform_job(context: Context, job_id_method: dict = None) -> None: job_id = try_read_from_variable(job_id_method, context) Platform().run_job(context, Job(job_id=job_id))
@given("I am making progress for the job with \"job_id\" as {job_id_method} setting the payload as follow")
[docs] def progress_job(context: Context, job_id_method: dict = None) -> None: job_id = try_read_from_variable(job_id_method, context) Platform().progress_job(context, Job(job_id=job_id), get_table(context))
@given("I am finishing the job with \"job_id\" as {job_id_method} setting the payload as follow")
[docs] def finish_job(context: Context, job_id_method: dict = None) -> None: job_id = try_read_from_variable(job_id_method, context) Platform().finish_job(context, Job(job_id=job_id), get_table(context))
@given('I am assign user {user_index} to platform with "platform_id" as {platform_id_method}')
[docs] def assign_user_to_platform(context: Context, user_index: str, platform_id_method: str): platform_id = try_read_from_variable(platform_id_method, context) Platform(platform_id=platform_id).assign_user(context, context.cloud.users[int(user_index)])
@given('I am assigning job token with "token_id" as {job_token_id_method} to platform with "platform_id" as {platform_id_method}') # noqa: E501
[docs] def assign_token_to_platform(context: Context, job_token_id_method: str, platform_id_method: str) -> None: token_id = try_read_from_variable(job_token_id_method, context) platform_id = try_read_from_variable(platform_id_method, context) Platform(platform_id=platform_id).assign_user(context, JobToken(token_id=token_id))
@given('I am assign user {user_index} and its job token to platform with "platform_name" as {platform_name_method}')
[docs] def assign_user_and_token_to_platform(context: Context, user_index: str, platform_name_method: str): user_index = int(user_index) platform = get_platform(context, platform_name_method) platform.assign_user(context, context.cloud.users[user_index]) platform.assign_token(context, context.cloud.users[user_index].job_tokens[0])
# JOB @given('I am creating a job from user {user_index} on default platform')
[docs] def create_job_simplified(context: Context, user_index: str): Job().create(context, context.cloud.users[int(user_index)])
@given('I am creating a job from user {user_index} with "platform_name" as {platform_name_method}')
[docs] def create_job(context: Context, user_index: str, platform_name_method: str) -> None: platform_name = try_read_from_variable(platform_name_method, context) Job().create(context, context.cloud.users[int(user_index)], {'platform_name': platform_name})
@given("I am creating a job from user {user_index} setting the payload as follow")
[docs] def create_job_payload(context: Context, user_index: str) -> None: Job().create(context, context.cloud.users[int(user_index)], get_table(context))
@given("I am getting the status of the job with \"job_id\" as {job_id_method} from user {user_index}")
[docs] def get_job_status(context: Context, job_id_method: str, user_index: str) -> None: job_id = try_read_from_variable(job_id_method, context) Job(job_id=job_id).get_status(context, context.cloud.users[int(user_index)])
@given("I am getting the result of the job with \"job_id\" as {job_id_method} from user {user_index}")
[docs] def get_job_result(context: Context, job_id_method: str, user_index: str) -> None: job_id = try_read_from_variable(job_id_method, context) Job(job_id=job_id).get_results(context, context.cloud.users[int(user_index)])
@given("I am getting the info of the job with \"job_id\" as {job_id_method}")
[docs] def get_job_info(context: Context, job_id_method: str) -> None: job_id = try_read_from_variable(job_id_method, context) Job(job_id=job_id).read(context, context.cloud.admin)
@given("I am canceling the job with \"job_id\" as {job_id_method} from user {user_index}")
[docs] def cancel_job(context: Context, job_id_method: str, user_index: str) -> None: job_id = try_read_from_variable(job_id_method, context) Job(job_id=job_id).cancel(context, context.cloud.users[int(user_index)])
@given('I skip this scenario if LIGHT_VERSION is "{state}"')
[docs] def step_impl(context, state): if state == "on" and not context.light_version: context.scenario.skip("Skipping because LIGHT_VERSION is on") elif state == "off" and context.light_version: context.scenario.skip("Skipping because LIGHT_VERSION is off")