diff --git a/README.md b/README.md
index d7dd241..566ae85 100755
--- a/README.md
+++ b/README.md
@@ -17,6 +17,8 @@ Contributions are welcome. See [here](https://docs.github.com/en/free-pro-team@l
 1. Full kubernetes error logs on failure.
 1. Integrated operator airflow config, see below.
 1. Integrated Jinja2 support for file templates with flag.
+1. XCom support.
+1. Log-based events.
 1. Tested and working on [google cloud composer](https://cloud.google.com/composer).
 
 ### Two operator classes are available
@@ -195,17 +197,25 @@ startup_timeout_seconds | 10 | The max number of seconds to create the job befor
 validate_body_on_init | False | Can be set to true only if jinja is disabled. Process the yaml when the object is created.
 enable_jinja| True | Enable jinja on the body (str, or file), and the following args: command, arguments, image, envs, body, namespace, config_file, cluster_context
 jinja_job_args | None | A dictionary or object to be used in the jinja template to render arguments. The jinja args are loaded under the keyword "job".
+on_kube_api_event | None | A method to capture kube api log events. Defaults to None. Log output pattern: `::kube_api:[name]=value`
+parse_xcom_event | json parser | Parses the result of xcom events: `::kube_api:xcom={json values...}`
 
-# Contribution
+# XCom
 
-Are welcome, please post issues or PR's if needed.
+The XCom implementation of the KubernetesJobOperator differs from that of the KubernetesPodOperator,
+and therefore no backward-compatible XCom implementation currently exists.
+
+To use XCom with the KubernetesJobOperator, simply add a line such as the following to your pod's log
+output (a full DAG example appears in the "XCom example" section below):
+
+```shell
+::kube_api:xcom={"a":2,"[key]":"[value]"}
+```
+
-# Implementations still missing:
-Add an issue (or better submit PR) if you need these.
-1. XCom
-1. Examples (other than TL;DR)
+Note that the XCom value must be valid JSON (for the default parser).
+
+# Contribution
+
+Are welcome, please post issues or PR's if needed.
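+
+# XCom example
+
+A minimal end-to-end DAG sketch, adapted from `tests/dags/test_xcoms.py`; the `default_args` and
+task ids here are illustrative:
+
+```python
+from datetime import datetime
+from airflow import DAG
+from airflow.models import TaskInstance
+from airflow.operators.python_operator import PythonOperator
+from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
+
+default_args = {"owner": "airflow", "start_date": datetime(2020, 1, 1)}
+
+with DAG("xcom-example", default_args=default_args, schedule_interval=None) as dag:
+    job_task = KubernetesJobOperator(
+        task_id="emit-xcom",
+        image="ubuntu:latest",
+        # The operator parses this log line and pushes the keys "a" and "b" to XCom.
+        command=["echo", '::kube_api:xcom={"a":2,"b":"someval"}'],
+    )
+
+    def print_xcom(ti: TaskInstance, **context):
+        # Pull the value that the job pushed through its log output.
+        ti.log.info(ti.xcom_pull(task_ids="emit-xcom", key="a"))
+
+    job_task >> PythonOperator(
+        task_id="print-xcom",
+        python_callable=print_xcom,
+        provide_context=True,
+    )
+```
+
+To customize how the xcom payload is parsed, pass your own `parse_xcom_event` callable to the
+operator (the default parser requires a json object).
+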
 # Licence
diff --git a/airflow_kubernetes_job_operator/config.py b/airflow_kubernetes_job_operator/config.py
index 2c73f02..44fc1d3 100755
--- a/airflow_kubernetes_job_operator/config.py
+++ b/airflow_kubernetes_job_operator/config.py
@@ -1,9 +1,9 @@
 import logging
 from typing import Type, Dict
 from enum import Enum
+from airflow_kubernetes_job_operator.kube_api import config as kube_api_config
 from airflow_kubernetes_job_operator.kube_api.utils import not_empty_string
-from airflow_kubernetes_job_operator.kube_api.config import DEFAULT_KUBE_CONFIG_LOCATIONS
-from airflow_kubernetes_job_operator.kube_api.queries import LogLine
+from airflow_kubernetes_job_operator.kube_api.queries import LogLine, GetPodLogs
 from airflow_kubernetes_job_operator.utils import resolve_path
 from airflow.configuration import conf, log
 from airflow.exceptions import AirflowConfigException
@@ -97,4 +97,22 @@ def get(
     log = loc.strip()
     if len(loc) == 0:
         continue
-    DEFAULT_KUBE_CONFIG_LOCATIONS.insert(0, loc)
+    kube_api_config.DEFAULT_KUBE_CONFIG_LOCATIONS.insert(0, loc)
+
+GetPodLogs.enable_kube_api_events = get(
+    "log_query_enable_api_events",
+    default=GetPodLogs.enable_kube_api_events,
+    otype=bool,
+)
+
+GetPodLogs.api_event_match_regexp = get(
+    "log_query_api_event_match_regexp",
+    default=GetPodLogs.api_event_match_regexp,
+    otype=str,
+)
+
+GetPodLogs.emit_api_events_as_log = get(
+    "log_query_emit_api_events_as_log",
+    default=GetPodLogs.emit_api_events_as_log,
+    otype=bool,
+)
diff --git a/airflow_kubernetes_job_operator/job_runner.py b/airflow_kubernetes_job_operator/job_runner.py
index d531198..9649875 100755
--- a/airflow_kubernetes_job_operator/job_runner.py
+++ b/airflow_kubernetes_job_operator/job_runner.py
@@ -10,8 +10,10 @@
 from airflow_kubernetes_job_operator.collections import JobRunnerDeletePolicy, JobRunnerException
 from airflow_kubernetes_job_operator.config import SHOW_RUNNER_ID_IN_LOGS
 from airflow_kubernetes_job_operator.kube_api import (
+    EventHandler,
     KubeApiConfiguration,
     GetAPIVersions,
+    KubeLogApiEvent,
     KubeApiRestQuery,
     KubeApiRestClient,
     KubeResourceKind,
@@ -256,12 +258,14 @@ def execute_job(
         self,
         timeout: int = 60 * 5,
         watcher_start_timeout: int = 10,
+        on_kube_api_event: callable = None,
     ):
         """Execute the job
 
         Args:
             timeout (int, optional): Execution timeout. Defaults to 60*5.
             watcher_start_timeout (int, optional): The timeout to start watching the namespaces. Defaults to 10.
+            on_kube_api_event (callable, optional): A handler for kube api log events. Defaults to None.
 
         Returns:
             KubeResourceState: The main resource (resources[0]) final state.
@@ -329,6 +333,16 @@ def execute_job(
             label_selector=self.job_label_selector,
         )
 
+        def handle_kube_api_event(event: KubeLogApiEvent):
+            self.log(f"{event.line.get_context_header()} KubeApiEvent '{event.name}' ({len(event.value)} chars)")
+            if on_kube_api_event:
+                on_kube_api_event(event)
+
+        watcher.on(
+            watcher.pod_log_api_event_name,
+            handle_kube_api_event,
+        )
+
         # start the watcher
         if self.show_watcher_logs:
             watcher.pipe_to_logger(self.logger)
diff --git a/airflow_kubernetes_job_operator/kube_api/config.py b/airflow_kubernetes_job_operator/kube_api/config.py
index 64e22b3..ba103c0 100755
--- a/airflow_kubernetes_job_operator/kube_api/config.py
+++ b/airflow_kubernetes_job_operator/kube_api/config.py
@@ -7,7 +7,6 @@
 from airflow_kubernetes_job_operator.kube_api.utils import join_locations_list, not_empty_string
 from airflow_kubernetes_job_operator.kube_api.collections import KubeResourceKind
 
-
 DEFAULT_KUBE_CONFIG_LOCATIONS: List[str] = join_locations_list(
     [kube_config.KUBE_CONFIG_DEFAULT_LOCATION],
     os.environ.get("KUBE_API_DEFAULT_CONFIG_LOCATIONS", None),
diff --git a/airflow_kubernetes_job_operator/kube_api/queries.py b/airflow_kubernetes_job_operator/kube_api/queries.py
index 89f1ad8..b724dc3 100755
--- a/airflow_kubernetes_job_operator/kube_api/queries.py
+++ b/airflow_kubernetes_job_operator/kube_api/queries.py
@@ -1,3 +1,4 @@
+import re
 from logging import Logger
 import logging
 from datetime import datetime
@@ -29,6 +30,19 @@ def default_detect_log_level(line: "LogLine", msg: str):
     return logging.INFO
 
 
+class KubeLogApiEvent(Event):
+    """An api event parsed from a pod log line (pattern: `::kube_api:[name]=value`)."""
+
+    def __init__(
+        self,
+        name: str,
+        value: str,
+        line: "LogLine",
+        sender=None,
+    ):
+        super().__init__(name, [], {}, sender=sender)
+        self.value = value
+        self.line = line
+
+
 class LogLine:
     show_kubernetes_log_timestamps: bool = False
     autodetect_kuberentes_log_level: bool = True
@@ -53,8 +67,9 @@ def __init__(
         super().__init__()
         self.pod_name = pod_name
         self.namespace = namespace
-        self.message = message
         self.container_name = container_name
+
+        self.message = message
         self.timestamp = timestamp or datetime.now()
 
     def log(self, logger: Logger = kube_logger):
@@ -68,21 +83,28 @@ def log(self, logger: Logger = kube_logger):
             msg,
         )
 
-    def __str__(self):
-        return self.message
-
-    def __repr__(self):
+    def get_context_header(self):
         header_parts = [
             f"{self.timestamp}" if self.show_kubernetes_log_timestamps else None,
             f"{self.namespace}/pods/{self.pod_name}",
             self.container_name,
         ]
-        header = "".join([f"[{p}]" for p in header_parts if p is not None])
-        return f"{header}: {self.message}"
+        return "".join([f"[{p}]" for p in header_parts if p is not None])
+
+    def __str__(self):
+        return self.message
+
+    def __repr__(self):
+        return f"{self.get_context_header()}: {self.message}"
 
 
 class GetPodLogs(KubeApiRestQuery):
+    enable_kube_api_events: bool = True
+    api_event_match_regexp: str = r"^\s*[:]{2}kube_api[:]([a-zA-Z0-9_-]+)[=](.*)$"
+    kube_api_event_name: str = "kube_api_event"
+    emit_api_events_as_log: bool = False
+
     def __init__(
         self,
         name: str,
@@ -92,6 +114,8 @@ def __init__(
         timeout: int = None,
         container: str = None,
         add_container_name_to_log: bool = None,
+        enable_kube_api_events: bool = None,
+        api_event_match_regexp: str = None,
     ):
         """Returns the pod logs for a pod. Can follow the pod logs in real time.
 
             follow (bool, optional): If true, keep streaming pod logs. Defaults to False.
             timeout (int, optional): The read timeout, if specified will error if logs were not returned in time. Defaults to None.
+            container (str, optional): Read from this specific container. Defaults to None.
+            add_container_name_to_log (bool, optional): Add the container name to the log line.
+                None = True if container is defined.
+            enable_kube_api_events (bool, optional): Enable kube api events messaging.
+                Defaults to GetPodLogs.enable_kube_api_events.
+            api_event_match_regexp (str, optional): The kube api event match regexp.
+                Must have exactly two match groups (event name, value).
+                Defaults to GetPodLogs.api_event_match_regexp.
+
+        Event binding:
+            To send a kube api event, place a line matching the pattern
+            "::kube_api:[name]=[value]" in the pod log output.
         """
         assert not_empty_string(name), ValueError("name must be a non empty string")
         assert not_empty_string(namespace), ValueError("namespace must be a non empty string")
+        assert api_event_match_regexp is None or not_empty_string(api_event_match_regexp), ValueError(
+            "Event match regexp must be None or a non-empty string"
+        )
         assert container is None or not_empty_string(container), ValueError("container must be a non empty string")
 
         kind: KubeResourceKind = KubeResourceKind.get_kind("Pod")
@@ -137,6 +176,10 @@ def __init__(
         self.add_container_name_to_log = (
             add_container_name_to_log if add_container_name_to_log is not None else container is not None
         )
+        self.enable_kube_api_events = (
+            enable_kube_api_events if enable_kube_api_events is not None else GetPodLogs.enable_kube_api_events
+        )
+        self.api_event_match_regexp = api_event_match_regexp or GetPodLogs.api_event_match_regexp
 
     def pre_request(self, client: "KubeApiRestClient"):
         super().pre_request(client)
@@ -174,23 +217,52 @@ def on_reconnect(self, client: KubeApiRestClient):
             self.auto_reconnect = False
             raise ex
 
+    def parse_and_emit_events(self, line: LogLine):
+        events = re.findall(self.api_event_match_regexp, line.message or "", re.M)
+        has_events = False
+        for ev in events:
+            if not isinstance(ev, tuple) or len(ev) != 2:
+                continue
+            has_events = True
+            event_name = ev[0]
+            event_value = ev[1]
+            self.emit(
+                self.kube_api_event_name,
+                KubeLogApiEvent(
+                    name=event_name,
+                    value=event_value,
+                    line=line,
+                    sender=self,
+                ),
+            )
+
+        return has_events
+
     def parse_data(self, message_line: str):
         self._last_timestamp = datetime.now()
         timestamp = dateutil.parser.isoparse(message_line[: message_line.index(" ")])
         message = message_line[message_line.index(" ") + 1 :]  # noqa: E203
         message = message.replace("\r", "")
+        lines = []
+
         for message_line in message.split("\n"):
-            lines.append(
-                LogLine(
-                    pod_name=self.name,
-                    namespace=self.namespace,
-                    message=message_line,
-                    timestamp=timestamp,
-                    container_name=self.container if self.add_container_name_to_log else None,
-                )
+            line = LogLine(
+                pod_name=self.name,
+                namespace=self.namespace,
+                message=message_line,
+                timestamp=timestamp,
+                container_name=self.container if self.add_container_name_to_log else None,
             )
+
+            if self.enable_kube_api_events:
+                # Parse api events out of the line; when emit_api_events_as_log is
+                # False, matched event lines are dropped from the log output.
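+                # e.g. the line '::kube_api:xcom={"a":2}' emits a KubeLogApiEvent
+                # with name "xcom" and value '{"a":2}'.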
+                if self.parse_and_emit_events(line) and not self.emit_api_events_as_log:
+                    continue
+
+            lines.append(line)
+
         return lines
 
     def emit_data(self, data):
diff --git a/airflow_kubernetes_job_operator/kube_api/utils.py b/airflow_kubernetes_job_operator/kube_api/utils.py
index 8553fd2..5053df0 100755
--- a/airflow_kubernetes_job_operator/kube_api/utils.py
+++ b/airflow_kubernetes_job_operator/kube_api/utils.py
@@ -1,3 +1,4 @@
+import os
 from logging import Logger
 from typing import List
 import logging
@@ -29,7 +30,7 @@ def unqiue_with_order(lst) -> list:
 
 
 def not_empty_string(val: str):
-    """Returns true if the string is not empty (len>0) and not None """
+    """Returns true if the string is not empty (len>0) and not None"""
     return isinstance(val, str) and len(val) > 0
 
diff --git a/airflow_kubernetes_job_operator/kube_api/watchers.py b/airflow_kubernetes_job_operator/kube_api/watchers.py
index 1512a98..45e20aa 100755
--- a/airflow_kubernetes_job_operator/kube_api/watchers.py
+++ b/airflow_kubernetes_job_operator/kube_api/watchers.py
@@ -17,6 +17,7 @@
 from airflow_kubernetes_job_operator.kube_api.queries import (
     GetNamespaceResources,
     GetPodLogs,
+    KubeLogApiEvent,
     LogLine,
 )
 
@@ -135,6 +136,7 @@ class NamespaceWatchQuery(KubeApiRestQuery):
     deleted_event_name = "deleted"
     watch_started_event_name = "watch_started"
     pod_logs_reader_started_event_name = "pod_logs_reader_started"
+    pod_log_api_event_name = GetPodLogs.kube_api_event_name
 
     def __init__(
         self,
@@ -196,6 +198,9 @@ def watched_objects(self) -> List[NamespaceWatchQueryResourceState]:
     def emit_log(self, data):
         self.emit(self.pod_log_event_name, data)
 
+    def emit_api_event(self, api_event: KubeLogApiEvent):
+        self.emit(self.pod_log_api_event_name, api_event)
+
     @thread_synchronized
     def _create_pod_log_reader(
         self,
@@ -205,7 +210,7 @@ def _create_pod_log_reader(
         container: str = None,
         follow=True,
         is_single=False,
-    ):
+    ) -> GetPodLogs:
         read_logs = GetPodLogs(
             name=name,
             namespace=namespace,
@@ -261,7 +266,7 @@ def process_data_state(self, data: dict, client: KubeApiRestClient):
                 continue
 
             osw = self._object_states.get(uid)
-            read_logs = self._create_pod_log_reader(
+            read_logs: GetPodLogs = self._create_pod_log_reader(
                 logger_id=logger_id,
                 name=name,
                 namespace=namesoace,
@@ -283,6 +288,9 @@ def handle_error(sender, *args):
 
             # binding only relevant events.
             read_logs.on(read_logs.data_event_name, lambda line: self.emit_log(line))
+            read_logs.on(
+                read_logs.kube_api_event_name, lambda api_event: self.emit_api_event(api_event=api_event)
+            )
             read_logs.on(read_logs.error_event_name, handle_error)
             client.query_async(read_logs)
diff --git a/airflow_kubernetes_job_operator/kubernetes_job_operator.py b/airflow_kubernetes_job_operator/kubernetes_job_operator.py
index 0c6c577..cddfcf7 100755
--- a/airflow_kubernetes_job_operator/kubernetes_job_operator.py
+++ b/airflow_kubernetes_job_operator/kubernetes_job_operator.py
@@ -1,8 +1,9 @@
 import jinja2
+import json
 from typing import List, Union
 from airflow.utils.decorators import apply_defaults
 from airflow.operators import BaseOperator
-from airflow_kubernetes_job_operator.kube_api import KubeResourceState
+from airflow_kubernetes_job_operator.kube_api import KubeResourceState, KubeLogApiEvent
 from airflow_kubernetes_job_operator.utils import (
     to_kubernetes_valid_name,
 )
@@ -19,6 +20,18 @@
 )
 
 
+def xcom_value_parser(value: str) -> dict:
+    """The default xcom event parser; expects the event value to be a json object."""
+    value = value.strip()
+    try:
+        value: dict = json.loads(value)
+        assert isinstance(value, dict), "Value must be a json object (dict)"
+    except Exception as ex:
+        raise KubernetesJobOperatorException(
+            "XCom messages (with the default parser) must be in json object format:", *ex.args
+        )
+    return value
+
+
 class KubernetesJobOperatorDefaultsBase(BaseOperator):
     @apply_defaults
     def __init__(self, **kwargs) -> None:
@@ -49,6 +62,8 @@ def __init__(
         validate_body_on_init: bool = DEFAULT_VALIDATE_BODY_ON_INIT,
         enable_jinja: bool = True,
         jinja_job_args: dict = None,
+        on_kube_api_event: callable = None,
+        parse_xcom_event: callable = xcom_value_parser,
         **kwargs,
     ):
         """A operator that executes an airflow task as a kubernetes Job.
@@ -81,6 +96,9 @@
                 command, arguments, image, envs, body, namespace, config_file, cluster_context
             jinja_job_args {dict} -- A dictionary or object to be used in the jinja template to render
                 arguments. The jinja args are loaded under the keyword "job".
+            on_kube_api_event {callable, optional} -- A handler invoked on kube api log events,
+                called as: on_kube_api_event(api_event, context).
+            parse_xcom_event {callable, optional} -- Parses an incoming xcom event value.
+                Must return a dictionary of key/value pairs.
 
         Auto completed yaml values (if missing):
             All:
@@ -137,6 +155,8 @@ def __init__(
         self.body = body
         self.namespace = namespace
         self.get_logs = get_logs
+        self.on_kube_api_event = on_kube_api_event
+        self.parse_xcom_event = parse_xcom_event
         self.delete_policy = delete_policy
 
         # kubernetes config properties.
@@ -299,6 +319,23 @@ def pre_execute(self, context):
         # call parent.
         return super().pre_execute(context)
 
+    def handle_kube_api_event(self, event: KubeLogApiEvent, context):
+        if self.on_kube_api_event:
+            self.on_kube_api_event(event, context)
+
+        if event.name == "xcom":
+            values_dict = self.parse_xcom_event(event.value or "{}")
+            for key in values_dict.keys():
+                self.xcom_push(
+                    context=context,
+                    key=key,
+                    value=values_dict.get(key),
+                )
+            if len(values_dict) > 0:
+                self.log.info("XCom updated, keys: " + ", ".join(values_dict.keys()))
+
     def execute(self, context):
         """Call to execute the kubernetes job.
@@ -313,9 +350,14 @@ def execute(self, context):
         )
         self._job_is_executing = True
         try:
             rslt = self.job_runner.execute_job(
                 watcher_start_timeout=self.startup_timeout_seconds,
                 timeout=self._internal_wait_kuberentes_object_timeout,
+                on_kube_api_event=lambda event: self.handle_kube_api_event(
+                    event=event,
+                    context=context,
+                ),
             )
 
             if rslt == KubeResourceState.Failed:
diff --git a/airflow_kubernetes_job_operator/kubernetes_legacy_job_operator.py b/airflow_kubernetes_job_operator/kubernetes_legacy_job_operator.py
index 18e66c4..0595dcd 100755
--- a/airflow_kubernetes_job_operator/kubernetes_legacy_job_operator.py
+++ b/airflow_kubernetes_job_operator/kubernetes_legacy_job_operator.py
@@ -3,7 +3,7 @@
 from typing import List, Optional, Union
 from airflow_kubernetes_job_operator.job_runner import JobRunnerDeletePolicy
 from airflow_kubernetes_job_operator.utils import resolve_relative_path
-from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
+from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator, xcom_value_parser
 from airflow_kubernetes_job_operator.config import DEFAULT_VALIDATE_BODY_ON_INIT
 
 try:
@@ -69,6 +69,8 @@ def __init__(
         validate_body_on_init: bool = DEFAULT_VALIDATE_BODY_ON_INIT,
         enable_jinja: bool = True,
         jinja_job_args: dict = None,
+        on_kube_api_event: callable = None,
+        parse_xcom_event: callable = xcom_value_parser,
         *args,
         **kwargs,
     ):
@@ -165,6 +167,9 @@ def __init__(
                 (default: {from env/airflow config: AIRFLOW__KUBE_JOB_OPERATOR__validate_body_on_init or False})
             jinja_job_args {dict} -- A dictionary or object to be used in the jinja template to render
                 arguments. The jinja args are loaded under the keyword "job".
+            on_kube_api_event {callable, optional} -- A handler invoked on kube api log events,
+                called as: on_kube_api_event(api_event, context).
+            parse_xcom_event {callable, optional} -- Parses an incoming xcom event value.
+                Must return a dictionary of key/value pairs.
         """
         delete_policy = (
             delete_policy or JobRunnerDeletePolicy.IfSucceeded
@@ -193,6 +198,8 @@ def __init__(
             enable_jinja=enable_jinja,
             image_pull_policy=image_pull_policy,
             jinja_job_args=jinja_job_args,
+            on_kube_api_event=on_kube_api_event,
+            parse_xcom_event=parse_xcom_event,
             *args,
             **kwargs,
         )
diff --git a/tests/dags/test_xcoms.py b/tests/dags/test_xcoms.py
new file mode 100644
index 0000000..4e8b83d
--- /dev/null
+++ b/tests/dags/test_xcoms.py
@@ -0,0 +1,48 @@
+from utils import default_args, name_from_file
+import logging
+from airflow import DAG
+
+from airflow.models import TaskInstance
+from airflow.operators.python_operator import PythonOperator
+from airflow_kubernetes_job_operator.kubernetes_legacy_job_operator import KubernetesJobOperator
+
+dag = DAG(
+    name_from_file(__file__),
+    default_args=default_args,
+    description="Test xcom and kube api log events",
+    schedule_interval=None,
+    catchup=False,
+)
+
+
+def print_xcom_pull(ti: TaskInstance, **context):
+    xcom_value = ti.xcom_pull(task_ids="test_kube_api_event_with_xcom", key="a")
+    ti.log.info(xcom_value)
+
+
+def log_kube_api_event(event, context):
+    # The operator invokes this handler as: on_kube_api_event(event, context).
+    logging.info(f"Kube api event: {event.name}={event.value}")
+
+
+with dag:
+    kube_task = KubernetesJobOperator(
+        task_id="test_kube_api_event_with_xcom",
+        image="ubuntu:latest",
+        command=[
+            "echo",
+            '::kube_api:xcom={"a":2,"b":"someval"}',
+        ],
+        on_kube_api_event=log_kube_api_event,
+    )
+
+    kube_task >> PythonOperator(
+        task_id="test_pull_xcom",
+        dag=dag,
+        python_callable=print_xcom_pull,
+        provide_context=True,
+    )
+
+if __name__ == "__main__":
+    dag.clear()
+    dag.run()