Source code for steps.api.cloud_objects.cloud_platform
from __future__ import annotations
from sys import _getframe
from uuid import uuid4
from ..utils import Request, Api, POST, GET
from ..given import send_complete_request
from .cloud_base import ACloudObject, TokenType
from .cloud_job import Job
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..utils import CloudContext
from .cloud_user import User
[docs]
class Platform(ACloudObject):
"""Platform class representing a worker in the cloud system.
This class handles worker registration, job management, and platform permissions.
:param name: Name of the platform
:param type: Type of platform (default: "simulator")
:param platform_id: Platform ID (None for new platforms)
:param worker_id: Worker ID (defaults to a new UUID)
:param specs: Platform specifications
:param kwargs: Additional attributes to set on the platform
:raises AssertionError: If Platform.ADMIN is None or not an admin
"""
@classmethod
@classmethod
@classmethod
@classmethod
# Define the constant directly to avoid circular imports
# Create a Platform handle. Construction is refused unless Platform.ADMIN is set
# and refers to an admin account (see the two asserts below).
# NOTE(review): the body visible here only validates ADMIN; none of the
# parameters are stored in it. Presumably the attribute assignments live in
# code that is not part of this view - TODO confirm.
def __init__(self,
name: str = "",
# NOTE(review): `type` shadows the builtin of the same name inside this method.
type: str = "simulator",
platform_id: str = None,
# NOTE(review): this default is evaluated once, when the function is defined,
# so every call that omits worker_id receives the same UUID string.
worker_id: str = str(uuid4()),
# NOTE(review): mutable default - the same dict object is shared by every call
# that omits specs.
specs: dict[str] = {"available_commands": ["probs"]},
**kwargs):
# ADMIN has to be set on the class before any Platform can be created.
assert self.ADMIN is not None
assert Platform.ADMIN.is_admin, "Can handle platform only as an admin"
[docs]
def register(self, context: CloudContext, register_dict: dict[str] = None) -> None:
    """Register this platform in the cloud on behalf of the admin account.

    A POST request to ``Platform.REGISTER_ENDPOINT`` is built, stored on
    ``context.request`` and sent with ``send_complete_request``.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :param register_dict: Request body to send. When ``None``, a default body
        is built from this platform's name, type and worker id.
    :raises RuntimeError: If the response status code is not 200
    """
    payload = register_dict
    if payload is None:
        # No explicit body supplied: describe the platform from its own attributes.
        payload = {
            "description": f"QA platform test: {self.name}",
            "name": self.name,
            "type": self.type,
            "worker_id": self.worker_id,
            "specs": {"available_commands": ["probs"]},
        }

    request = Request(api=self.get_api(context))
    context.request = request
    # The token type is whatever get_token_method() selects for this object.
    admin_token = self.ADMIN.get_token(self.get_token_method())
    request.set_authorization_header(admin_token)
    request.set_json_content_type()
    request.set_method(POST)
    request.set_url(Platform.REGISTER_ENDPOINT)
    request.body = payload

    send_complete_request(context)

    status = int(context.request.response.status_code)
    if status == 200:
        return
    raise RuntimeError(
        f"Cannot register platform : {context.request.get_pretty_response()}")
def _get_error_message(self, method_name: str, context: CloudContext) -> str:
    """Build the error text used when an API call on this platform fails.

    :param method_name: Name of the operation that failed; inserted verbatim
        after "Cannot"
    :param context: Cloud context whose current request supplies the
        pretty-printed response
    :return: Message of the form ``Cannot <method> platform <label>: <response>``
    """
    # Identify the platform by name, falling back to its id when the name is empty.
    label = self.name or self.platform_id
    details = context.request.get_pretty_response()
    return f"Cannot {method_name} platform {label}: {details}"
[docs]
def read(self, context: CloudContext) -> dict[str]:
    """Fetch this platform's data from the cloud and refresh this object with it.

    The platform is looked up by ``self.name`` on the platform endpoint, and the
    decoded JSON response is passed to ``update_from_dict``.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :return: The decoded JSON body of the response
    :raises RuntimeError: If the response status code is not 200
    """
    request = self.get_read_request(context, self.ADMIN)
    context.request = request

    # The URL carries a named placeholder that is filled in with the platform name.
    placeholder = "platform_name_or_id"
    url_template = Platform.get_endpoint() + "/{" + placeholder + "}"
    request.set_url(url_template, {placeholder: self.name})

    send_complete_request(context)

    response = context.request.response
    if int(response.status_code) != 200:
        raise RuntimeError(
            f"Cannot get platform {self.name}: {context.request.get_pretty_response()}")

    payload = response.json()
    self.update_from_dict(payload)
    return payload
[docs]
def update(self, context: CloudContext, update_dict: dict) -> None:
    """Send an update for this platform to the cloud.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :param update_dict: Request body holding the fields to update
    :raises RuntimeError: If the response status code is not 200
    """
    context.request = self.get_update_request(context, self.ADMIN)
    context.request.body = update_dict
    send_complete_request(context)

    status = int(context.request.response.status_code)
    if status == 200:
        return
    # The operation name in the message is this method's own name.
    raise RuntimeError(self._get_error_message("update", context))
[docs]
def delete(self, context: CloudContext) -> None:
    """Remove this platform from the cloud.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :raises RuntimeError: If the response status code is not 200
    """
    context.request = self.get_delete_request(context, self.ADMIN)
    send_complete_request(context)

    succeeded = int(context.request.response.status_code) == 200
    if not succeeded:
        # The operation name in the message is this method's own name.
        raise RuntimeError(self._get_error_message("delete", context))
[docs]
def fetch_job(self, context: CloudContext) -> Job:
    """Ask the cloud for a job addressed to this platform.

    A GET request is sent to ``<JOB_ENDPOINT>/fetch/<platform name>``,
    authorized with the admin's API-key token.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :return: ``Job`` built from the decoded JSON response
    :raises RuntimeError: If the response status code is not 200
    """
    request = Request(api=self.get_api(context))
    context.request = request
    request.set_authorization_header(self.ADMIN.get_token(TokenType.API_Key))
    request.set_json_content_type()
    request.set_method(GET)
    request.set_url(
        Platform.JOB_ENDPOINT + "/fetch/{platform_name}",
        {"platform_name": self.name},
    )

    send_complete_request(context)

    response = context.request.response
    if int(response.status_code) != 200:
        # The operation name in the message is this method's own name.
        raise RuntimeError(self._get_error_message("fetch_job", context))
    return Job(response.json())
def _advance_job(self, context: CloudContext, job, advance_endpoint: str) -> Request:
    """Build, without sending, the POST request that advances a job.

    :param context: Cloud context used to resolve the API
    :param job: Object exposing the ``job_id`` of the job to advance
    :param advance_endpoint: Path segment placed between the job endpoint and
        the job id (the callers in this class pass "run", "progress", "finish")
    :return: The prepared request; the caller is responsible for sending it
    """
    prepared = Request(api=self.get_api(context))
    prepared.set_authorization_header(self.ADMIN.get_token(TokenType.API_Key))
    prepared.set_json_content_type()
    prepared.set_method(POST)

    # URL shape: <JOB_ENDPOINT>/<advance_endpoint>/{job_id}, with the
    # placeholder resolved from the job's own id.
    id_key = "job_id"
    url_template = "/".join((Platform.JOB_ENDPOINT, advance_endpoint, "{" + id_key + "}"))
    prepared.set_url(url_template, {id_key: job.job_id})
    return prepared
[docs]
def run_job(self, context: CloudContext, job) -> None:
    """Send the "run" transition for a job.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :param job: Object exposing the ``job_id`` of the job to advance
    :raises RuntimeError: If the response status code is not 200
    """
    # The endpoint segment is the first word of this method's name.
    action = "run"
    context.request = self._advance_job(context, job, action)
    send_complete_request(context)

    if int(context.request.response.status_code) == 200:
        return
    raise RuntimeError(
        f"Cannot {action} job: {context.request.get_pretty_response()}")
[docs]
def progress_job(self, context: CloudContext, job, progress_dict: dict[str]) -> None:
    """Send the "progress" transition for a job.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :param job: Object exposing the ``job_id`` of the job to advance
    :param progress_dict: Request body sent with the progress call
    :raises RuntimeError: If the response status code is not 200
    """
    # The endpoint segment is the first word of this method's name.
    action = "progress"
    request = self._advance_job(context, job, action)
    context.request = request
    request.body = progress_dict
    send_complete_request(context)

    if int(context.request.response.status_code) == 200:
        return
    raise RuntimeError(
        f"Cannot {action} job: {context.request.get_pretty_response()}")
[docs]
def finish_job(self, context: CloudContext, job, finish_dict: dict[str]) -> None:
    """Send the "finish" transition for a job.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :param job: Object exposing the ``job_id`` of the job to advance
    :param finish_dict: Request body sent with the finish call
    :raises RuntimeError: If the response status code is not 200
    """
    # The endpoint segment is the first word of this method's name.
    action = "finish"
    request = self._advance_job(context, job, action)
    context.request = request
    request.body = finish_dict
    send_complete_request(context)

    if int(context.request.response.status_code) == 200:
        return
    raise RuntimeError(
        f"Cannot {action} job: {context.request.get_pretty_response()}")
def _assign_object(self, context: CloudContext, object_to_assign) -> None:
    """Assign an object to this platform through the assignment endpoint.

    A POST request is sent to
    ``<ASSIGN_ENDPOINT>/<object name>s/<object id>`` with this platform's id
    in the body. The request is stored on ``context.request``; nothing is
    returned.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :param object_to_assign: Object providing ``get_object_name()``,
        ``get_object_id_key()`` and ``get_object_id()``
    :raises RuntimeError: If the response status code is not 200
    """
    id_key = object_to_assign.get_object_id_key()

    context.request = Request(api=self.get_api(context))
    context.request.set_authorization_header(self.ADMIN.get_token(TokenType.Login))
    context.request.set_json_content_type()
    context.request.set_method(POST)
    # The object name is pluralised with a trailing "s" to form the URL segment.
    context.request.set_url(
        Platform.ASSIGN_ENDPOINT + "/" + object_to_assign.get_object_name() + "s/{" + id_key + "}",
        {id_key: object_to_assign.get_object_id()},
    )
    context.request.body = {"platforms": [{"platform_id": self.get_object_id()}]}
    send_complete_request(context)

    if int(context.request.response.status_code) != 200:
        # Single f-string so the response is actually interpolated; the
        # previous message left the placeholder as literal text and read
        # "Can assign" instead of "Cannot assign".
        raise RuntimeError(
            f"Cannot assign {id_key} to platform: {context.request.get_pretty_response()}")
[docs]
def assign_user(self, context: CloudContext, user: User) -> None:
    """Assign a user to this platform.

    :param context: Cloud context; its ``request`` attribute is set by
        ``_assign_object`` to the request that was sent
    :param user: User to assign
    :raises RuntimeError: If the assignment request does not return status 200
    """
    # _assign_object stores the sent request on context.request itself and
    # returns None, so its result must not be assigned back to context.request
    # (doing so would discard the request and its response).
    self._assign_object(context, user)
[docs]
def assign_token(self, context: CloudContext, job_token) -> None:
    """Assign a job token to this platform.

    :param context: Cloud context; its ``request`` attribute is set by
        ``_assign_object`` to the request that was sent
    :param job_token: Token object to assign
    :raises RuntimeError: If the assignment request does not return status 200
    """
    # _assign_object stores the sent request on context.request itself and
    # returns None, so its result must not be assigned back to context.request
    # (doing so would discard the request and its response).
    self._assign_object(context, job_token)
[docs]
def assign_company(self, context: CloudContext, company_id: int) -> None:
    """Assign a company to this platform.

    A POST request is sent to this platform's "company" sub-endpoint with the
    company id and a ``price_per_shot`` of 0.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :param company_id: Id of the company to assign
    :raises RuntimeError: If the response status code is not 200
    """
    context.request = self.get_endpoint_id_request_with_extension(context, self.ADMIN, "company")
    context.request.set_method(POST)
    context.request.body = {"object_ids": [company_id], "price_per_shot": 0}
    send_complete_request(context)
    if int(context.request.response.status_code) != 200:
        # The failure message previously read "Can assign ...", which stated
        # the opposite of what happened.
        raise RuntimeError(
            f"Cannot assign company to platform: {context.request.get_pretty_response()}")
@classmethod
[docs]
def list_all(cls, context: CloudContext) -> list[Platform]:
    """Retrieve every platform known to the cloud.

    :param context: Cloud context; its ``request`` attribute is overwritten
    :return: One ``Platform`` per element of the JSON response, each built by
        passing the element's fields as keyword arguments
    :raises AssertionError: If ``ADMIN`` is not an admin
    :raises RuntimeError: If the response status code is not 200
    """
    assert cls.ADMIN.is_admin, "Can list platforms only as an admin"

    context.request = Platform.get_list_all_request(context, cls.ADMIN)
    send_complete_request(context)

    response = context.request.response
    if int(response.status_code) != 200:
        raise RuntimeError(
            f"Cannot list platforms: {context.request.get_pretty_response()}")

    entries = response.json()
    return [Platform(**entry) for entry in entries]