@dataclass
class Service:
    """A particular service available in a particular IVCAP deployment.

    An instance keeps a reference to the owning ``IVCAP`` client and uses it
    to refresh its own record, to look up the service's request model and to
    submit jobs. Field values are populated from keyword arguments through
    ``__update__`` rather than through a dataclass-generated ``__init__``.
    """

    id: URN | None = None
    name: str | None = None
    description: str | None = None
    banner: str | None = None
    policy: URN | None = None
    published_at: datetime.datetime | None = None
    account: URN | None = None

    @classmethod
    def _from_list_item(cls, item: ServiceListItemT, ivcap: IVCAP):
        """Build a ``Service`` from a list item by expanding ``item.to_dict()``."""
        return cls(ivcap, **item.to_dict())

    def __init__(self, ivcap: IVCAP, **kwargs):
        """Create a service bound to ``ivcap`` and populate it from ``kwargs``.

        Raises:
            ValueError: if ``ivcap`` is missing (falsy).
        """
        if not ivcap:
            raise ValueError("missing 'ivcap' argument")
        self._ivcap = ivcap
        self._request_model = None
        self.__update__(**kwargs)

    def __update__(self, **kwargs):
        """(Re)load this instance's state from a dict-style service record.

        The optional ``parameters`` entry is expected to be an iterable of
        dicts, each carrying at least a ``name`` key; they are indexed by name
        with ``-`` replaced by ``_``. The parameter cache is reset on every
        call, so a record without ``parameters`` leaves it as ``None``.
        """
        # NOTE(review): `_set_fields` is defined elsewhere; presumably it copies
        # the listed keys onto this instance and stores the second list as
        # private attributes (`status` is read back as `self._status`) - confirm.
        public_fields = [
            "id",
            "name",
            "description",
            "banner",
            "policy",
            "published-at",
            "account",
        ]
        hidden_fields = ["status"]
        _set_fields(self, public_fields, hidden_fields, kwargs)

        self._parameters: dict[str, ServiceParameter] | None = None
        params = kwargs.get("parameters")
        if params is not None:
            self._parameters = {
                d["name"].replace("-", "_"): ServiceParameter(
                    ParameterDefT.from_dict(d)
                )
                for d in params
            }

    def status(self, refresh=True) -> ServiceStatusRTStatus:
        """Return the service status, calling ``refresh()`` first by default."""
        if refresh:
            self.refresh()
        return self._status

    @property
    def parameters(self) -> dict[str, ServiceParameter]:
        """Parameters of this service, keyed by (underscore-normalised) name.

        The record is refreshed only when no parameters have been loaded yet.
        An empty mapping counts as loaded, so a service without parameters
        does not trigger a refresh on every access.
        """
        if self._parameters is None:
            self.refresh()
        return self._parameters

    @property
    def mandatory_parameters(self) -> set[str]:
        """Names of all parameters whose ``is_optional`` flag is false."""
        return {p.name for p in self.parameters.values() if not p.is_optional}

    @property
    def request_model(self) -> type[BaseModel]:
        """The model class describing a job request, fetched on first use."""
        if not self._request_model:
            return self._fetch_request_model()
        return self._request_model

    def _fetch_request_model(self) -> type[BaseModel]:
        """Return the request model, building and caching it if necessary.

        Lists the aspects with schema ``urn:sd-core:schema.ai-tool.1`` attached
        to this service and builds the model from the single match's
        ``fn_schema`` entry.

        Raises:
            ValueError: if no such aspect exists.
            OverflowError: if more than one such aspect exists.
        """
        if self._request_model:
            return self._request_model

        schema = "urn:sd-core:schema.ai-tool.1"
        # limit=2 is enough to tell "exactly one" apart from "more than one".
        aspects = self._ivcap.list_aspects(
            schema=schema, entity=self.id, include_content=False, limit=2
        )
        if not aspects.has_next():
            raise ValueError("cannot find request (tool) model for this service")
        model_aspect = next(aspects)
        if aspects.has_next():
            raise OverflowError("Found more then one model definition")

        fn_schema = model_aspect.aspect["fn_schema"]
        # `id` here is the builtin: the generated model name is made unique
        # per instance.
        self._request_model = model_from_json_schema(
            fn_schema, f"Service{id(self)}"
        )
        return self._request_model

    async def request_model_async(self) -> type[BaseModel]:
        """Async variant of ``request_model``.

        Returns the cached model if there is one; otherwise runs the blocking
        lookup in a worker thread via ``asyncio.to_thread``.
        """
        if self._request_model:
            return self._request_model
        return await asyncio.to_thread(self._fetch_request_model)

    def request_job(
        self, data: BaseModel | object | IO[str], timeout: int | None = 0
    ) -> Job:
        """Submit a job request for this service and return the resulting job.

        Args:
            data: Request payload - a dataclass instance, a Pydantic model
                instance or a readable file object containing JSON.
            timeout: Value sent in the ``Timeout`` request header; ``None`` is
                sent as ``0``.

        Raises:
            ValueError: if a file object does not contain valid JSON.
            TypeError: if ``data`` is of an unsupported type.
        """
        kwargs = self._get_request_job_args(data, timeout)
        response = self._ivcap._client.get_httpx_client().request(**kwargs)
        return self._process_job_reply(response)

    async def request_job_async(
        self,
        data: BaseModel | object | IO[str],
        max_wait_time: float | None = None,
        poll_interval: float = 5.0,
    ) -> Job:
        """Submit a job request and wait asynchronously for it to finish.

        Args:
            data: Request payload - see ``request_job``.
            max_wait_time: Overall time budget in seconds, covering both the
                submission and the wait for completion. It is also sent as the
                ``Timeout`` request header. A falsy value disables the
                local deadline check.
            poll_interval: Passed through to ``job.wait_for_finished_async``.

        Raises:
            TimeoutError: if submitting the request already used up
                ``max_wait_time``.
            ValueError, TypeError: see ``request_job``.
        """
        start_time = datetime.datetime.now()
        kwargs = self._get_request_job_args(data, max_wait_time)
        response = await self._ivcap._client.get_async_httpx_client().request(**kwargs)
        job = self._process_job_reply(response)

        remaining = max_wait_time
        if max_wait_time:
            # Deduct the time already spent on the submission round trip.
            elapsed = (datetime.datetime.now() - start_time).total_seconds()
            remaining = max_wait_time - elapsed
            if remaining <= 0:
                raise TimeoutError(
                    f"Job '{self.id}' did not finish within {max_wait_time} seconds"
                )
        return await job.wait_for_finished_async(
            max_wait_time=remaining, poll_interval=poll_interval
        )

    def _get_request_job_args(
        self, data: BaseModel | object | IO[str], timeout: int | None = 0
    ):
        """Build the keyword arguments for the HTTP request that creates a job.

        ``data`` is serialised to a JSON string and returned as raw request
        content, together with the method, URL and headers.

        Raises:
            ValueError: if a file object does not contain valid JSON.
            TypeError: if ``data`` is neither a readable file object, a
                dataclass instance nor a Pydantic model instance.
        """
        headers: dict[str, Any] = {
            "Timeout": str(timeout if timeout is not None else 0),
            "Content-Type": "application/json",
        }
        kwargs: dict[str, Any] = {
            "method": "post",
            "url": f"/1/services2/{self.id}/jobs",
        }

        # Serialise 'data' into a JSON document.
        if (
            isinstance(data, io.IOBase)
            and hasattr(data, "read")
            and callable(data.read)
        ):
            try:
                # Parse and re-serialise so that invalid JSON is rejected here.
                body = json.dumps(json.load(data), indent=2)
            except json.JSONDecodeError as err:
                raise ValueError(
                    "The provided file object does not contain valid JSON."
                ) from err
        elif is_dataclass(data) and not isinstance(data, type):
            # `is_dataclass` is also true for dataclass *types*, which
            # `asdict` cannot handle - only instances are accepted here.
            body = json.dumps(asdict(data), indent=2)
        elif isinstance(data, BaseModel):
            body = data.model_dump_json(indent=2)
        else:
            raise TypeError(
                "Input data must be a dataclass object, Pydantic instance, "
                "or a readable file object containing JSON."
            )

        # Raw text bodies go in `content=`; httpx reserves `data=` for form
        # fields and warns when it is given a string.
        kwargs["content"] = body
        kwargs["headers"] = headers
        return kwargs

    def _process_job_reply(self, response: Response) -> Job:
        """Turn the HTTP reply to a job request into a ``Job``.

        Replies with a status code of 300 or above are handed to
        ``process_error``.
        """
        if response.status_code >= 300:
            return process_error("request_job", response)

        # Imported here rather than at module level - presumably to avoid a
        # circular import between the service and job modules.
        from ivcap_client.job import Job

        return Job.from_create_job_response(response, self)

    def refresh(self) -> Service:
        """Re-read this service's record and update this instance in place.

        Returns:
            This instance, to allow chaining.
        """
        r = service_service_read.sync_detailed(self.id, client=self._ivcap._client)
        if r.status_code >= 300:
            # NOTE(review): the label says "create_service" although this is a
            # read - confirm whether that is intentional.
            return process_error("create_service", r)
        p: ServiceStatusRT = r.parsed
        self.__update__(**p.to_dict())
        return self

    def __repr__(self):
        """Short representation showing id and name (``???`` if unnamed)."""
        name = self.name if self.name else "???"
        return f"<Service id={self.id}, name={name}>"