@dataclass
class Job:
    """A particular job placed or executed at a particular IVCAP deployment.

    Instances are created through the custom ``__init__`` below (or the
    ``_from_list_item`` / ``from_create_job_response`` constructors), not
    through a dataclass-generated initializer. State is filled in by
    ``__update__`` and can be re-fetched from the deployment with
    ``refresh`` / ``refresh_async``.
    """

    # Public fields; populated by `__update__` from dash-separated keys
    # (e.g. "requested-at").
    id: str
    name: str | None = None
    request_content_type: str | None = None
    result_content_type: str | None = None
    requested_at: datetime.datetime | None = None
    started_at: datetime.datetime | None = None
    finished_at: datetime.datetime | None = None
    policy: URN | None = None
    account: URN | None = None

    @classmethod
    def _from_list_item(cls, item: JobListItem, ivcap: IVCAP):
        """Create a job from a list entry.

        The entry's ``to_dict()`` result is passed on as keyword arguments.
        """
        return cls(ivcap, **item.to_dict())

    @classmethod
    def from_create_job_response(cls, response: Response, service: Service):
        """Create a job from the HTTP response to a job creation request.

        Args:
            response: The response returned when the job was placed.
            service: The service the job was placed with; its ``_ivcap``
                attribute is used as this job's deployment handle.

        Returns:
            A job carrying the result content when the response status is
            200, or a job knowing only its id when the status is 202.

        Raises:
            NotImplementedError: For any other status code.
        """
        status_code = response.status_code
        if status_code == 200:
            # The result is already part of the response body.
            kwargs = {
                "id": response.headers.get("ivcap-job-id"),
                "service": service,
                "result-content": response.json(),
                "result-content-type": response.headers.get("content-type"),
            }
            return cls(service._ivcap, **kwargs)
        if status_code == 202:
            # Only the job id is known at this point.
            job_id = response.json().get("job-id")
            return cls(service._ivcap, id=job_id, service=service)
        raise NotImplementedError(
            f"not implemented, yet: unexpected status code {status_code}"
        )

    def __init__(self, ivcap: IVCAP, **kwargs):
        """Initialize the job.

        Args:
            ivcap: Handle to the IVCAP deployment; must be truthy.
            **kwargs: Job properties keyed by the dash-separated names
                listed in ``__update__``. An optional ``service`` entry may
                be a ``Service`` instance or a service identifier.

        Raises:
            ValueError: If ``ivcap`` is missing.
        """
        if not ivcap:
            raise ValueError("missing 'ivcap' argument")
        self._ivcap = ivcap

        # Always define both attributes so `__update__` can read them even
        # when no service was supplied.
        self._service_obj = None
        self._service = None
        service = kwargs.pop("service", None)
        if isinstance(service, Service):
            self._service_obj = service
            self._service = service.id
        elif service is not None:
            self._service = service

        # Defaults so later attribute reads never hit AttributeError.
        self._status = None
        self._request_content = None
        self._result_content = None
        self.__update__(**kwargs)

    def __update__(self, **kwargs):
        """Apply ``kwargs`` to this job and normalize derived state.

        The property lists are handed to ``_set_fields`` (defined outside
        this class). Afterwards the status is converted with
        ``JobStatus.from_string`` and a ``Service`` object is created from
        the stored service id if none is attached yet.
        """
        p = [
            "id",
            "name",
            "request-content-type",
            "result-content-type",
            "requested-at",
            "started-at",
            "finished-at",
            "policy",
            "account",
        ]
        hp = ["status", "request-content", "result-content"]
        _set_fields(self, p, hp, kwargs)
        self._status = JobStatus.from_string(self._status)
        if self._service_obj is None and self._service is not None:
            self._service_obj = Service(id=self._service)

    @property
    def urn(self) -> str:
        """The job's id."""
        return self.id

    def status(self, refresh=True) -> JobStatus:
        """Return the job status, refreshing first unless ``refresh`` is false."""
        if refresh:
            self.refresh()
        return self._status

    async def status_async(self, refresh=True) -> JobStatus:
        """Async variant of ``status``."""
        if refresh:
            await self.refresh_async()
        return self._status

    @property
    def finished(self):
        """True if the job reached a terminal state.

        Refreshes once if the locally known status is not terminal.
        """
        if self._finished:
            return True
        self.refresh()
        return self._finished

    @property
    def _finished(self):
        """True if the locally known status is SUCCEEDED, FAILED or ERROR."""
        return self._status in [JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.ERROR]

    async def finished_async(self) -> bool:
        """Async variant of ``finished``."""
        if self._finished:
            return True
        await self.refresh_async()
        return self._finished

    async def wait_for_finished_async(
        self, max_wait_time: float | None = None, poll_interval: float = 5.0
    ) -> Job:
        """Poll until the job is finished.

        Args:
            max_wait_time: Maximum number of seconds to wait, or ``None``
                to wait indefinitely.
            poll_interval: Seconds to sleep between polls.

        Returns:
            This job.

        Raises:
            TimeoutError: If the job did not finish within ``max_wait_time``.
        """
        import asyncio
        import time

        # A monotonic clock is unaffected by wall-clock adjustments.
        started = time.monotonic()
        while not await self.finished_async():
            if max_wait_time is not None:
                elapsed = time.monotonic() - started
                if elapsed >= max_wait_time:
                    raise TimeoutError(
                        f"Job '{self.id}' did not finish within {max_wait_time} seconds"
                    )
            await asyncio.sleep(poll_interval)
        return self

    @property
    def succeeded(self):
        """True if the job is finished and its status is SUCCEEDED."""
        return self.finished and self._status == JobStatus.SUCCEEDED

    @property
    async def succeeded_async(self):
        """Async variant of ``succeeded``.

        This is a property wrapping a coroutine, so it is used as
        ``await job.succeeded_async`` (without call parentheses).
        """
        finished = await self.finished_async()
        return finished and self._status == JobStatus.SUCCEEDED

    @property
    def service(self) -> Service:
        """The stored service reference.

        NOTE(review): this returns ``_service`` (the id taken from the
        ``Service`` passed to ``__init__``, or the raw value otherwise),
        not ``_service_obj`` - confirm this matches the annotation's intent.
        """
        return self._service

    @property
    def result(self):
        """The result content, refreshing first if none is held locally."""
        if self._result_content is None:
            self.refresh()
        return self._result_content

    async def result_async(self):
        """Async variant of ``result``."""
        if self._result_content is None:
            await self.refresh_async()
        return self._result_content

    def refresh(self) -> Job:
        """Re-read the job from the deployment unless it is already finished."""
        if self._finished:
            return self  # no need to refresh
        kwargs = self._refresh_top()
        r = service_job_read.sync_detailed(**kwargs)
        return self._refresh_bottom(r)

    async def refresh_async(self) -> Job:
        """Async variant of ``refresh``."""
        if self._finished:
            return self  # no need to refresh
        kwargs = self._refresh_top()
        r = await service_job_read.asyncio_detailed(**kwargs)
        return self._refresh_bottom(r)

    def _refresh_top(self):
        """Build the keyword arguments for the job read request.

        Request/result content is only asked for when not yet held locally.
        """
        return {
            "client": self._ivcap._client,
            "id": self.id,
            "service_id": self._service,
            "with_request_content": self._request_content is None,
            "with_result_content": self._result_content is None,
        }

    def _refresh_bottom(self, r: Response) -> Job:
        """Apply a job read response to this job.

        Responses with a status code of 300 or above are handed to
        ``process_error`` and its result is returned; otherwise the parsed
        payload is applied through ``__update__`` and this job is returned.
        """
        if r.status_code >= 300:
            # NOTE(review): the operation name is "place_job" although this
            # handles a read - confirm whether that is intended.
            return process_error("place_job", r)
        kwargs = r.parsed.to_dict()
        self.__update__(**kwargs)
        return self

    def __repr__(self):
        status = self._status if self._status else "???"
        return f"<Job id={self.id}, status={status}>"

    def __hash__(self):
        # Jobs hash by their id.
        return hash(self.id)