Skip to content

Job

A Job represents a running or completed execution of an IVCAP service. Jobs are created by calling service.request_job() and tracked via job.refresh().

Quick Reference

import time
from ivcap_client import JobStatus

# Submit: ask the service to run a job for the request `req`
job = service.request_job(req)
print(f"Job ID: {job.id}")

# Poll: the `job.finished` property calls `job.refresh()` itself while the
# job has not reached a final state, so the loop only needs to pace the
# requests. An extra explicit refresh here would double the number of reads.
while not job.finished:
    time.sleep(5)

# Check result: refresh() is a no-op once the job is finished, so status()
# returns the final state without another request.
if job.status() == JobStatus.SUCCEEDED:
    print(job.result)

JobStatus Enum

JobStatus represents all possible states of a job:

Value Meaning
UNKNOWN Status cannot be determined
PENDING Job created, awaiting a dispatcher slot
SCHEDULED Dispatcher has provisioned compute resources
EXECUTING Service container is actively processing
SUCCEEDED Job completed successfully; result is available
FAILED Service reported a business-logic failure
ERROR Platform-level error (OOM, timeout, infrastructure)

Class Documentation

Job dataclass

This class represents a particular job placed or executed at a particular IVCAP deployment.

Source code in ivcap_client/job.py
@dataclass
class Job:
    """This class represents a particular job placed
    or executed at a particular IVCAP deployment.

    The class defines its own ``__init__``, ``__repr__`` and ``__hash__``;
    ``@dataclass`` leaves explicitly defined methods in place, so instances
    are always created through ``Job(ivcap, **kwargs)`` or one of the
    alternate constructors below.
    """

    # Identifier of the job; also returned by `urn` and used for hashing.
    id: str
    name: str | None = None
    request_content_type: str | None = None
    result_content_type: str | None = None
    requested_at: datetime.datetime | None = None
    started_at: datetime.datetime | None = None
    finished_at: datetime.datetime | None = None

    policy: URN | None = None
    account: URN | None = None

    @classmethod
    def _from_list_item(cls, item: JobListItem, ivcap: IVCAP):
        """Create a job from a job-list entry.

        Args:
            item: List entry; its ``to_dict()`` output is passed on as
                keyword arguments.
            ivcap: The IVCAP deployment the job belongs to.
        """
        return cls(ivcap, **item.to_dict())

    @classmethod
    def from_create_job_response(cls, response: Response, service: Service):
        """Create a job from the response to a job-creation request.

        Args:
            response: HTTP response of the create call.
            service: The service the job was requested from.

        Returns:
            For status 200, a job carrying the response body as result
            content; for status 202, a job that only knows its id.

        Raises:
            NotImplementedError: For any other status code.
        """
        if response.status_code == 200:
            # The result is delivered inline with the response.
            kwargs = {
                "id": response.headers.get("ivcap-job-id"),
                "service": service,
                "result-content": response.json(),
                "result-content-type": response.headers.get("content-type"),
            }
            return cls(service._ivcap, **kwargs)
        if response.status_code == 202:
            # Only the job id is returned; the rest is obtained via refresh().
            job_id = response.json().get("job-id")
            return cls(service._ivcap, id=job_id, service=service)
        # Raising a plain string is itself a TypeError; raise a real exception.
        raise NotImplementedError("not implemented, yet")

    def __init__(self, ivcap: IVCAP, **kwargs):
        """Initialise the job.

        Args:
            ivcap: The IVCAP deployment; must be truthy.
            **kwargs: Job properties keyed by their wire names (see
                ``__update__``), plus an optional ``service`` that is either
                a ``Service`` instance or a service identifier.

        Raises:
            ValueError: If ``ivcap`` is missing.
        """
        if not ivcap:
            raise ValueError("missing 'ivcap' argument")
        self._ivcap = ivcap

        # Define both attributes up front: __update__ reads them even when no
        # service was supplied, and "service" may be absent from kwargs.
        self._service_obj = None
        self._service = None
        service = kwargs.pop("service", None)
        if isinstance(service, Service):
            self._service_obj = service
            self._service = service.id
        elif service is not None:
            self._service = service

        self._request_content = None
        self._result_content = None
        self.__update__(**kwargs)

    def __update__(self, **kwargs):
        """Apply the given properties to this job and normalise derived state."""
        p = [
            "id",
            "name",
            "request-content-type",
            "result-content-type",
            "requested-at",
            "started-at",
            "finished-at",
            "policy",
            "account",
        ]
        hp = ["status", "request-content", "result-content"]
        # NOTE(review): presumably `p` keys become public attributes and `hp`
        # keys become underscore-prefixed ones (the next line reads
        # `self._status`) - confirm against `_set_fields`.
        _set_fields(self, p, hp, kwargs)
        self._status = JobStatus.from_string(self._status)
        if self._service_obj is None and self._service is not None:
            self._service_obj = Service(id=self._service)

    @property
    def urn(self) -> str:
        """The job's identifier (same value as ``id``)."""
        return self.id

    def status(self, refresh=True) -> JobStatus:
        """Return the job status, refreshing it first unless ``refresh`` is false."""
        if refresh:
            self.refresh()
        return self._status

    async def status_async(self, refresh=True) -> JobStatus:
        """Async variant of ``status``."""
        if refresh:
            await self.refresh_async()
        return self._status

    @property
    def finished(self):
        """True once the job is in a final state; refreshes if it is not yet."""
        if self._finished:
            return True
        self.refresh()
        return self._finished

    @property
    def _finished(self):
        """True if the locally known status is SUCCEEDED, FAILED or ERROR."""
        return self._status in [JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.ERROR]

    async def finished_async(self) -> bool:
        """Async variant of ``finished``."""
        if self._finished:
            return True
        await self.refresh_async()
        return self._finished

    async def wait_for_finished_async(
        self, max_wait_time: float | None = None, poll_interval: float = 5.0
    ) -> Job:
        """Poll until the job is finished.

        Args:
            max_wait_time: Maximum number of seconds to wait; ``None`` waits
                indefinitely.
            poll_interval: Seconds to sleep between two polls.

        Returns:
            This job.

        Raises:
            TimeoutError: If the job is still unfinished after
                ``max_wait_time`` seconds.
        """
        import asyncio
        import time

        # A monotonic clock measures elapsed time independently of wall-clock
        # adjustments and of how the `datetime` name is imported in this module.
        deadline = None if max_wait_time is None else time.monotonic() + max_wait_time
        while not await self.finished_async():
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Job '{self.id}' did not finish within {max_wait_time} seconds"
                )
            await asyncio.sleep(poll_interval)
        return self

    @property
    def succeeded(self):
        """True if the job is finished and its status is SUCCEEDED."""
        return self.finished and self._status == JobStatus.SUCCEEDED

    @property
    async def succeeded_async(self):
        """Async variant of ``succeeded``; use as ``await job.succeeded_async``."""
        finished = await self.finished_async()
        return finished and self._status == JobStatus.SUCCEEDED

    @property
    def service(self) -> Service:
        # NOTE(review): this returns `_service` (the service id when the job
        # was built from a `Service` instance), not `_service_obj`, although
        # the annotation says `Service` - confirm which one callers expect.
        return self._service

    @property
    def result(self):
        """The result content, attempting a refresh if none is cached yet."""
        # NOTE(review): refresh() returns early for a finished job, so a
        # finished job without cached result content stays at None here.
        if self._result_content is None:
            self.refresh()
        return self._result_content

    async def result_async(self):
        """Async variant of ``result``."""
        if self._result_content is None:
            await self.refresh_async()
        return self._result_content

    def refresh(self) -> Job:
        """Re-read the job from the deployment unless it is already finished.

        Returns:
            This job, or whatever ``process_error`` returns on a failed read.
        """
        if self._finished:
            return self  # no need to refresh

        kwargs = self._refresh_top()
        r = service_job_read.sync_detailed(**kwargs)
        return self._refresh_bottom(r)

    async def refresh_async(self) -> Job:
        """Async variant of ``refresh``."""
        if self._finished:
            return self  # no need to refresh

        kwargs = self._refresh_top()
        r = await service_job_read.asyncio_detailed(**kwargs)
        return self._refresh_bottom(r)

    def _refresh_top(self):
        """Build the keyword arguments for the job-read call."""
        return {
            "client": self._ivcap._client,
            "id": self.id,
            "service_id": self._service,
            # Only ask for content that is not cached locally yet.
            "with_request_content": self._request_content is None,
            "with_result_content": self._result_content is None,
        }

    def _refresh_bottom(self, r: Response) -> Job:
        """Apply a job-read response to this job."""
        if r.status_code >= 300:
            # NOTE(review): the "place_job" label is used although this is a
            # read - confirm how `process_error` uses it before renaming.
            return process_error("place_job", r)
        kwargs = r.parsed.to_dict()
        self.__update__(**kwargs)
        return self

    def __repr__(self):
        status = self._status if self._status else "???"
        return f"<Job id={self.id}, status={status}>"

    def __hash__(self):
        return hash(self.id)

Enum Documentation

JobStatus

Bases: Enum

Source code in ivcap_client/job.py
class JobStatus(Enum):
    """States a job can be in, keyed by their lower-case string value."""

    UNKNOWN = "unknown"
    PENDING = "pending"
    SCHEDULED = "scheduled"
    EXECUTING = "executing"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    ERROR = "error"

    @classmethod
    def from_string(cls, s: str) -> JobStatus:
        """Return the member whose value is ``s``, or ``UNKNOWN`` if none matches."""
        try:
            member = cls(s)
        except ValueError:
            # Not one of the declared values: report it as UNKNOWN.
            member = cls.UNKNOWN
        return member