Source code for src.internal.dependencies.k8s_client
"""Create a K8S client for use in the app."""
from kubernetes.client import ApiClient, Configuration
from kubernetes.config import ConfigException, load_incluster_config, load_kube_config
from ...config.config import config
[docs]def get_k8s_client() -> ApiClient:
"""Create a K8S client to interact with the cluster.
Returns:
ApiClient: K8S client
"""
k8s_config = Configuration()
try:
load_incluster_config(k8s_config)
except ConfigException: # app not running within K8S cluster
try:
load_kube_config(client_configuration=k8s_config)
except ConfigException:
k8s_config.api_key["authorization"] = config.K8S_API_KEY
k8s_config.host = config.K8S_HOST
return ApiClient(k8s_config)