# Source code for steps.api.cloud_objects.cloud_platform

from __future__ import annotations

from sys import _getframe
from uuid import uuid4

from ..utils import Request, Api, POST, GET
from ..given import send_complete_request

from .cloud_base import ACloudObject, TokenType
from .cloud_job import Job

from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from ..utils import CloudContext
    from .cloud_user import User


class Platform(ACloudObject):
    """Platform class representing a worker in the cloud system.

    This class handles worker registration, job management, and platform
    permissions. Every request is built with ``Platform.ADMIN``, which must
    be set before a platform is instantiated.

    :param name: Name of the platform
    :param type: Type of platform (default: "simulator")
    :param platform_id: Platform ID (None for new platforms)
    :param worker_id: Worker ID; when omitted (or None) a fresh UUID is
        generated for this instance
    :param specs: Platform specifications; when omitted (or None) defaults to
        ``{"available_commands": ["probs"]}``
    :param kwargs: Additional fields (e.g. a platform record from the API).
        Only ``id`` is read; when present it takes precedence over
        ``platform_id``. Other keys are ignored here.
    :raises AssertionError: If Platform.ADMIN is None or not an admin
    """

    # Admin account used for every request issued by this class.
    # NOTE(review): presumably a ``User`` instance — it must expose
    # ``is_admin`` and ``get_token(token_type)``; confirm against callers.
    ADMIN = None

    REGISTER_ENDPOINT = "/admin/platforms/register"
    JOB_ENDPOINT = "/admin/job"
    ASSIGN_ENDPOINT = "/api/platform-permissions"

    @classmethod
    def get_object_name(cls) -> str:
        """Return the object name of this cloud object type ("platform")."""
        return "platform"

    @classmethod
    def get_endpoint(cls) -> str:
        """Return the base API endpoint for platforms."""
        return "/api/platforms"

    @classmethod
    def get_api(cls, context: CloudContext) -> Api:
        """Return the "cloud" API registered on the given context."""
        return context.apis["cloud"]

    @classmethod
    def get_token_method(cls) -> str:
        """Return the token type used by default for platform requests."""
        return TokenType.Login

    PLATFORM_NAME_KEY = "platform_name"
    # Define the constant directly to avoid circular imports
    DEFAULT_NAME = "sim:qa"

    def __init__(self, name: str = "", type: str = "simulator",
                 platform_id: str | None = None,
                 worker_id: str | None = None,
                 specs: dict | None = None, **kwargs):
        self._require_admin("Can handle platform only as an admin")

        self.name = name
        self.type = type
        # Generate the UUID per instance: a ``str(uuid4())`` default in the
        # signature is evaluated once and would be shared by all platforms.
        self.worker_id: str = str(uuid4()) if worker_id is None else worker_id
        # API records carry the identifier under "id"; it wins over the
        # explicit platform_id argument when both are given.
        self.platform_id: str | None = kwargs.get("id", platform_id)
        # Build a fresh dict per instance instead of sharing a mutable default.
        self.specs = {"available_commands": ["probs"]} if specs is None else specs

    @classmethod
    def _require_admin(cls, message: str) -> None:
        """Raise AssertionError unless ``ADMIN`` is set and is an admin.

        Raised explicitly (rather than via ``assert``) so the check is not
        stripped when Python runs with ``-O``.

        :param message: Error message used when ADMIN is not an admin
        :raises AssertionError: If ADMIN is None or not an admin
        """
        if cls.ADMIN is None:
            raise AssertionError("Platform.ADMIN is not set")
        if not cls.ADMIN.is_admin:
            raise AssertionError(message)

    @staticmethod
    def _request_failed(context: CloudContext) -> bool:
        """Return True when the last response on the context is not HTTP 200."""
        return int(context.request.response.status_code) != 200

    def register(self, context: CloudContext, register_dict: dict | None = None) -> None:
        """Register this platform in the cloud.

        :param context: Cloud context for API configuration
        :param register_dict: Dictionary with registration parameters
            (optional); when omitted it is built from this platform's name,
            type, worker ID and specs
        :raises RuntimeError: If platform registration fails
        """
        if register_dict is None:
            register_dict = {
                "description": f"QA platform test: {self.name}",
                "name": self.name,
                "type": self.type,
                "worker_id": self.worker_id,
                # Send the instance's specs so custom specs are honoured;
                # identical to the previous hard-coded value by default.
                "specs": self.specs,
            }
        context.request = Request(api=self.get_api(context))
        context.request.set_authorization_header(
            self.ADMIN.get_token(self.get_token_method()))
        context.request.set_json_content_type()
        context.request.set_method(POST)
        context.request.set_url(Platform.REGISTER_ENDPOINT)
        context.request.body = register_dict
        send_complete_request(context)
        if self._request_failed(context):
            raise RuntimeError(
                f"Cannot register platform : {context.request.get_pretty_response()}")

    def _get_error_message(self, method_name: str, context: CloudContext) -> str:
        """Generate an error message for failed API operations.

        :param method_name: Name of the method that failed
        :param context: Cloud context containing request/response data
        :return: Formatted error message
        """
        return f"Cannot {method_name} platform {self.name if self.name else self.platform_id}: "\
            + f"{context.request.get_pretty_response()}"

    def read(self, context: CloudContext) -> dict:
        """Read this platform's information from the cloud.

        The platform is looked up by name, falling back to its ID when the
        name is empty. On success the instance is updated from the response.

        :param context: Cloud context for API configuration
        :return: Dictionary with platform data from response
        :raises RuntimeError: If reading platform data fails
        """
        name_or_id_key = "platform_name_or_id"
        identifier = self.name if self.name else self.platform_id
        context.request = self.get_read_request(context, self.ADMIN)
        context.request.set_url(
            Platform.get_endpoint() + "/{" + name_or_id_key + "}",
            {name_or_id_key: identifier})
        send_complete_request(context)
        if self._request_failed(context):
            raise RuntimeError(
                f"Cannot get platform {identifier}: {context.request.get_pretty_response()}")
        response = context.request.response.json()
        self.update_from_dict(response)
        return response

    def update(self, context: CloudContext, update_dict: dict) -> None:
        """Update this platform's information in the cloud.

        :param context: Cloud context for API configuration
        :param update_dict: Dictionary with fields to update
        :raises RuntimeError: If updating platform data fails
        """
        context.request = self.get_update_request(context, self.ADMIN)
        context.request.body = update_dict
        send_complete_request(context)
        if self._request_failed(context):
            raise RuntimeError(self._get_error_message("update", context))

    def delete(self, context: CloudContext) -> None:
        """Delete this platform from the cloud.

        :param context: Cloud context for API configuration
        :raises RuntimeError: If deleting the platform fails
        """
        context.request = self.get_delete_request(context, self.ADMIN)
        send_complete_request(context)
        if self._request_failed(context):
            raise RuntimeError(self._get_error_message("delete", context))

    def fetch_job(self, context: CloudContext) -> Job:
        """Fetch a job for this platform to process.

        Authenticates with the admin's API key and queries the job endpoint
        by platform name.

        :param context: Cloud context for API configuration
        :return: Job object for the fetched job
        :raises RuntimeError: If fetching a job fails
        """
        context.request = Request(api=self.get_api(context))
        context.request.set_authorization_header(
            self.ADMIN.get_token(TokenType.API_Key))
        context.request.set_json_content_type()
        context.request.set_method(GET)
        context.request.set_url(
            Platform.JOB_ENDPOINT + "/fetch/{platform_name}",
            {"platform_name": self.name})
        send_complete_request(context)
        if self._request_failed(context):
            raise RuntimeError(self._get_error_message("fetch_job", context))
        return Job(context.request.response.json())

    def _advance_job(self, context: CloudContext, job: Job, advance_endpoint: str) -> Request:
        """Build (without sending) the POST request that advances a job.

        :param context: Cloud context for API configuration
        :param job: Job to advance; its ``job_id`` fills the URL parameter
        :param advance_endpoint: Path segment under the job endpoint
            (e.g. "run", "progress", "finish")
        :return: Configured request, authenticated with the admin's API key
        """
        request = Request(api=self.get_api(context))
        request.set_authorization_header(self.ADMIN.get_token(TokenType.API_Key))
        request.set_json_content_type()
        request.set_method(POST)
        # Use string constant "job_id" instead of importing Job class
        job_id_key = "job_id"
        request.set_url(
            Platform.JOB_ENDPOINT + "/" + advance_endpoint + "/{" + job_id_key + "}",
            {job_id_key: job.job_id}
        )
        return request

    def _send_job_request(self, context: CloudContext, action: str) -> None:
        """Send the prepared job request and raise if it did not succeed.

        :param context: Cloud context holding the prepared request
        :param action: Job action name used in the error message
        :raises RuntimeError: If the response status is not 200
        """
        send_complete_request(context)
        if self._request_failed(context):
            raise RuntimeError(
                f"Cannot {action} job: {context.request.get_pretty_response()}")

    def run_job(self, context: CloudContext, job: Job) -> None:
        """Mark a job as running via the "run" job endpoint.

        :param context: Cloud context for API configuration
        :param job: Job to run
        :raises RuntimeError: If the request fails
        """
        context.request = self._advance_job(context, job, "run")
        self._send_job_request(context, "run")

    def progress_job(self, context: CloudContext, job: Job, progress_dict: dict) -> None:
        """Report progress on a job via the "progress" job endpoint.

        :param context: Cloud context for API configuration
        :param job: Job to update
        :param progress_dict: Request body describing the progress
        :raises RuntimeError: If the request fails
        """
        context.request = self._advance_job(context, job, "progress")
        context.request.body = progress_dict
        self._send_job_request(context, "progress")

    def finish_job(self, context: CloudContext, job: Job, finish_dict: dict) -> None:
        """Finish a job via the "finish" job endpoint.

        :param context: Cloud context for API configuration
        :param job: Job to finish
        :param finish_dict: Request body sent with the finish request
        :raises RuntimeError: If the request fails
        """
        context.request = self._advance_job(context, job, "finish")
        context.request.body = finish_dict
        self._send_job_request(context, "finish")

    def _assign_object(self, context: CloudContext, object_to_assign) -> None:
        """Grant a cloud object permission on this platform.

        Posts this platform's ID to
        ``ASSIGN_ENDPOINT/<object name>s/{<object id key>}``. The sent request
        stays available on ``context.request``; nothing is returned.

        :param context: Cloud context for API configuration
        :param object_to_assign: Object providing ``get_object_name()``,
            ``get_object_id_key()`` and ``get_object_id()``
        :raises RuntimeError: If the assignment fails
        """
        id_key = object_to_assign.get_object_id_key()
        context.request = Request(api=self.get_api(context))
        context.request.set_authorization_header(
            self.ADMIN.get_token(TokenType.Login))
        context.request.set_json_content_type()
        context.request.set_method(POST)
        context.request.set_url(
            Platform.ASSIGN_ENDPOINT + "/" + object_to_assign.get_object_name()
            + "s/{" + id_key + "}",
            {id_key: object_to_assign.get_object_id()})
        context.request.body = {"platforms": [{"platform_id": self.get_object_id()}]}
        send_complete_request(context)
        if self._request_failed(context):
            raise RuntimeError(
                f"Cannot assign {id_key} "
                f"to platform: {context.request.get_pretty_response()}")

    def assign_user(self, context: CloudContext, user: User) -> None:
        """Assign a user to this platform.

        :param context: Cloud context for API configuration
        :param user: User to grant access to this platform
        :raises RuntimeError: If the assignment fails
        """
        # _assign_object returns None; do not overwrite context.request with it.
        self._assign_object(context, user)

    def assign_token(self, context: CloudContext, job_token) -> None:
        """Assign a job token to this platform.

        :param context: Cloud context for API configuration
        :param job_token: Token object to grant access to this platform
        :raises RuntimeError: If the assignment fails
        """
        # _assign_object returns None; do not overwrite context.request with it.
        self._assign_object(context, job_token)

    def assign_company(self, context: CloudContext, company_id: int) -> None:
        """Assign a company to this platform with a price per shot of 0.

        :param context: Cloud context for API configuration
        :param company_id: ID of the company to assign
        :raises RuntimeError: If the assignment fails
        """
        context.request = self.get_endpoint_id_request_with_extension(
            context, self.ADMIN, "company")
        context.request.set_method(POST)
        context.request.body = {"object_ids": [company_id], "price_per_shot": 0}
        send_complete_request(context)
        if self._request_failed(context):
            raise RuntimeError(
                f"Cannot assign company to platform: {context.request.get_pretty_response()}")

    @classmethod
    def list_all(cls, context: CloudContext) -> list[Platform]:
        """List all platforms in the cloud.

        :param context: Cloud context for API configuration
        :return: List of Platform objects built from the response records
        :raises AssertionError: If ADMIN is not set or is not an admin
        :raises RuntimeError: If listing platforms fails
        """
        cls._require_admin("Can list platforms only as an admin")
        context.request = Platform.get_list_all_request(context, cls.ADMIN)
        send_complete_request(context)
        if cls._request_failed(context):
            raise RuntimeError(
                f"Cannot list platforms: {context.request.get_pretty_response()}")
        return [Platform(**elem) for elem in context.request.response.json()]