# Source code for apolo_sdk._apps

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, AsyncIterator, List, Optional

from aiohttp import WSMsgType
from yarl import URL

from ._config import Config
from ._core import _Core
from ._rewrite import rewrite_module
from ._utils import NoPublicConstructor, asyncgeneratorcontextmanager


@rewrite_module
@dataclass(frozen=True)
class AppTemplate:
    """Immutable record describing one app template entry.

    Instances are produced by ``Apps.list_templates`` and
    ``Apps.list_template_versions`` from the items of the templates response.
    """

    # Template name.
    name: str
    # Value of the response item's "title" key.
    title: str
    # Value of the response item's "version" key.
    version: str
    # Value of the "short_description" key; defaults to an empty string.
    short_description: str = ""
    # Value of the "tags" key; each instance gets its own empty list by default.
    tags: List[str] = field(default_factory=list)


@rewrite_module
@dataclass(frozen=True)
class AppValue:
    """Immutable record for a single value reported for an app instance.

    Instances are produced by ``Apps.get_values`` from the entries of the
    response's "items" list.
    """

    # Value of the item's "app_instance_id" key.
    app_instance_id: str
    # Value of the item's "type" key (``Apps.get_values`` can filter on it).
    type: str
    # Value of the item's "path" key.
    path: str
    # Value of the item's "value" key; ``None`` when the key is absent.
    value: Any


@rewrite_module
@dataclass(frozen=True)
class App:
    """Immutable record describing one app instance.

    Instances are produced by ``Apps.list``; every field is copied from the
    same-named key of an entry in the response's "items" list.
    """

    # Instance identifier; ``Apps.uninstall`` and ``Apps.logs`` take it as
    # their ``app_id`` argument.
    id: str
    name: str
    display_name: str
    # Name and version of the template recorded for this instance.
    template_name: str
    template_version: str
    # Project and organization recorded for this instance.
    project_name: str
    org_name: str
    # State string exactly as returned by the server.
    state: str


@rewrite_module
class Apps(metaclass=NoPublicConstructor):
    """Client-side helper for the apps REST endpoints and app log streaming.

    Every public method authenticates with ``Config._api_auth()`` and sends
    its request through the ``_Core`` instance supplied at construction.
    Unless noted otherwise, the optional ``cluster_name``, ``org_name`` and
    ``project_name`` arguments override the values stored in the
    configuration.

    NOTE(review): the class uses the ``NoPublicConstructor`` metaclass and
    several methods are wrapped by ``asyncgeneratorcontextmanager`` (both
    imported from ``._utils``); how instances are created and how the wrapped
    generators are meant to be consumed is defined there -- confirm against
    those helpers.
    """

    def __init__(self, core: _Core, config: Config) -> None:
        """Store the transport (*core*) and the configuration (*config*)."""
        self._core = core
        self._config = config

    def _build_base_url(
        self,
        cluster_name: Optional[str] = None,
        org_name: Optional[str] = None,
        project_name: Optional[str] = None,
    ) -> URL:
        """Return the apps API URL scoped to a cluster, org and project.

        Args:
            cluster_name: Cluster to address; any falsy value (``None`` or
                an empty string) falls back to the configured cluster.
            org_name: Organization to address; ``None`` falls back to the
                configured organization.
            project_name: Project to address; ``None`` falls back to the
                configured project.

        Returns:
            ``<api host>/apis/apps/v1/cluster/<cluster>/org/<org>/project/<project>``.

        Raises:
            ValueError: If neither the argument nor the configuration
                provides an organization or a project name.
        """
        cluster_name = cluster_name or self._config.cluster_name
        if org_name is None:
            org_name = self._config.org_name
            if org_name is None:
                raise ValueError("Organization name is required")
        if project_name is None:
            project_name = self._config.project_name
            if project_name is None:
                raise ValueError("Project name is required")

        # Get the base URL without the /api/v1 prefix
        base_url = self._config.api_url.with_path("")
        return (
            base_url
            / "apis/apps/v1/cluster"
            / cluster_name
            / "org"
            / org_name
            / "project"
            / project_name
        )

    def _get_monitoring_url(self, cluster_name: Optional[str]) -> URL:
        """Return the cluster's monitoring URL with its path set to ``/api/v1``.

        Args:
            cluster_name: Cluster to look up; ``None`` selects the configured
                cluster. Unlike ``_build_base_url``, an empty string is passed
                through to ``Config.get_cluster`` as-is.
        """
        if cluster_name is None:
            cluster_name = self._config.cluster_name
        cluster = self._config.get_cluster(cluster_name)
        return cluster.monitoring_url.with_path("/api/v1")

    # NOTE: this method shadows the builtin ``list`` inside the class body.
    # Annotations in later method signatures must keep using ``typing.List``
    # rather than ``list[...]``.
    @asyncgeneratorcontextmanager
    async def list(
        self,
        cluster_name: Optional[str] = None,
        org_name: Optional[str] = None,
        project_name: Optional[str] = None,
    ) -> AsyncIterator[App]:
        """List app instances.

        Sends ``GET <base url>/instances`` and yields one ``App`` per entry
        of the response's "items" list.

        Args:
            cluster_name: Optional cluster name override
            org_name: Optional organization name override
            project_name: Optional project name override

        Returns:
            An async iterator of App objects

        Raises:
            ValueError: If the organization or project name cannot be
                resolved.
        """
        url = (
            self._build_base_url(
                cluster_name=cluster_name,
                org_name=org_name,
                project_name=project_name,
            )
            / "instances"
        )

        auth = await self._config._api_auth()
        async with self._core.request("GET", url, auth=auth) as resp:
            data = await resp.json()
            for item in data["items"]:
                yield App(
                    id=item["id"],
                    name=item["name"],
                    display_name=item["display_name"],
                    template_name=item["template_name"],
                    template_version=item["template_version"],
                    project_name=item["project_name"],
                    org_name=item["org_name"],
                    state=item["state"],
                )

    async def install(
        self,
        app_data: dict[str, Any],
        cluster_name: Optional[str] = None,
        org_name: Optional[str] = None,
        project_name: Optional[str] = None,
    ) -> None:
        """Install an app.

        Sends ``POST <base url>/instances`` with *app_data* as the JSON body.
        The response body is not read.

        Args:
            app_data: JSON-serializable payload describing the app to install
            cluster_name: Optional cluster name override
            org_name: Optional organization name override
            project_name: Optional project name override

        Raises:
            ValueError: If the organization or project name cannot be
                resolved.
        """
        url = (
            self._build_base_url(
                cluster_name=cluster_name,
                org_name=org_name,
                project_name=project_name,
            )
            / "instances"
        )

        auth = await self._config._api_auth()
        async with self._core.request("POST", url, json=app_data, auth=auth):
            pass

    async def uninstall(
        self,
        app_id: str,
        cluster_name: Optional[str] = None,
        org_name: Optional[str] = None,
        project_name: Optional[str] = None,
    ) -> None:
        """Uninstall an app instance.

        Sends ``DELETE <base url>/instances/<app_id>``. The response body is
        not read.

        Args:
            app_id: The ID of the app instance
            cluster_name: Optional cluster name override
            org_name: Optional organization name override
            project_name: Optional project name override

        Raises:
            ValueError: If the organization or project name cannot be
                resolved.
        """
        url = (
            self._build_base_url(
                cluster_name=cluster_name,
                org_name=org_name,
                project_name=project_name,
            )
            / "instances"
            / app_id
        )

        auth = await self._config._api_auth()
        async with self._core.request("DELETE", url, auth=auth):
            pass

    @asyncgeneratorcontextmanager
    async def get_values(
        self,
        app_id: Optional[str] = None,
        value_type: Optional[str] = None,
        cluster_name: Optional[str] = None,
        org_name: Optional[str] = None,
        project_name: Optional[str] = None,
    ) -> AsyncIterator[AppValue]:
        """Get values from app instances.

        Sends ``GET <base url>/instances/<app_id>/values`` when *app_id* is
        given, otherwise ``GET <base url>/instances/values``. *value_type*,
        when given, is sent as the ``type`` query parameter.

        Args:
            app_id: Optional app instance ID to filter values
            value_type: Optional value type to filter
            cluster_name: Optional cluster name override
            org_name: Optional organization name override
            project_name: Optional project name override

        Returns:
            An async iterator of AppValue objects

        Raises:
            ValueError: If the organization or project name cannot be
                resolved.
        """
        base_url = self._build_base_url(
            cluster_name=cluster_name,
            org_name=org_name,
            project_name=project_name,
        )

        if app_id is not None:
            url = base_url / "instances" / app_id / "values"
        else:
            url = base_url / "instances" / "values"

        params = {}
        if value_type is not None:
            params["type"] = value_type

        auth = await self._config._api_auth()
        async with self._core.request("GET", url, params=params, auth=auth) as resp:
            data = await resp.json()
            for item in data["items"]:
                yield AppValue(
                    app_instance_id=item["app_instance_id"],
                    type=item["type"],
                    path=item["path"],
                    # "value" is optional in the payload; absent -> None.
                    value=item.get("value"),
                )

    @asyncgeneratorcontextmanager
    async def list_templates(
        self,
        cluster_name: Optional[str] = None,
        org_name: Optional[str] = None,
        project_name: Optional[str] = None,
    ) -> AsyncIterator[AppTemplate]:
        """List available app templates.

        Sends ``GET <base url>/templates``. The response JSON is iterated
        directly (it is not wrapped in an "items" key), and keys missing from
        an item fall back to empty defaults.

        Args:
            cluster_name: Optional cluster name override
            org_name: Optional organization name override
            project_name: Optional project name override

        Returns:
            An async iterator of AppTemplate objects

        Raises:
            ValueError: If the organization or project name cannot be
                resolved.
        """
        url = (
            self._build_base_url(
                cluster_name=cluster_name,
                org_name=org_name,
                project_name=project_name,
            )
            / "templates"
        )

        auth = await self._config._api_auth()
        async with self._core.request("GET", url, auth=auth) as resp:
            data = await resp.json()
            for item in data:
                # Create the AppTemplate object with only the required fields
                yield AppTemplate(
                    name=item.get("name", ""),
                    version=item.get("version", ""),
                    title=item.get("title", ""),
                    short_description=item.get("short_description", ""),
                    tags=item.get("tags", []),
                )

    @asyncgeneratorcontextmanager
    async def list_template_versions(
        self,
        name: str,
        cluster_name: Optional[str] = None,
        org_name: Optional[str] = None,
        project_name: Optional[str] = None,
    ) -> AsyncIterator[AppTemplate]:
        """List all available versions for a specific app template.

        Sends ``GET <base url>/templates/<name>``. Each yielded object takes
        its ``name`` from the *name* argument rather than from the response
        item.

        Args:
            name: The name of the app template
            cluster_name: Optional cluster name override
            org_name: Optional organization name override
            project_name: Optional project name override

        Returns:
            An async iterator of AppTemplate objects

        Raises:
            ValueError: If the organization or project name cannot be
                resolved.
        """
        url = (
            self._build_base_url(
                cluster_name=cluster_name,
                org_name=org_name,
                project_name=project_name,
            )
            / "templates"
            / name
        )

        auth = await self._config._api_auth()
        async with self._core.request("GET", url, auth=auth) as resp:
            data = await resp.json()
            for item in data:
                # Return AppTemplate objects with the same name but different
                # versions
                yield AppTemplate(
                    name=name,
                    version=item.get("version", ""),
                    title=item.get("title", ""),
                    short_description=item.get("short_description", ""),
                    tags=item.get("tags", []),
                )

    @asyncgeneratorcontextmanager
    async def logs(
        self,
        app_id: str,
        *,
        cluster_name: Optional[str] = None,
        org_name: Optional[str] = None,
        project_name: Optional[str] = None,
        since: Optional[datetime] = None,
        timestamps: bool = False,
    ) -> AsyncIterator[bytes]:
        """Get logs for an app instance.

        Opens a WebSocket to ``<monitoring url>/apps/<app_id>/log_ws`` and
        yields every non-empty binary message it receives.

        Args:
            app_id: The ID of the app instance
            cluster_name: Optional cluster name override
            org_name: Accepted for signature symmetry with the other
                methods; not used when building the URL
            project_name: Accepted for signature symmetry with the other
                methods; not used when building the URL
            since: Optional timestamp to start logs from; a naive datetime
                is interpreted as local time
            timestamps: Include timestamps in the logs output

        Returns:
            An async iterator of log chunks as bytes

        Raises:
            RuntimeError: If a message that is neither binary nor an error
                arrives on the socket.
        """
        url = self._get_monitoring_url(cluster_name) / "apps" / app_id / "log_ws"
        # Map the HTTP(S) scheme of the monitoring URL onto its WebSocket
        # counterpart.
        if url.scheme == "https":  # pragma: no cover
            url = url.with_scheme("wss")
        else:
            url = url.with_scheme("ws")

        if since is not None:
            if since.tzinfo is None:
                # Interpret naive datetime object as local time.
                since = since.astimezone()  # pragma: no cover
            url = url.update_query(since=since.isoformat())
        if timestamps:
            url = url.update_query(timestamps="true")

        auth = await self._config._api_auth()
        async with self._core.ws_connect(
            url,
            auth=auth,
            timeout=None,
            heartbeat=30,
        ) as ws:
            async for msg in ws:
                if msg.type == WSMsgType.BINARY:
                    # Empty binary frames are skipped.
                    if msg.data:
                        yield msg.data
                elif msg.type == WSMsgType.ERROR:  # pragma: no cover
                    raise ws.exception()  # type: ignore
                else:  # pragma: no cover
                    raise RuntimeError(f"Incorrect WebSocket message: {msg!r}")