from behave import given
from .utils import Context, try_read_from_variable
from .cloud import Platform, Job, JobToken
from .given import get_table, prepare_request
# use_step_matcher("re") # cucumberautocomplete is not working :'(
@given('I am preparing a request')
[docs]
def prepare_cloud_request(context):
    """Start a new request aimed at the cloud API.

    Thin wrapper around ``prepare_request`` with the API type fixed
    to ``"cloud"``.

    :param context: The Behave context
    """
    prepare_request(context, "cloud")
# User
@given("I am logging as user {user_index}")
[docs]
def log_as_user(context: Context, user_index: str) -> None:
    """Log in as one of the test users.

    :param context: The Behave context
    :param user_index: Position of the user in ``context.cloud.users``
        (captured from the step as text, converted with ``int``)
    """
    selected_user = context.cloud.users[int(user_index)]
    selected_user.log(context)
@given("I am logging as admin")
[docs]
def log_as_admin(context: Context) -> None:
    """Log in as the admin account held in ``context.cloud.admin``.

    :param context: The Behave context
    """
    admin_account = context.cloud.admin
    admin_account.log(context)
@given("I am getting the user {user_index}")
[docs]
def get_user(context: Context, user_index: str):
    """Read one test user's information, acting as the admin.

    :param context: The Behave context
    :param user_index: Position of the user in ``context.cloud.users``
        (captured from the step as text, converted with ``int``)
    :return: Whatever the user's ``read`` call returns
    """
    target_user = context.cloud.users[int(user_index)]
    # The admin account is passed as the second argument of ``read``.
    return target_user.read(context, context.cloud.admin)
@given("I am creating a new job token called {token_label} for user {user_index}")
[docs]
def create_job_token(context: Context, token_label: str, user_index: str) -> None:
    """Create a job token with the given label for one test user.

    :param context: The Behave context
    :param token_label: Value sent under the ``label`` key of the payload
    :param user_index: Position of the owning user in ``context.cloud.users``
        (captured from the step as text, converted with ``int``)
    """
    token_owner = context.cloud.users[int(user_index)]
    token_payload = {'label': token_label}
    token_owner.create_job_token(context, token_payload)
# PLATFORMS
@given('I am deleting a platform with "platform_id" as {platform_id_method}')
@given('I am registering a platform with "platform_name" as {platform_name_method}')
@given('I am getting info of the platform with "platform_name_or_id" as {platform_name_or_id_method}')
@given("I am registering a platform setting the payload as follow")
@given('I am fetching platform job with "platform_name" as {platform_name_method}')
@given("I am running the job with \"job_id\" as {job_id_method}")
@given("I am making progress for the job with \"job_id\" as {job_id_method} setting the payload as follow")
[docs]
def progress_job(context: Context, job_id_method: str = None) -> None:
    """Report progress on a job, using the payload attached to the step.

    The job id is obtained by passing ``job_id_method`` through
    ``try_read_from_variable``; the payload is whatever
    ``get_table(context)`` returns.

    :param context: The Behave context
    :param job_id_method: Text captured from the step for ``job_id``.
        Captured step parameters arrive as strings, so this is annotated
        ``str`` (as in the sibling job steps) rather than ``dict``. The
        ``None`` default is kept only for backward compatibility.
    """
    job_id = try_read_from_variable(job_id_method, context)
    platform = Platform()
    platform.progress_job(context, Job(job_id=job_id), get_table(context))
@given("I am finishing the job with \"job_id\" as {job_id_method} setting the payload as follow")
[docs]
def finish_job(context: Context, job_id_method: str = None) -> None:
    """Mark a job as finished, using the payload attached to the step.

    The job id is obtained by passing ``job_id_method`` through
    ``try_read_from_variable``; the payload is whatever
    ``get_table(context)`` returns.

    :param context: The Behave context
    :param job_id_method: Text captured from the step for ``job_id``.
        Captured step parameters arrive as strings, so this is annotated
        ``str`` (as in the sibling job steps) rather than ``dict``. The
        ``None`` default is kept only for backward compatibility.
    """
    job_id = try_read_from_variable(job_id_method, context)
    platform = Platform()
    platform.finish_job(context, Job(job_id=job_id), get_table(context))
@given('I am assign user {user_index} to platform with "platform_id" as {platform_id_method}')
@given('I am assigning job token with "token_id" as {job_token_id_method} to platform with "platform_id" as {platform_id_method}') # noqa: E501
@given('I am assign user {user_index} and its job token to platform with "platform_name" as {platform_name_method}')
# JOB
@given('I am creating a job from user {user_index} on default platform')
[docs]
def create_job_simplified(context: Context, user_index: str):
    """Create a job for one test user without passing any payload.

    No platform is given here; per the step text the default platform
    is expected to be used.

    :param context: The Behave context
    :param user_index: Position of the requesting user in
        ``context.cloud.users`` (text from the step, converted with ``int``)
    """
    new_job = Job()
    requester = context.cloud.users[int(user_index)]
    new_job.create(context, requester)
@given('I am creating a job from user {user_index} with "platform_name" as {platform_name_method}')
[docs]
def create_job(context: Context, user_index: str, platform_name_method: str) -> None:
    """Create a job for one test user on a named platform.

    :param context: The Behave context
    :param user_index: Position of the requesting user in
        ``context.cloud.users`` (text from the step, converted with ``int``)
    :param platform_name_method: Text captured from the step; it is resolved
        through ``try_read_from_variable`` and sent as ``platform_name``
    """
    resolved_name = try_read_from_variable(platform_name_method, context)
    new_job = Job()
    requester = context.cloud.users[int(user_index)]
    new_job.create(context, requester, {'platform_name': resolved_name})
@given("I am creating a job from user {user_index} setting the payload as follow")
[docs]
def create_job_payload(context: Context, user_index: str) -> None:
    """Create a job for one test user with the payload attached to the step.

    The payload is whatever ``get_table(context)`` returns.

    :param context: The Behave context
    :param user_index: Position of the requesting user in
        ``context.cloud.users`` (text from the step, converted with ``int``)
    """
    new_job = Job()
    requester = context.cloud.users[int(user_index)]
    job_payload = get_table(context)
    new_job.create(context, requester, job_payload)
@given("I am getting the status of the job with \"job_id\" as {job_id_method} from user {user_index}")
[docs]
def get_job_status(context: Context, job_id_method: str, user_index: str) -> None:
    """Request the status of a job on behalf of one test user.

    :param context: The Behave context
    :param job_id_method: Text captured from the step; resolved to the job
        id through ``try_read_from_variable``
    :param user_index: Position of the requesting user in
        ``context.cloud.users`` (text from the step, converted with ``int``)
    """
    target_job = Job(job_id=try_read_from_variable(job_id_method, context))
    requester = context.cloud.users[int(user_index)]
    target_job.get_status(context, requester)
@given("I am getting the result of the job with \"job_id\" as {job_id_method} from user {user_index}")
[docs]
def get_job_result(context: Context, job_id_method: str, user_index: str) -> None:
    """Request the results of a job on behalf of one test user.

    :param context: The Behave context
    :param job_id_method: Text captured from the step; resolved to the job
        id through ``try_read_from_variable``
    :param user_index: Position of the requesting user in
        ``context.cloud.users`` (text from the step, converted with ``int``)
    """
    target_job = Job(job_id=try_read_from_variable(job_id_method, context))
    requester = context.cloud.users[int(user_index)]
    target_job.get_results(context, requester)
@given("I am getting the info of the job with \"job_id\" as {job_id_method}")
[docs]
def get_job_info(context: Context, job_id_method: str) -> None:
    """Read a job's information, acting as the admin.

    :param context: The Behave context
    :param job_id_method: Text captured from the step; resolved to the job
        id through ``try_read_from_variable``
    """
    target_job = Job(job_id=try_read_from_variable(job_id_method, context))
    # The admin account is passed as the second argument of ``read``.
    target_job.read(context, context.cloud.admin)
@given("I am canceling the job with \"job_id\" as {job_id_method} from user {user_index}")
[docs]
def cancel_job(context: Context, job_id_method: str, user_index: str) -> None:
    """Cancel a job on behalf of one test user.

    :param context: The Behave context
    :param job_id_method: Text captured from the step; resolved to the job
        id through ``try_read_from_variable``
    :param user_index: Position of the requesting user in
        ``context.cloud.users`` (text from the step, converted with ``int``)
    """
    target_job = Job(job_id=try_read_from_variable(job_id_method, context))
    requester = context.cloud.users[int(user_index)]
    target_job.cancel(context, requester)
@given('I skip this scenario if LIGHT_VERSION is "{state}"')
[docs]
def step_impl(context, state):
    """Conditionally skip the current scenario based on ``context.light_version``.

    Behaviour, exactly as implemented:

    * ``state == "on"``: the scenario is skipped when
      ``context.light_version`` is falsy.
    * ``state == "off"``: the scenario is skipped when
      ``context.light_version`` is truthy.
    * any other ``state``: nothing happens and ``context.light_version``
      is not read.

    :param context: The Behave context (``light_version`` and ``scenario``
        are read from it)
    :param state: Text captured from the step, expected ``"on"`` or ``"off"``
    """
    # NOTE(review): the step text and the skip messages read as "skip when
    # LIGHT_VERSION equals <state>", but the conditions skip when it does NOT
    # match. Confirm which is intended before changing either side.
    skip_reasons = {
        "on": "Skipping because LIGHT_VERSION is on",
        "off": "Skipping because LIGHT_VERSION is off",
    }
    reason = skip_reasons.get(state)
    if reason is None:
        return

    light_enabled = bool(context.light_version)
    # "off" skips on a truthy flag, "on" skips on a falsy one.
    skip_when_enabled = state == "off"
    if light_enabled == skip_when_enabled:
        context.scenario.skip(reason)