Source code for steps.api.cloud_objects.cloud_job

from __future__ import annotations

from copy import deepcopy
from typing import TYPE_CHECKING, Any
from uuid import uuid4

from ..utils import Request, Api, POST, GET
from ..given import send_complete_request

from .cloud_base import ACloudObject, TokenType

if TYPE_CHECKING:
    from ..utils import CloudContext
    from .cloud_user import User


class Job(ACloudObject):
    """Job class representing a computational job in the cloud system.

    This class handles job creation, monitoring, and management operations.

    :param platform_name: Name of the platform to run the job on
    :param job_name: Name of the job
    :param payload: Job payload containing command, circuit, parameters, etc.
        When left at its default, the instance receives its own deep copy of
        ``DEFAULT_PAYLOAD`` so that editing one job never alters another.
    :param pcvl_version: Version of the PCVL library to use
    :param process_id: Unique ID for the process. When omitted, a new UUID is
        generated for each instance. An explicit ``None`` is stored as-is and
        is therefore left out of the creation request body.
    :param job_id: Job ID (empty for new jobs)
    :param _: Extra keyword arguments are accepted and ignored, which lets a
        job be built directly from a response dictionary (see ``list_all``).
    """

    JOB_NAME_KEY = "job_name"
    PLATFORM_NAME_KEY = "platform_name"
    PAYLOAD_KEY = "payload"
    PCVL_VERSION_KEY = "pcvl_version"
    PROCESS_ID_KEY = "process_id"

    DEFAULT_PAYLOAD = {
        "command": "probs",
        "circuit": (
            ":PCVL:zip:"
            "eJyzCnAO87FydM4sSi7NLLFydfTM9K9wdI7MSg52DsyO9AkNCtWu9DANqMj3"
            "cg50hAPP9GwvBM+xEKgWwXPxRFNrEegYlu/jDNTj7mzoGhZQnGEWYkF1ewCY7jxM"
        ),
        "input_state": ":PCVL:BasicState:|1,1>",
        "parameters": {"min_detected_photons": 2},
        "max_shots": 10000,
        "job_context": None,
    }

    # Default platform name constant matches Platform.DEFAULT_NAME
    DEFAULT_PLATFORM_NAME = "sim:qa"

    # Sentinel meaning "no process_id given". ``None`` cannot play that role
    # because ``create`` treats a ``None`` attribute as "omit this field".
    _GENERATE_PROCESS_ID = object()

    @classmethod
    def get_object_name(cls) -> str:
        """Return the name used to designate this kind of cloud object."""
        return "job"

    @classmethod
    def get_endpoint(cls) -> str:
        """Return the base API endpoint for jobs."""
        return "/api/jobs"

    @classmethod
    def get_api(cls, context: CloudContext) -> Api:
        """Return the API object that serves job requests.

        :param context: Cloud context holding the configured APIs
        :return: The ``"cloud"`` entry of ``context.apis``
        """
        return context.apis["cloud"]

    @classmethod
    def get_token_method(cls) -> str:
        """Return the token type associated with jobs (``TokenType.Job``)."""
        return TokenType.Job

    def __init__(self,
                 platform_name: str = DEFAULT_PLATFORM_NAME,
                 job_name: str = "qa_job",
                 payload: dict[str, Any] | None = DEFAULT_PAYLOAD,
                 pcvl_version: str = "0.12.0",
                 process_id: str | None = _GENERATE_PROCESS_ID,
                 job_id: str = "",
                 **_):
        self.platform_name: str = platform_name
        self.job_name: str = job_name
        # Never share the class-level default dict between instances: a caller
        # mutating ``job.payload`` would otherwise change every later default.
        if payload is Job.DEFAULT_PAYLOAD:
            payload = deepcopy(payload)
        self.payload: dict[str, Any] | None = payload
        self.pcvl_version: str = pcvl_version
        # A default of ``str(uuid4())`` in the signature is evaluated only once
        # at definition time; generating it here gives one UUID per instance.
        if process_id is Job._GENERATE_PROCESS_ID:
            process_id = str(uuid4())
        self.process_id: str | None = process_id
        self.job_id = job_id

    @staticmethod
    def _raise_unless_ok(context: CloudContext, action: str) -> None:
        """Raise ``RuntimeError`` unless the last response has status 200.

        :param context: Cloud context whose ``request`` has just been sent
        :param action: Description inserted after "Cannot " in the message
        :raises RuntimeError: If the response status code is not 200
        """
        if int(context.request.response.status_code) != 200:
            raise RuntimeError(
                f"Cannot {action}: {context.request.get_pretty_response()}")

    def create(self, context: CloudContext, user, create_dict: dict | None = None) -> None:
        """Create this job in the cloud.

        The request body is made of the job name, platform name, payload,
        PCVL version and process ID, skipping any of them set to ``None``;
        ``create_dict`` entries are then merged on top.

        :param context: Cloud context for API configuration
        :param user: User providing authentication
        :param create_dict: Additional parameters for job creation
        :raises RuntimeError: If job creation fails
        """
        context.request = self.get_create_request(context, user)
        attributes = vars(self)
        body_keys = (Job.JOB_NAME_KEY,
                     Job.PLATFORM_NAME_KEY,
                     Job.PAYLOAD_KEY,
                     Job.PCVL_VERSION_KEY,
                     Job.PROCESS_ID_KEY)
        context.request.body = {key: attributes[key]
                                for key in body_keys
                                if attributes[key] is not None}
        if create_dict is not None:
            context.request.body.update(create_dict)
        send_complete_request(context)
        self._raise_unless_ok(context, f"create job {self.job_name}")
        self.update_from_dict(context.request.response.json())

    def read(self, context: CloudContext, user) -> dict[str, Any]:
        """Read this job's information from the cloud.

        The request is authorized with the user's login token, and this
        object is updated from the response.

        :param context: Cloud context for API configuration
        :param user: User providing authentication
        :return: Dictionary with job data from response
        :raises RuntimeError: If reading job data fails
        """
        context.request = self.get_read_request(context, user)
        context.request.set_authorization_header(user.get_token(TokenType.Login))
        send_complete_request(context)
        self._raise_unless_ok(context, f"get job {self.job_name}")
        response = context.request.response.json()
        self.update_from_dict(response)
        return response

    def cancel(self, context: CloudContext, user) -> None:
        """Cancel this job.

        :param context: Cloud context for API configuration
        :param user: User providing authentication
        :raises RuntimeError: If canceling the job fails
        """
        context.request = self.get_endpoint_id_request_with_extension(context, user, "cancel")
        context.request.set_method(POST)
        send_complete_request(context)
        self._raise_unless_ok(context, f"cancel job {self.job_name}")

    def _get_info(self, context: CloudContext, user, info_type: str) -> dict[str, Any]:
        """Helper method to get job information (status or results).

        This object is updated from the response before it is returned.

        :param context: Cloud context for API configuration
        :param user: User providing authentication
        :param info_type: Type of information to retrieve ("status" or "result")
        :return: Dictionary with job information
        :raises AssertionError: If info_type is not valid
        :raises RuntimeError: If retrieving job information fails
        """
        # Raised explicitly (rather than via ``assert``) so that the check is
        # still enforced when Python runs with optimizations enabled.
        if info_type not in ("status", "result"):
            raise AssertionError(f"Invalid info_type: {info_type!r}")
        context.request = self.get_endpoint_id_request_with_extension(context, user, info_type)
        context.request.set_method(GET)
        send_complete_request(context)
        self._raise_unless_ok(context, f"get job {self.job_name}")
        response = context.request.response.json()
        self.update_from_dict(response)
        return response

    def get_status(self, context: CloudContext, user) -> dict[str, Any]:
        """Get the current status of this job.

        :param context: Cloud context for API configuration
        :param user: User providing authentication
        :return: Dictionary with job status information
        :raises RuntimeError: If retrieving job status fails
        """
        return self._get_info(context, user, "status")

    def get_results(self, context: CloudContext, user) -> dict[str, Any]:
        """Get the results of this job.

        :param context: Cloud context for API configuration
        :param user: User providing authentication
        :return: Dictionary with job results
        :raises RuntimeError: If retrieving job results fails
        """
        return self._get_info(context, user, "result")

    @classmethod
    def cancel_many(cls, context: CloudContext, jobs: list, admin: User) -> None:
        """Cancel multiple jobs at once.

        :param context: Cloud context for API configuration
        :param jobs: List of Job objects to cancel
        :param admin: Admin user providing authentication
        :raises AssertionError: If the user is not an admin
        :raises RuntimeError: If canceling the jobs fails
        """
        # Explicit raise keeps the guard active under ``python -O``.
        if not admin.is_admin:
            raise AssertionError("Can cancel jobs only as an admin")
        context.request = Request(api=cls.get_api(context))
        context.request.set_authorization_header(admin.get_token(TokenType.Login))
        context.request.set_json_content_type()
        context.request.set_method(POST)
        context.request.set_url(cls.get_endpoint() + "/cancel")
        job_ids = [job.job_id for job in jobs]
        context.request.body = {"job_ids": job_ids}
        send_complete_request(context)
        cls._raise_unless_ok(context, f"cancel jobs {job_ids}")

    @classmethod
    def list_all(cls, context: CloudContext, admin: User) -> list[Job]:
        """List all jobs in the cloud.

        Requests a single page of up to 9999 jobs and builds one ``Job`` per
        element of the response's ``"content"`` list.

        :param context: Cloud context for API configuration
        :param admin: Admin user providing authentication
        :return: List of Job objects
        :raises AssertionError: If the user is not an admin
        :raises RuntimeError: If listing jobs fails
        """
        # Explicit raise keeps the guard active under ``python -O``.
        if not admin.is_admin:
            raise AssertionError("Can only list jobs as an admin")
        context.request = Job.get_list_all_request(context, admin)
        context.request.set_param("size", 9999)
        context.request.set_url(cls.get_endpoint() + "/page")  # TODO: Should not be page
        send_complete_request(context)
        cls._raise_unless_ok(context, "list jobs")
        return [Job(**elem) for elem in context.request.response.json()["content"]]