Source code for robusta.core.reporting.blocks

# TODO: add a KubernetesBlock for rendering Kubernetes objects in a standard way
# Notes on how we define all the classes below:
# 1. We use pydantic and not dataclasses so that field types are validated
# 2. We add __init__ methods ourselves for convenience. Without our own __init__ method, something like
#       HeaderBlock("foo") doesn't work. Only HeaderBlock(text="foo") would be allowed by pydantic.
import gzip
import json
import itertools
import logging
import textwrap
from copy import deepcopy
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union

import hikaru
from hikaru import DiffDetail, DiffType
from hikaru.model.rel_1_26 import HikaruDocumentBase
from pydantic import BaseModel

from robusta.core.model.base_params import ChartValuesFormat

try:
    from tabulate import tabulate
except ImportError:

    # tabulate is treated as optional: importing this module still succeeds without it, and the
    # ImportError is only raised when tabulate() is actually called.
    def tabulate(*args, **kwargs):
        raise ImportError("Please install tabulate to use the TableBlock")


from prometrix import PrometheusQueryResult

from robusta.core.model.env_vars import PRINTED_TABLE_MAX_WIDTH
from robusta.core.model.pods import format_unit
from robusta.core.reporting.base import BaseBlock
from robusta.core.reporting.consts import ScanType
from robusta.core.reporting.custom_rendering import render_value

# Maximum number of characters of text kept by MarkdownBlock before it appends "...";
# 2997 + len("...") == 3000.
BLOCK_SIZE_LIMIT = 2997  # due to slack block size limit of 3000


class MarkdownBlock(BaseBlock):
    """
    A Block of `Markdown <https://en.wikipedia.org/wiki/Markdown>`__
    """

    text: str

    def __init__(self, text: str, dedent: bool = False):
        """
        :param text: one or more paragraphs of Markdown markup
        :param dedent: if True, remove common indentation so that you can use multi-line docstrings.
        """
        if dedent:
            # startswith() rather than text[0] so that an empty string does not raise IndexError
            if text.startswith("\n"):
                text = text[1:]
            text = textwrap.dedent(text)

        # Strictly greater-than: text that is exactly at the limit is not cut, so it must not
        # get a misleading "..." suffix either.
        if len(text) > BLOCK_SIZE_LIMIT:
            text = text[:BLOCK_SIZE_LIMIT] + "..."

        super().__init__(text=text)
class DividerBlock(BaseBlock):
    """
    A visual separator placed between other blocks. Carries no data of its own.
    """
class FileBlock(BaseBlock):
    """
    A file of any type. Used for images, log files, binary files, and more.
    """

    filename: str
    contents: bytes

    def __init__(
        self,
        filename: str,
        contents: bytes,
        **kwargs,
    ):
        """
        :param filename: the file's name
        :param contents: the file's contents
        """
        super().__init__(
            filename=filename,
            contents=contents,
            **kwargs,
        )

    def is_text_file(self) -> bool:
        """Return True when the filename ends with ``.txt`` or ``.log``."""
        return self.filename.endswith((".txt", ".log"))

    def zip(self):
        """
        Gzip-compress ``contents`` in place and append ``.gz`` to ``filename``.

        Best effort: any failure is logged and the block is left as it was.
        """
        try:
            self.contents = gzip.compress(self.contents)
            self.filename = self.filename + ".gz"
        except Exception as exc:
            logging.error(f"Unexpected error occurred while zipping file {self.filename}")
            logging.exception(exc)

    def truncate_content(self, max_file_size_bytes: int) -> bytes:
        """
        Truncates the log file by removing lines from the beginning until its size is within the given limit.

        Files that are not text files (see :meth:`is_text_file`) are returned untouched.
        The block itself is not modified; the truncated content is returned.

        :param max_file_size_bytes: maximum size, in bytes, of the returned content
        :return: the original contents if they already fit, otherwise the trailing lines that fit
        """
        # we don't want to truncate other files like images
        if not self.is_text_file():
            return self.contents

        # Sizes are measured in encoded bytes, not in characters: the limit is a byte limit and
        # multi-byte UTF-8 characters would otherwise be under-counted.
        content_length = len(self.contents)
        if content_length <= max_file_size_bytes:
            return self.contents

        lines = self.contents.decode("utf-8").splitlines()
        byte_length_newline = len("\n".encode("utf-8"))

        truncated_lines: List[str] = []
        for idx, line in enumerate(lines):
            # content_length is an upper bound for the size of what remains after dropping this line
            content_length -= len(line.encode("utf-8")) + byte_length_newline
            if content_length <= max_file_size_bytes:
                truncated_lines = lines[idx + 1 :]
                break

        return "\n".join(truncated_lines).encode("utf-8")
class EmptyFileBlock(BaseBlock):
    """
    Stand-in block used when a log file is empty
    """

    metadata: dict = {}
    filename: str

    def __init__(self, filename: str, remarks: str, metadata: Optional[dict] = None, **kwargs):
        """
        :param filename: the file's name
        :param remarks: text stored under the ``"remarks"`` metadata key
        :param metadata: optional initial metadata; ``"is_empty"`` and ``"remarks"`` are set on top of it
        """
        initial_metadata = metadata if metadata else {}
        super().__init__(filename=filename, metadata=initial_metadata, **kwargs)
        # Mark the block as empty and record why, in the same order as separate assignments would.
        self.metadata.update(is_empty=True, remarks=remarks)
class HeaderBlock(BaseBlock):
    """
    A piece of text that should be rendered as a header
    """

    text: str

    def __init__(self, text: str):
        """
        :param text: the header text
        """
        # Explicit __init__ so that the text can be passed positionally: HeaderBlock("foo")
        super().__init__(text=text)
class ListBlock(BaseBlock):
    """
    A list of string items, nicely formatted
    """

    items: List[str]

    def __init__(self, items: List[str]):
        """
        :param items: the strings to show, one per list entry
        """
        super().__init__(items=items)

    def to_markdown(self) -> MarkdownBlock:
        """Render the items as a Markdown bullet list, one ``* item`` line per entry."""
        bullet_lines = "\n".join(f"* {entry}" for entry in self.items)
        return MarkdownBlock(bullet_lines)
# TODO: we should add a generalization of this which isn't K8s specific
class KubernetesDiffBlock(BaseBlock):
    """
    A diff between two versions of a Kubernetes object
    """

    diffs: List[DiffDetail]
    old: Optional[str]
    new: Optional[str]
    old_obj: Optional[HikaruDocumentBase]
    new_obj: Optional[HikaruDocumentBase]
    resource_name: Optional[str]
    num_additions: Optional[int]
    num_deletions: Optional[int]
    num_modifications: Optional[int]
    kind: str

    # note that interesting_diffs might be a subset of the full diff between old and new
    def __init__(
        self,
        interesting_diffs: List[DiffDetail],
        old: Optional[HikaruDocumentBase],
        new: Optional[HikaruDocumentBase],
        name: str,
        kind: str,
        namespace: Optional[str] = None,
    ):
        """
        :param interesting_diffs: parts of the diff to emphasize - some sinks will only show these to save space
        :param old: the old version of the object
        :param new: the new version of the object
        :param name: the object's name, used to build ``resource_name``
        :param kind: the object's kind, used in the description text
        :param namespace: optional namespace, included in ``resource_name`` when given
        """
        num_additions = sum(1 for d in interesting_diffs if d.diff_type == DiffType.ADDED)
        num_deletions = sum(1 for d in interesting_diffs if d.diff_type == DiffType.REMOVED)
        # everything that is neither an addition nor a removal is counted as a modification
        num_modifications = len(interesting_diffs) - num_additions - num_deletions

        resource_name = self._obj_to_name(old, name, namespace) or self._obj_to_name(new, name, namespace)

        super().__init__(
            diffs=interesting_diffs,
            old=self._obj_to_content(old),
            new=self._obj_to_content(new),
            old_obj=old,
            new_obj=new,
            resource_name=resource_name,
            num_additions=num_additions,
            num_deletions=num_deletions,
            num_modifications=num_modifications,
            kind=kind,
        )

    def get_description(self):
        """Return a short text saying whether the object was created, deleted or updated."""
        if not self.old:
            return f"{self.kind} created"
        elif not self.new:
            return f"{self.kind} deleted"
        else:
            return self.__updated_description()

    def __updated_description(self):
        """Build the "updated" description, listing at most ``length`` changed attribute names."""
        length = 5
        updated_fields = []
        for diff in self.diffs:
            if diff.path:
                # Stripping any integer values like '0', '1', '2', etc. from the path array which denotes array
                # indices. eg: ['spec', 'template', 'spec', 'containers', '0', 'image']
                stripped_path = [path for path in diff.path if not path.isdigit()]
                if stripped_path:
                    updated_fields.append(stripped_path[-1])

        updated_fields_str = ""
        if updated_fields:
            updated_fields_str = "\n ● Attributes: "
            updated_fields_len = len(updated_fields)
            # <= so that exactly `length` fields are listed in full instead of "... (and 0 more)"
            if updated_fields_len <= length:
                updated_fields_str += f"{', '.join(updated_fields)}"
            else:
                updated_fields_str += (
                    f"{', '.join(updated_fields[:length])} ... (and {updated_fields_len - length} more)"
                )

        return f"{self.kind} updated{updated_fields_str}"

    @staticmethod
    def _obj_to_content(obj: Optional[HikaruDocumentBase]):
        """Return the YAML of ``obj``, or an empty string when there is no object."""
        if obj is None:
            return ""
        else:
            return hikaru.get_yaml(obj)

    @staticmethod
    def _obj_to_name(obj: Optional[HikaruDocumentBase], name: str, namespace: str = ""):
        """
        Build ``[kind/][namespace/]name.yaml`` for ``obj``; return an empty string when ``obj`` is None.
        """
        if obj is None:
            return ""

        kind = getattr(obj, "kind", "").lower()
        obj_name = ""
        if kind:
            obj_name += f"{kind}/"
        if namespace:
            obj_name += f"{namespace}/"

        return f"{obj_name}{name}.yaml"
class JsonBlock(BaseBlock):
    """
    JSON data, carried as a string
    """

    json_str: str

    def __init__(self, json_str: str):
        """
        :param json_str: the JSON document, already serialized to a string
        """
        # Explicit __init__ so that the value can be passed positionally: JsonBlock("{}")
        super().__init__(json_str=json_str)
class TableBlockFormat(Enum):
    """Display formats that can be requested for a TableBlock."""

    vertical = "vertical"
class TableBlock(BaseBlock):
    """
    Table display of a list of lists.

    Note: Wider tables appear as a file attachment on Slack, because they aren't rendered properly inline

    :var column_width: Hint to sink for the portion of size each column should use. Not supported by all sinks.
        example: [1, 1, 1, 2] use twice the size for last column.
    """

    rows: List[List]
    headers: Sequence[str] = ()
    column_renderers: Dict = {}
    table_name: str = ""
    column_width: Optional[List[int]] = None
    metadata: Dict[str, Any] = {}
    table_format: Optional[TableBlockFormat] = None

    def __init__(
        self,
        rows: List[List],
        headers: Sequence[str] = (),
        column_renderers: Optional[Dict] = None,
        table_name: str = "",
        column_width: Optional[List[int]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        table_format: Optional[TableBlockFormat] = None,
        **kwargs,
    ):
        """
        :param rows: a list of rows. each row is a list of columns
        :param headers: names of each column
        :param column_renderers: maps a header name to the renderer type applied to that column by render_rows()
        :param table_name: optional name, used as the heading in to_markdown()
        :param column_width: relative column sizes (see the class docstring)
        :param metadata: extra metadata stored on the block
        :param table_format: optional display format; when given it is also recorded in metadata["format"]
        """
        super().__init__(
            rows=rows,
            headers=headers,
            # None defaults instead of shared mutable {} defaults in the signature
            column_renderers={} if column_renderers is None else column_renderers,
            table_name=table_name,
            column_width=column_width,
            # shallow copy so that adding "format" below can never leak into the caller's dict
            metadata=dict(metadata) if metadata else {},
            # previously the argument was accepted but the declared field was never populated
            table_format=table_format,
            **kwargs,
        )
        if table_format:
            self.metadata["format"] = table_format.value

    @classmethod
    def __calc_max_width(cls, headers, rendered_rows, table_max_width: int) -> List[int]:
        """Return a maximum width per column such that the widths sum to at most ``table_max_width``."""
        # We need to make sure the total table width, doesn't exceed the max width,
        # otherwise, the table is printed corrupted
        columns_max_widths = [len(header) for header in headers]
        for row in rendered_rows:
            for idx, val in enumerate(row):
                cell_width = len(str(val))
                if idx < len(columns_max_widths):
                    columns_max_widths[idx] = max(cell_width, columns_max_widths[idx])
                else:
                    # the row has more cells than there are headers (e.g. a table without headers)
                    columns_max_widths.append(cell_width)

        if sum(columns_max_widths) > table_max_width:  # We want to limit the widest column
            largest_width = max(columns_max_widths)
            widest_column_idx = columns_max_widths.index(largest_width)
            diff = sum(columns_max_widths) - table_max_width
            columns_max_widths[widest_column_idx] = largest_width - diff
            if columns_max_widths[widest_column_idx] <= 0:
                # in case the diff is at least as big as the largest column just divide equally;
                # a width of 0 is not usable for wrapping, so never go below 1
                equal_width = max(1, table_max_width // len(columns_max_widths))
                columns_max_widths = [equal_width] * len(columns_max_widths)

        return columns_max_widths

    @classmethod
    def __trim_rows(cls, contents: str, max_chars: int):
        """Drop whole trailing lines, and append a "..." line, so that ``contents`` fits in ``max_chars``."""
        # We need to make sure that the total character count doesn't exceed max_chars,
        # but if we cut off a row in the middle then it messes up the whole table.
        # So instead remove entire rows at a time
        if len(contents) <= max_chars:
            return contents

        truncator = "\n..."
        max_chars -= len(truncator)
        lines = contents.splitlines()
        length_so_far = 0
        lines_to_include = 0
        for line in lines:
            new_length = length_so_far + len("\n") + len(line)
            if new_length > max_chars:
                break
            length_so_far = new_length
            lines_to_include += 1

        return "\n".join(lines[:lines_to_include]) + truncator

    @classmethod
    def __to_strings_rows(cls, rows):
        """Convert every cell to ``str``."""
        # This is just to assert all row column values are strings. Tabulate might fail on other types
        return [[str(column_value) for column_value in row] for row in rows]

    def to_markdown(self, max_chars=None, add_table_header: bool = True) -> MarkdownBlock:
        """
        Render the table inside a Markdown code fence.

        :param max_chars: if given, whole rows are dropped so that the result fits in this many characters
        :param add_table_header: if True and the table has a name, put the name on a line above the fence
        """
        table_header = f"{self.table_name}\n" if (add_table_header and self.table_name) else ""
        prefix = f"{table_header}```\n"
        suffix = "\n```"
        table_contents = self.to_table_string()
        if max_chars is not None:
            max_chars = max_chars - len(prefix) - len(suffix)
            table_contents = self.__trim_rows(table_contents, max_chars)

        return MarkdownBlock(f"{prefix}{table_contents}{suffix}")

    def to_table_string(self, table_max_width: int = PRINTED_TABLE_MAX_WIDTH, table_fmt: str = "presto") -> str:
        """Render the (rendered and stringified) rows with tabulate, limiting the column widths."""
        rendered_rows = self.__to_strings_rows(self.render_rows())
        col_max_width = self.__calc_max_width(self.headers, rendered_rows, table_max_width)
        return tabulate(
            rendered_rows,
            headers=self.headers,
            tablefmt=table_fmt,
            maxcolwidths=col_max_width,
        )

    def render_rows(self) -> List[List]:
        """
        Return the rows with every column listed in ``column_renderers`` passed through render_value().

        ``self.rows`` is not modified: a deep copy is rendered and returned.
        """
        if self.column_renderers is None:
            return self.rows

        new_rows = deepcopy(self.rows)
        for column_name, renderer_type in self.column_renderers.items():
            column_idx = self.headers.index(column_name)
            for row in new_rows:
                row[column_idx] = render_value(renderer_type, row[column_idx])

        return new_rows
class KubernetesFieldsBlock(TableBlock):
    """
    A nicely formatted Kubernetes objects, with a subset of the fields shown
    """

    def __init__(
        self,
        k8s_obj: HikaruDocumentBase,
        fields: List[str],
        explanations: Optional[Dict[str, str]] = None,
    ):
        """
        :param k8s_obj: a kubernetes object
        :param fields: a list of fields to display. for example ["metadata.name", "metadata.namespace"]
        :param explanations: an explanation for each field. for example {"metadata.name": "the pods name"}
        """
        headers = ["field", "value"]
        rows = [[field, k8s_obj.object_at_path(field.split("."))] for field in fields]

        # The third column only exists when at least one explanation was supplied;
        # fields without an explanation get an empty cell.
        if explanations:
            headers.append("explanation")
            for row in rows:
                row.append(explanations.get(row[0], ""))

        super().__init__(rows=rows, headers=headers)
class CallbackChoice(BaseModel):
    """
    One selectable choice of a CallbackBlock: the action to run plus its optional inputs
    """

    class Config:
        # needed because the fields below hold arbitrary (non-pydantic) types
        arbitrary_types_allowed = True

    action: Callable
    action_params: Optional[BaseModel]
    kubernetes_object: Optional[Any]
class CallbackBlock(BaseBlock):
    """
    A set of buttons that allows callbacks from the sink - for example, a button in Slack that will trigger
    another action when clicked
    """

    choices: Dict[str, CallbackChoice]

    def __init__(self, choices: Dict[str, CallbackChoice]):
        """
        :param choices: a dict mapping the text on each button to the action it triggers
        """
        # Explicit __init__ so that the dict can be passed positionally
        super().__init__(choices=choices)
class LinkProp(BaseModel):
    """A single link: the text to show and the url it points to."""

    text: str
    url: str


class LinksBlock(BaseBlock):
    """
    A set of links
    """

    links: List[LinkProp] = []


class PrometheusBlockLineData(BaseBlock):
    """A legend/value pair, used for the vertical and horizontal lines of a PrometheusBlock."""

    legend: str
    value: Any


class PrometheusBlock(BaseBlock):
    """
    Formatted prometheus query results with metadata
    """

    data: PrometheusQueryResult
    metadata: Dict[str, str]
    vertical_lines: Optional[List[PrometheusBlockLineData]]
    horizontal_lines: Optional[List[PrometheusBlockLineData]]
    y_axis_type: Optional[ChartValuesFormat]
    graph_name: Optional[str]

    def __init__(
        self,
        data: PrometheusQueryResult,
        query: str,
        vertical_lines: Optional[List[PrometheusBlockLineData]] = None,
        horizontal_lines: Optional[List[PrometheusBlockLineData]] = None,
        y_axis_type: Optional[ChartValuesFormat] = None,
        graph_name: Optional[str] = None,
        metrics_legends_labels: Optional[List[str]] = None,
    ):
        """
        :param data: the PrometheusQueryResult generated created from a prometheus query
        :param query: the Prometheus query run
        :param vertical_lines: optional vertical lines, stored on the block as given
        :param horizontal_lines: optional horizontal lines, stored on the block as given
        :param y_axis_type: optional format of the y axis values
        :param graph_name: optional name of the graph
        :param metrics_legends_labels: optional labels, stored JSON-encoded in metadata["metrics_legends_labels"]
        """
        metadata = {"query-result-version": "1.0", "query": query}
        super().__init__(
            data=data,
            metadata=metadata,
            vertical_lines=vertical_lines,
            horizontal_lines=horizontal_lines,
            y_axis_type=y_axis_type,
            graph_name=graph_name,
        )
        if metrics_legends_labels:
            # metadata values are strings, so the list is stored as JSON text
            self.metadata["metrics_legends_labels"] = json.dumps(metrics_legends_labels)

    def dict(self, *args, **kwargs) -> Dict[str, Any]:
        """Same as the parent ``dict()``, but with ``y_axis_type`` converted to its string form (or None)."""
        # forward the arguments instead of silently ignoring them
        obj_dict = super().dict(*args, **kwargs)
        obj_dict["y_axis_type"] = str(self.y_axis_type) if self.y_axis_type else None
        return obj_dict


class ScanReportRow(BaseModel):
    """One result row of a scan report."""

    scan_id: str  # UUID
    scan_type: ScanType
    kind: Optional[str]
    name: Optional[str]
    namespace: Optional[str]
    container: Optional[str]
    content: List[Any]  # scan result data
    priority: float


TableRow = Tuple[str, ...]
ScanTable = Dict[str, List[TableRow]]


class ScanReportBlock(BaseBlock):
    """
    Base class for scan reports. Subclasses provide the table_headers, table_data and table_widths properties.
    """

    title: str
    scan_id: str  # UUID
    type: ScanType
    start_time: datetime
    end_time: datetime
    score: str
    results: List[ScanReportRow]
    config: str
    metadata: Dict[str, Any] = {}

    @property
    def grade(self):
        """Letter grade derived from ``score``: 90+ is "A", then one letter per 10 points, below 50 is "F"."""
        score = int(self.score)
        if score >= 90:
            return "A"
        elif score >= 80:
            return "B"
        elif score >= 70:
            return "C"
        elif score >= 60:
            return "D"
        elif score >= 50:
            return "E"
        else:
            return "F"

    def _results_grouped_by_kind(self) -> List[Tuple[str, List[ScanReportRow]]]:
        """
        Return ``(kind, rows)`` pairs, ordered by kind in descending order; a missing kind is treated as "".

        Sorting and grouping use the same key. itertools.groupby only merges adjacent items with equal
        keys, so grouping by the raw kind could split the None/"" results into several groups that
        would then overwrite each other when stored under the same "" dict key.
        """

        def kind_of(row: ScanReportRow) -> str:
            return row.kind or ""

        ordered = sorted(self.results, key=kind_of, reverse=True)
        return [(kind, list(rows)) for kind, rows in itertools.groupby(ordered, key=kind_of)]

    @property
    def table_headers(self) -> List[str]:
        raise NotImplementedError("table_headers property must be implemented in the subclass")

    @property
    def table_data(self) -> ScanTable:
        raise NotImplementedError("table_data property must be implemented in the subclass")

    @property
    def table_widths(self) -> Tuple[int, ...]:
        raise NotImplementedError("table_widths property must be implemented in the subclass")


class PopeyeScanReportBlock(ScanReportBlock):
    """Scan report whose rows hold a list of issues, each with a "level" and a "message"."""

    @property
    def table_headers(self) -> List[str]:
        return ["Priority", "Namespace", "Name", "Container", "Issues"]

    @property
    def table_widths(self) -> Tuple[int, ...]:
        return (10, 25, 25, 25, 65)

    @property
    def table_data(self) -> ScanTable:
        """Rows grouped by kind; the columns follow the order of ``table_headers``."""
        return {
            kind: [
                (
                    self.level_to_string(row.priority),
                    # namespace before name, matching table_headers
                    # NOTE(review): these two columns used to be emitted in the opposite order - confirm
                    # that the code creating the rows fills `name` and `namespace` the regular way
                    row.namespace or "",
                    row.name or "",
                    row.container or "",
                    self.scan_row_content_format(row),
                )
                for row in rows
            ]
            for kind, rows in self._results_grouped_by_kind()
        }

    @staticmethod
    def level_to_string(level: Union[int, float]) -> str:
        """Map a numeric level to its label; anything other than 1, 2 or 3 is "OK"."""
        labels = {1: "Info", 2: "Warning", 3: "**Error**"}
        return labels.get(int(level), "OK")

    def scan_row_content_format(self, row: ScanReportRow) -> str:
        """One "<level label> <message>" line per issue in ``row.content``."""
        return "\n".join(f"{self.level_to_string(i['level'])} {i['message']}" for i in row.content)


class KRRScanReportBlock(ScanReportBlock):
    """Scan report whose rows hold allocated and recommended cpu/memory requests and limits."""

    @property
    def table_headers(self) -> List[str]:
        return [
            "Priority",
            "Namespace",
            "Name",
            "Container",
            "CPU Request",
            "CPU Limit",
            "Memory Request",
            "Memory Limit",
        ]

    @property
    def table_widths(self) -> Tuple[int, ...]:
        return (5, 10, 10, 10, 8, 8, 8, 8)

    @staticmethod
    def __format_krr_value(value: Union[float, Literal["?"], None]) -> str:
        """None becomes "unset", any string becomes "?", numbers are formatted with format_unit()."""
        if value is None:
            return "unset"
        elif isinstance(value, str):
            return "?"
        else:
            return format_unit(value)

    @staticmethod
    def __format_content_str(allocated: float, recommended: float, info: Optional[str]) -> str:
        """Format a cell as "<allocated> -> <recommended>", followed by "(info)" on a new line if given."""
        if allocated is None and recommended is None:
            return "unset"

        res = f"{KRRScanReportBlock.__format_krr_value(allocated)} -> {KRRScanReportBlock.__format_krr_value(recommended)}"

        if info is not None:
            res += f"\n({info})"

        return res

    @staticmethod
    def __parse_content(content) -> Tuple[str, ...]:
        """Return the four cells cpu request, cpu limit, memory request, memory limit for one result row."""
        content_by_resource = {row["resource"]: row for row in content}

        res = []
        for resource in ["cpu", "memory"]:
            row = content_by_resource.get(resource)
            for field in ["request", "limit"]:
                if not row:
                    # NOTE: We should always have a row for each resource,
                    # but if for some reason this is not true, add some value not to break the table
                    res.append("?")
                    continue

                # `or {}` also covers a key that is present but holds None
                allocated = (row.get("allocated") or {}).get(field, "?")
                recommended = (row.get("recommended") or {}).get(field, "?")
                info = row.get("info")

                res.append(KRRScanReportBlock.__format_content_str(allocated, recommended, info))

        return tuple(res)

    @property
    def table_data(self) -> ScanTable:
        """Rows grouped by kind; the columns follow the order of ``table_headers``."""
        return {
            kind: [
                (
                    self.__priority_to_severity(row.priority),
                    row.namespace or "",
                    row.name or "",
                    row.container or "",
                    *self.__parse_content(row.content),
                )
                for row in rows
            ]
            for kind, rows in self._results_grouped_by_kind()
        }

    @staticmethod
    def __priority_to_severity(priority: Union[int, float]) -> str:
        """Map a numeric priority to its label; anything other than 1-4 is "UNKNOWN"."""
        labels = {4: "**CRITICAL**", 3: "WARNING", 2: "OK", 1: "GOOD"}
        return labels.get(int(priority), "UNKNOWN")


class EventRow(BaseModel):
    """One event, as stored in the ``events`` list of an EventsBlock."""

    type: Optional[str]
    reason: Optional[str]
    message: Optional[str]
    kind: str
    name: str
    namespace: Optional[str]
    time: Optional[str]


class EventsBlock(TableBlock):
    """
    Table display of a Events, Persists the events on the Robusta Platform.

    Note: Wider tables appear as a file attachment on Slack, because they aren't rendered properly inline
    """

    events: List[EventRow] = []

    def __init__(
        self,
        events: List[EventRow],
        rows: List[List],
        headers: Sequence[str] = (),
        column_renderers: Optional[Dict] = None,
        table_name: str = "",
        column_width: Optional[List[int]] = None,
    ):
        """
        :param events: the events, stored on the block next to the table
        :param rows: a list of rows. each row is a list of columns
        :param headers: names of each column
        :param column_renderers: maps a header name to the renderer type used for that column
        :param table_name: optional name of the table
        :param column_width: relative column sizes, a hint that not all sinks support
        """
        super().__init__(
            events=events,
            rows=rows,
            headers=headers,
            # None default instead of a shared mutable {} default in the signature
            column_renderers={} if column_renderers is None else column_renderers,
            table_name=table_name,
            column_width=column_width,
        )


class EventsRef(BaseBlock):
    """Identifies an object by kind, name and optional namespace."""

    namespace: Optional[str]
    name: str
    kind: str


class GraphBlock(FileBlock):
    """A FileBlock that can additionally carry the PrometheusBlock it was created from."""

    graph_data: Optional[PrometheusBlock]

    def __init__(
        self,
        filename: str,
        contents: bytes,
        graph_data: Optional[PrometheusBlock] = None,
        **kwargs,
    ):
        """
        :param filename: the file's name
        :param contents: the file's contents
        :param graph_data: optional PrometheusBlock stored next to the file
        """
        super().__init__(
            filename=filename,
            contents=contents,
            graph_data=graph_data,
            **kwargs,
        )