Source code for robusta.integrations.kubernetes.api_client_utils

import datetime
import io
import logging
import os
import re
import tarfile
import tempfile
import time
import traceback
from typing import List, Optional

from hikaru.model.rel_1_26 import Job
from kubernetes import config
from kubernetes.client.api import core_v1_api
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream

RUNNING_STATE = "Running"
SUCCEEDED_STATE = "Succeeded"

try:
    if os.getenv("KUBERNETES_SERVICE_HOST"):
        config.load_incluster_config()
    else:
        config.load_kube_config()
except config.config_exception.ConfigException as e:
    logging.warning(f"Running without kube-config! e={e}")

default_exec_command = ["/bin/sh", "-c"]


def wait_until(read_function, predicate_function, timeout_sec: float, backoff_wait_sec: float):
    """
    Repeatedly calls predicate_function(read_function()) until predicate_function returns True or we time out.
    Returns the last result of read_function() on success and raises an exception on timeout.
    Between attempts, waits backoff_wait_sec seconds.
    """
    start_time_sec = time.time()

    while start_time_sec + timeout_sec > time.time():
        try:
            resp = read_function()
            if predicate_function(resp):
                return resp
        except ApiException:
            logging.error(f"failed calling read_function {traceback.format_exc()}")

        time.sleep(backoff_wait_sec)

    raise Exception("Failed to reach wait condition")
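

# Illustrative usage sketch (not part of the original module): wait until a
# hypothetical ConfigMap named "my-config" exists in the "default" namespace.
# The resource name and namespace below are assumptions for demonstration only.
def _example_wait_for_configmap():
    core_v1 = core_v1_api.CoreV1Api()
    # read_function raises ApiException (caught and retried by wait_until) until
    # the ConfigMap exists; the predicate then accepts the first successful read.
    return wait_until(
        lambda: core_v1.read_namespaced_config_map("my-config", "default"),
        lambda config_map: config_map is not None,
        timeout_sec=30,
        backoff_wait_sec=1,
    )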


def wait_until_job_complete(job: Job, timeout):
    """
    wait until a kubernetes Job object either succeeds or fails at least once
    """

    def is_job_complete(j: Job) -> bool:
        return j.status.completionTime is not None or j.status.failed is not None

    return wait_until(
        lambda: Job.readNamespacedJob(job.metadata.name, job.metadata.namespace).obj,
        is_job_complete,
        timeout,
        5,
    )


# TODO: refactor to use wait_until function
def wait_for_pod_status(name, namespace, status: str, timeout_sec: float, backoff_wait_sec: float) -> str:
    pod_details = f"pod status: {name} {namespace} {status} {timeout_sec}"
    logging.debug(f"waiting for {pod_details}")

    start_time_sec = time.time()
    while start_time_sec + timeout_sec > time.time():
        try:
            core_v1 = core_v1_api.CoreV1Api()
            resp = core_v1.read_namespaced_pod_status(name, namespace)

            if resp.status.phase == status:
                logging.debug(f"reached {pod_details}")
                return status

        except ApiException:
            logging.error(f"failed to get pod status {name} {namespace} {traceback.format_exc()}")

        time.sleep(backoff_wait_sec)

    logging.debug(f"failed to reach {pod_details}")
    return "FAIL"


def exec_shell_command(name, shell_command: str, namespace="default", container=None):
    commands = default_exec_command.copy()
    commands.append(shell_command)
    return exec_commands(name, commands, namespace, container)
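

# Illustrative usage sketch (not part of the original module): run a shell command
# in a pod and capture its combined output. The pod name, namespace and command
# are assumptions for demonstration only.
def _example_exec_shell_command() -> str:
    return exec_shell_command("my-pod", "ls -l /tmp", namespace="default")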


def upload_file(name: str, destination: str, contents: bytes, namespace="default", container=None):
    core_v1 = core_v1_api.CoreV1Api()
    resp = stream(
        core_v1.connect_get_namespaced_pod_exec,
        name,
        namespace,
        container=container,
        command=["tar", "xvf", "-", "-C", "/"],
        stderr=True,
        stdin=True,
        stdout=True,
        tty=False,
        _preload_content=False,
    )

    with tempfile.TemporaryFile() as local_tar_file:
        with tarfile.open(fileobj=local_tar_file, mode="w") as tar:
            tarinfo = tarfile.TarInfo(destination)
            tarinfo.size = len(contents)
            tar.addfile(tarinfo, fileobj=io.BytesIO(contents))

        local_tar_file.seek(0)
        resp.write_stdin(local_tar_file.read())

        while resp.is_open():
            resp.update(timeout=1)
            if resp.peek_stdout():
                print("STDOUT: %s" % resp.read_stdout())
            if resp.peek_stderr():
                print("STDERR: %s" % resp.read_stderr())
            else:
                break
        resp.close()
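

# Illustrative usage sketch (not part of the original module): upload a small text
# file into a pod via the tar-over-exec mechanism above. The pod name, namespace
# and destination path are assumptions for demonstration only.
def _example_upload_file():
    upload_file(
        "my-pod",
        "/tmp/hello.txt",
        b"hello from robusta\n",
        namespace="default",
    )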


def get_pod_logs(
    name,
    namespace="default",
    container="",
    previous=None,
    tail_lines=None,
    since_seconds=None,
):
    resp = None
    try:
        core_v1 = core_v1_api.CoreV1Api()
        resp = core_v1.read_namespaced_pod_log(
            name,
            namespace,
            container=container,
            previous=previous,
            tail_lines=tail_lines,
            since_seconds=since_seconds,
            _preload_content=False  # If this flag is not used, double quotes in json objects in stdout are converted
            # by the kubernetes client to single quotes, which makes json.loads() fail.
            # We therefore use this flag in order to get the raw bytes and decode the output
        ).data.decode("utf-8")

    except ApiException as e:
        if e.status != 404:
            logging.exception(f"failed to get pod logs {name} {namespace} {container}")
            resp = ""

    logging.debug(f"get logs {resp}")
    return resp
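

# Illustrative usage sketch (not part of the original module): fetch the last 100
# log lines from a pod. The pod name and namespace are assumptions for
# demonstration only.
def _example_get_pod_logs() -> str:
    return get_pod_logs("my-pod", namespace="default", tail_lines=100)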


def list_available_services(
    namespace="default",
):
    resp = None
    try:
        core_v1 = core_v1_api.CoreV1Api()
        resp = core_v1.list_namespaced_service(
            namespace,
            _preload_content=False  # If this flag is not used, double quotes in json objects in stdout are converted
            # by the kubernetes client to single quotes, which makes json.loads() fail.
            # We therefore use this flag in order to get the raw bytes and decode the output
        ).data.decode("utf-8")

    except ApiException as e:
        if e.status != 404:
            logging.exception(f"failed to get services in {namespace} namespace")
            resp = ""

    logging.debug(f"list available services {resp}")
    return resp


def prepare_pod_command(cmd) -> Optional[List[str]]:
    if isinstance(cmd, list):
        return cmd
    elif isinstance(cmd, str):
        return cmd.split(" ")  # cmd needs to be a list of strings
    elif cmd is None:
        return None
    else:
        logging.exception(f"cmd {cmd} has unknown type {type(cmd)}")
        return cmd
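

# Illustrative behavior sketch (not part of the original module): a list is passed
# through unchanged, a string is split on spaces, and None is preserved.
def _example_prepare_pod_command():
    assert prepare_pod_command(["ls", "-l"]) == ["ls", "-l"]
    assert prepare_pod_command("ls -l") == ["ls", "-l"]
    assert prepare_pod_command(None) is None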


def exec_commands(name, exec_command, namespace="default", container=None):
    logging.debug(
        f"Executing command name: {name} command: {exec_command} namespace: {namespace} container: {container}"
    )

    # verify pod state before connecting
    pod_status = wait_for_pod_status(name, namespace, RUNNING_STATE, 90, 0.2)  # TODO config
    if pod_status != RUNNING_STATE:
        msg = f"Not running exec commands. Pod {name} {namespace} is not in running state"
        logging.error(msg)
        return msg

    wsclient = None
    try:
        core_v1 = core_v1_api.CoreV1Api()
        wsclient = stream(
            core_v1.connect_get_namespaced_pod_exec,
            name,
            namespace,
            container=container,
            command=exec_command,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=False,  # fix https://github.com/kubernetes-client/python/issues/811
        )
        wsclient.run_forever()
        response = wsclient.read_all()
        logging.debug(f"exec command response {response}")
    finally:
        if wsclient is not None:
            wsclient.close()

    return response


def to_kubernetes_name(name, prefix=""):
    """
    Returns a valid and unique Kubernetes name based on prefix and name, replacing characters in name as necessary.
    See https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
    """
    unique_id = str(time.time()).replace(".", "-")
    safe_name = re.sub("[^0-9a-zA-Z\\-]+", "-", name)
    return f"{prefix}{safe_name}-{unique_id}"[:63]


def parse_kubernetes_datetime(k8s_datetime: str) -> datetime.datetime:
    """
    Parse a date/time related field in a Kubernetes object and convert it from string to a python datetime object
    """
    return datetime.datetime.strptime(k8s_datetime, "%Y-%m-%dT%H:%M:%S%z")


def parse_kubernetes_datetime_with_ms(k8s_datetime: str) -> datetime.datetime:
    return datetime.datetime.strptime(k8s_datetime, "%Y-%m-%dT%H:%M:%S.%f%z")


def parse_kubernetes_datetime_to_ms(k8s_datetime: str) -> float:
    """
    For timestamps: eventTime has milliseconds and firstTimestamp, lastTimestamp, creationTimestamp do not.
    We parse the more commonly used timestamp format first.
    """
    try:
        return parse_kubernetes_datetime(k8s_datetime).timestamp() * 1000
    except ValueError:
        return parse_kubernetes_datetime_with_ms(k8s_datetime).timestamp() * 1000
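

# Illustrative behavior sketch (not part of the original module): both timestamp
# formats found on Kubernetes objects parse to epoch milliseconds. The literal
# timestamps below are arbitrary example values.
def _example_parse_kubernetes_datetime_to_ms():
    without_ms = parse_kubernetes_datetime_to_ms("2023-01-01T12:00:00Z")  # e.g. creationTimestamp
    with_ms = parse_kubernetes_datetime_to_ms("2023-01-01T12:00:00.500000Z")  # e.g. eventTime
    return without_ms, with_ms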