Source code for apolo_sdk._secrets

import base64
from dataclasses import dataclass
from typing import AsyncIterator, Optional

from yarl import URL

from ._config import Config
from ._core import _Core
from ._rewrite import rewrite_module
from ._utils import NoPublicConstructor, asyncgeneratorcontextmanager


@rewrite_module
@dataclass(frozen=True)
class Secret:
    key: str
    owner: str
    cluster_name: str
    org_name: str
    project_name: str

    @property
    def uri(self) -> URL:
        base = f"secret://{self.cluster_name}"
        if self.org_name:
            base += f"/{self.org_name}"
        return URL(f"{base}/{self.project_name}/{self.key}")


@rewrite_module
class Secrets(metaclass=NoPublicConstructor):
    def __init__(self, core: _Core, config: Config) -> None:
        self._core = core
        self._config = config

    def _get_secrets_url(self, cluster_name: Optional[str]) -> URL:
        if cluster_name is None:
            cluster_name = self._config.cluster_name
        return self._config.get_cluster(cluster_name).secrets_url

[docs] @asyncgeneratorcontextmanager async def list( self, cluster_name: Optional[str] = None, org_name: Optional[str] = None, project_name: Optional[str] = None, ) -> AsyncIterator[Secret]: url = self._get_secrets_url(cluster_name) params = {} params["org_name"] = org_name or self._config.org_name if project_name: params["project_name"] = project_name auth = await self._config._api_auth() async with self._core.request("GET", url, params=params, auth=auth) as resp: ret = await resp.json() for j in ret: yield Secret( key=j["key"], owner=j["owner"], cluster_name=cluster_name or self._config.cluster_name, org_name=j.get("org_name") or "NO_ORG", project_name=j["project_name"], )
[docs] async def add( self, key: str, value: bytes, cluster_name: Optional[str] = None, org_name: Optional[str] = None, project_name: Optional[str] = None, ) -> None: url = self._get_secrets_url(cluster_name) auth = await self._config._api_auth() data = { "key": key, "value": base64.b64encode(value).decode("ascii"), "org_name": org_name or self._config.org_name, "project_name": project_name or self._config.project_name_or_raise, } async with self._core.request("POST", url, auth=auth, json=data): pass
[docs] async def rm( self, key: str, cluster_name: Optional[str] = None, org_name: Optional[str] = None, project_name: Optional[str] = None, ) -> None: url = self._get_secrets_url(cluster_name) / key auth = await self._config._api_auth() params = { "project_name": project_name or self._config.project_name_or_raise, } org_name_val = org_name or self._config.org_name if org_name_val: params["org_name"] = org_name_val async with self._core.request("DELETE", url, auth=auth, params=params): pass