from __future__ import annotations
from uuid import uuid4
from ..utils import Request, Api, POST, GET
from ..given import send_complete_request
from .cloud_base import ACloudObject, TokenType
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..utils import CloudContext
from .cloud_user import User
[docs]
class Job(ACloudObject):
"""Job class representing a computational job in the cloud system.
This class handles job creation, monitoring, and management operations.
:param platform_name: Name of the platform to run the job on
:param job_name: Name of the job
:param payload: Job payload containing command, circuit, parameters, etc.
:param pcvl_version: Version of the PCVL library to use
:param process_id: Unique ID for the process (defaults to a new UUID)
:param job_id: Job ID (empty for new jobs)
:param _: Additional attributes to set on the job
"""
[docs]
JOB_NAME_KEY = "job_name"
[docs]
PAYLOAD_KEY = "payload"
[docs]
PCVL_VERSION_KEY = "pcvl_version"
[docs]
PROCESS_ID_KEY = "process_id"
[docs]
DEFAULT_PAYLOAD = {"command": "probs",
"circuit": ":PCVL:zip:"
+ "eJyzCnAO87FydM4sSi7NLLFydfTM9K9wdI7MSg52DsyO9AkNCtWu9DANqMj3"
+ "cg50hAPP9GwvBM+xEKgWwXPxRFNrEegYlu/jDNTj7mzoGhZQnGEWYkF1ewCY7jxM",
"input_state": ":PCVL:BasicState:|1,1>",
"parameters": {"min_detected_photons": 2},
"max_shots": 10000,
"job_context": None}
@classmethod
[docs]
def get_object_name(cls) -> str:
return "job"
@classmethod
[docs]
def get_endpoint(cls) -> str:
return "/api/jobs"
@classmethod
[docs]
def get_api(cls, context: CloudContext) -> Api:
return context.apis["cloud"]
@classmethod
[docs]
def get_token_method(cls) -> str:
return TokenType.Job
# Default platform name constant matches Platform.DEFAULT_NAME
def __init__(self,
platform_name: str = DEFAULT_PLATFORM_NAME,
job_name: str = "qa_job",
payload: dict[str] = DEFAULT_PAYLOAD,
pcvl_version: str = "0.12.0",
process_id: str = str(uuid4()),
job_id: str = "",
**_):
[docs]
self.job_name: str = job_name
[docs]
self.payload: dict[str] = payload
[docs]
self.pcvl_version: str = pcvl_version
[docs]
self.process_id: str = process_id
[docs]
def create(self, context: CloudContext, user, create_dict: dict = {}) -> None:
"""Create this job in the cloud.
:param context: Cloud context for API configuration
:param user: User providing authentication
:param create_dict: Additional parameters for job creation
:raises RuntimeError: If job creation fails
"""
context.request = self.get_create_request(context, user)
context.request.body = {key: self.__dict__[key] for key in
[Job.JOB_NAME_KEY,
Job.PLATFORM_NAME_KEY,
Job.PAYLOAD_KEY,
Job.PCVL_VERSION_KEY,
Job.PROCESS_ID_KEY
] if self.__dict__[key] is not None}
context.request.body.update(create_dict)
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(
f"Cannot create job {self.job_name}: {context.request.get_pretty_response()}")
response = context.request.response.json()
self.update_from_dict(response)
[docs]
def read(self, context: CloudContext, user) -> dict[str]:
"""Read this job's information from the cloud.
:param context: Cloud context for API configuration
:param user: User providing authentication
:return: Dictionary with job data from response
:raises RuntimeError: If reading job data fails
"""
context.request = self.get_read_request(context, user)
context.request.set_authorization_header(user.get_token(TokenType.Login))
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(
f"Cannot get job {self.job_name}: {context.request.get_pretty_response()}")
response = context.request.response.json()
self.update_from_dict(response)
return response
[docs]
def cancel(self, context: CloudContext, user) -> dict[str]:
"""Cancel this job.
:param context: Cloud context for API configuration
:param user: User providing authentication
:raises RuntimeError: If canceling the job fails
"""
context.request = self.get_endpoint_id_request_with_extension(context, user, "cancel")
context.request.set_method(POST)
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(
f"Cannot cancel job {self.job_name}: {context.request.get_pretty_response()}")
def _get_info(self, context: CloudContext, user, info_type: str) -> dict[str]:
"""Helper method to get job information (status or results).
:param context: Cloud context for API configuration
:param user: User providing authentication
:param info_type: Type of information to retrieve ("status" or "result")
:return: Dictionary with job information
:raises AssertionError: If info_type is not valid
:raises RuntimeError: If retrieving job information fails
"""
assert info_type in ["status", "result"]
context.request = self.get_endpoint_id_request_with_extension(context, user, info_type)
context.request.set_method(GET)
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(
f"Cannot get job {self.job_name}: {context.request.get_pretty_response()}")
response = context.request.response.json()
self.update_from_dict(response)
return response
[docs]
def get_status(self, context: CloudContext, user) -> dict[str]:
"""Get the current status of this job.
:param context: Cloud context for API configuration
:param user: User providing authentication
:return: Dictionary with job status information
:raises RuntimeError: If retrieving job status fails
"""
return self._get_info(context, user, "status")
[docs]
def get_results(self, context: CloudContext, user) -> dict[str]:
"""Get the results of this job.
:param context: Cloud context for API configuration
:param user: User providing authentication
:return: Dictionary with job results
:raises RuntimeError: If retrieving job results fails
"""
return self._get_info(context, user, "result")
@classmethod
[docs]
def cancel_many(cls, context: CloudContext, jobs: list, admin: User) -> None:
"""Cancel multiple jobs at once.
:param context: Cloud context for API configuration
:param jobs: List of Job objects to cancel
:param admin: Admin user providing authentication
:raises AssertionError: If the user is not an admin
:raises RuntimeError: If canceling the jobs fails
"""
assert admin.is_admin, "Can cancel jobs only as an admin"
context.request = Request(api=cls.get_api(context))
context.request.set_authorization_header(admin.get_token(TokenType.Login))
context.request.set_json_content_type()
context.request.set_method(POST)
context.request.set_url(cls.get_endpoint()+"/cancel")
job_ids = [job.job_id for job in jobs]
context.request.body = {"job_ids": job_ids}
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(
f"Cannot cancel jobs {job_ids}: {context.request.get_pretty_response()}")
@classmethod
[docs]
def list_all(cls, context: CloudContext, admin: User) -> list[Job]:
"""List all jobs in the cloud.
:param context: Cloud context for API configuration
:param admin: Admin user providing authentication
:return: List of Job objects
:raises AssertionError: If the user is not an admin
:raises RuntimeError: If listing jobs fails
"""
assert admin.is_admin, "Can only list jobs as an admin"
context.request = Job.get_list_all_request(context, admin)
context.request.set_param("size", 9999)
context.request.set_url(cls.get_endpoint()+"/page") # TODO: Should not be page
send_complete_request(context)
if int(context.request.response.status_code) != 200:
raise RuntimeError(
f"Cannot list jobs: {context.request.get_pretty_response()}")
return [Job(**elem) for elem in context.request.response.json()["content"]]