The Basics


You can add your own playbook actions to Robusta with Python.

Please consider sharing your playbooks with the community.

Implementing your first playbook

Lets write a playbook action in Python:

from robusta.api import *

def my_action(event: PodEvent):
    # we have full access to the pod on which the alert fired
    pod = event.get_pod()
    pod_name =
    pod_logs = pod.get_logs()
    pod_processes = pod.exec("ps aux")

    # this is how you send data to slack or other destinations
        MarkdownBlock("*Oh no!* An alert occurred on " + pod_name),
        FileBlock("crashing-pod.log", pod_logs)

You'll need to package up the code as a playbook repository and load it into Robusta. See here for details.

Using your action

Once the playbooks package is loaded, you can use your action.

The action above receives a PodEvent so it can be used for pod-related triggers.

- triggers:
  - on_pod_update: {}
  - my_action: {}

Choosing an event class

In our above action, we want to exec commands on a pod, so obviously we'll need a pod as input. Therefore the action takes a PodEvent.

Some actions are interested in changes and not just static resources - for example, a playbook that shows you a diff of what changed. These actions should take one of the ChangeEvent classes. For example, PodChangeEvent

def pod_change_monitor(event: PodChangeEvent):"new object: {event.obj})"old object: {event.old_obj})

PodChangeEvent will fire on creations, updates, and deletions. You can check the event type with event.operation.

To write a more general action that monitors all Kubernetes changes, we can use KubernetesAnyChangeEvent.

You should always use the highest-possible event class when writing actions. This will let your action be used in as many scenarios as possible. See Events and Triggers for details.

Actions with parameters

Any action can define variables it needs. There are two steps:

  1. Define a class inheriting from ActionParams and use type-annotations to define variables

  2. Add the parameter class as an additional argument to the action

For example:

from robusta.api import *

class BashParams(ActionParams):
   bash_command: str

def pod_bash_enricher(event: PodEvent, params: BashParams):
    pod = event.get_pod()
    if not pod:
        logging.error(f"cannot run PodBashEnricher on event with no pod: {event}")

    block_list: List[BaseBlock] = []
    exec_result = pod.exec(params.bash_command)
    block_list.append(MarkdownBlock(f"Command results for *{params.bash_command}:*"))

We can now define the bash_command parameter in values.yaml:

- triggers:
  - on_pod_update: {}
  - pod_bash_enricher:
      bash_command: "ls -al /"

Under the hood, we use the excellent Pydantic library to implement this.

Please consult Pydantic's documentation for details. ActionParams is a drop-in substitute for Pydantic's BaseModel.


Sometimes you need to prevent an action from running too often. You can use the RateLimiter class for that:

from robusta.api import *

def argo_app_sync(event: ExecutionBaseEvent, params: ArgoAppParams):
    if not RateLimiter.mark_and_test(
        params.argo_url + params.argo_app_name,

The second parameter to RateLimiter.mark_and_test defines a key used for checking the rate limit. Each key is rate-limited individually.


Normally, multiple actions run one after another. (See Flow Control.)

A playbook can stop Robusta from running further actions by setting event.stop_processing = True.

 def my_first_action(event: EventChangeEvent):

        event.stop_processing = True  # no need to run any other playbook or action


Robusta uses many open source libraries, but two of them outshine all others:

  1. Hikaru

  2. Pydantic

We owe a special thank you to Tom Carroll and Samuel Colvin.

A further thank you is due to the countless developers who created other libraries we use. You rock.

Common gotchas

Datetime fields in Kubernetes resources are strings, not datetime objects. Use the utility function parse_kubernetes_datetime to convert them.