Skip to content

Commit

Permalink
Merge pull request #41 from LamaAni/xcoms
Browse files Browse the repository at this point in the history
Added xcom support
  • Loading branch information
LamaAni authored May 1, 2021
2 parents 6a5f65d + 89d0d9d commit 136ae61
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 30 deletions.
22 changes: 16 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Contributions are welcome. See [here](https://docs.github.com/en/free-pro-team@l
1. Full kubernetes error logs on failure.
1. Integrated operator airflow config, see below.
1. Integrated Jinja2 support for file templates with flag.
1. XCom
1. Log based events
1. Tested and working on [google cloud composer](https://cloud.google.com/composer).

### Two operator classes are available
Expand Down Expand Up @@ -195,17 +197,25 @@ startup_timeout_seconds | 10 | The max number of seconds to create the job befor
validate_body_on_init | False | Can be set to true only if jinja is disabled. Process the yaml when the object is created.
enable_jinja| True | Enable jinja on the body (str, or file), and the following args: command, arguments, image, envs, body, namespace, config_file, cluster_context
jinja_job_args | None | A dictionary or object to be used in the jinja template to render arguments. The jinja args are loaded under the keyword "job".
on_kube_api_event | None | A callable invoked for kube api log events; defaults to None. Log output pattern: `::kube_api:[name]=value`
parse_xcom_event| json parser | Parse the result of xcom events, `::kube_api:xcom={json values...}`

# Contribution
# XCom

Are welcome, please post issues or PR's if needed.
The implementation of XCom via the KubernetesJobOperator differs from the one in the KubernetesPodOperator,
and therefore no backward-compatible implementation of XCom currently exists.

To use XCom with the KubernetesJobOperator, simply add a log line like the following to your pod's log output:

```shell
::kube_api:xcom={"a":2,"[key]":"[value]"}
```

# Implementations still missing:
Note that the XCom value must be valid JSON (for the default parser).

Add an issue (or better submit PR) if you need these.
# Contribution

1. XCom
1. Examples (other than TL;DR)
Are welcome, please post issues or PR's if needed.

# Licence

Expand Down
24 changes: 21 additions & 3 deletions airflow_kubernetes_job_operator/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import logging
from typing import Type, Dict
from enum import Enum
from airflow_kubernetes_job_operator.kube_api import config as kube_api_config
from airflow_kubernetes_job_operator.kube_api.utils import not_empty_string
from airflow_kubernetes_job_operator.kube_api.config import DEFAULT_KUBE_CONFIG_LOCATIONS
from airflow_kubernetes_job_operator.kube_api.queries import LogLine
from airflow_kubernetes_job_operator.kube_api.queries import LogLine, GetPodLogs
from airflow_kubernetes_job_operator.utils import resolve_path
from airflow.configuration import conf, log
from airflow.exceptions import AirflowConfigException
Expand Down Expand Up @@ -97,4 +97,22 @@ def get(
log = loc.strip()
if len(loc) == 0:
continue
DEFAULT_KUBE_CONFIG_LOCATIONS.insert(0, loc)
kube_api_config.DEFAULT_KUBE_CONFIG_LOCATIONS.insert(0, loc)

GetPodLogs.enable_kube_api_events = get(
"log_query_enable_api_events",
default=GetPodLogs.enable_kube_api_events,
otype=bool,
)

GetPodLogs.api_event_match_regexp = get(
"log_query_api_event_match_regexp",
default=GetPodLogs.api_event_match_regexp,
otype=str,
)

GetPodLogs.emit_api_events_as_log = get(
"log_query_emit_api_events_as_log",
default=GetPodLogs.emit_api_events_as_log,
otype=bool,
)
14 changes: 14 additions & 0 deletions airflow_kubernetes_job_operator/job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from airflow_kubernetes_job_operator.collections import JobRunnerDeletePolicy, JobRunnerException
from airflow_kubernetes_job_operator.config import SHOW_RUNNER_ID_IN_LOGS
from airflow_kubernetes_job_operator.kube_api import (
EventHandler,
KubeApiConfiguration,
GetAPIVersions,
KubeLogApiEvent,
KubeApiRestQuery,
KubeApiRestClient,
KubeResourceKind,
Expand Down Expand Up @@ -256,12 +258,14 @@ def execute_job(
self,
timeout: int = 60 * 5,
watcher_start_timeout: int = 10,
on_kube_api_event: callable = None,
):
"""Execute the job
Args:
timeout (int, optional): Execution timeout. Defaults to 60*5.
watcher_start_timeout (int, optional): The timeout to start watching the namespaces. Defaults to 10.
on_kube_api_event(callable, optional): Handle kube api events.
Returns:
KubeResourceState: The main resource (resources[0]) final state.
Expand Down Expand Up @@ -329,6 +333,16 @@ def execute_job(
label_selector=self.job_label_selector,
)

def handle_kube_api_event(event: KubeLogApiEvent):
self.log(f"{event.line.get_context_header()} KubeApiEvent {event.name} of {len(event.value)} [chars]")
if on_kube_api_event:
on_kube_api_event(event)

watcher.on(
watcher.pod_log_api_event_name,
handle_kube_api_event,
)

# start the watcher
if self.show_watcher_logs:
watcher.pipe_to_logger(self.logger)
Expand Down
1 change: 0 additions & 1 deletion airflow_kubernetes_job_operator/kube_api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from airflow_kubernetes_job_operator.kube_api.utils import join_locations_list, not_empty_string
from airflow_kubernetes_job_operator.kube_api.collections import KubeResourceKind


DEFAULT_KUBE_CONFIG_LOCATIONS: List[str] = join_locations_list(
[kube_config.KUBE_CONFIG_DEFAULT_LOCATION],
os.environ.get("KUBE_API_DEFAULT_CONFIG_LOCATIONS", None),
Expand Down
102 changes: 87 additions & 15 deletions airflow_kubernetes_job_operator/kube_api/queries.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from logging import Logger
import logging
from datetime import datetime
Expand Down Expand Up @@ -29,6 +30,19 @@ def default_detect_log_level(line: "LogLine", msg: str):
return logging.INFO


class KubeLogApiEvent(Event):
    """An event parsed out of a pod's log stream via the kube-api marker pattern.

    Carries the parsed (name, value) pair together with the originating
    LogLine, so handlers retain the pod/namespace/container context.
    """

    def __init__(
        self,
        name: str,
        value: str,
        line: "LogLine",
        sender=None,
    ):
        """
        Args:
            name (str): The event name (first regexp capture group).
            value (str): The raw event value (second regexp capture group).
            line (LogLine): The log line this event was parsed from.
            sender (optional): The emitting object. Defaults to None.
        """
        # No positional/keyword payload lists — the payload lives in value/line.
        super().__init__(name, [], {}, sender=sender)
        self.line = line
        self.value = value

class LogLine:
show_kubernetes_log_timestamps: bool = False
autodetect_kuberentes_log_level: bool = True
Expand All @@ -53,8 +67,9 @@ def __init__(
super().__init__()
self.pod_name = pod_name
self.namespace = namespace
self.message = message
self.container_name = container_name

self.message = message
self.timestamp = timestamp or datetime.now()

def log(self, logger: Logger = kube_logger):
Expand All @@ -68,21 +83,28 @@ def log(self, logger: Logger = kube_logger):
msg,
)

def __str__(self):
return self.message

def __repr__(self):
def get_context_header(self):
header_parts = [
f"{self.timestamp}" if self.show_kubernetes_log_timestamps else None,
f"{self.namespace}/pods/{self.pod_name}",
self.container_name,
]

header = "".join([f"[{p}]" for p in header_parts if p is not None])
return f"{header}: {self.message}"
return "".join([f"[{p}]" for p in header_parts if p is not None])

def __str__(self):
return self.message

def __repr__(self):
return f"{self.get_context_header()}: {self.message}"


class GetPodLogs(KubeApiRestQuery):
enable_kube_api_events: bool = True
api_event_match_regexp: str = r"^\s*[:]{2}kube_api[:]([a-zA-Z0-9_-]+)[=](.*)$"
kube_api_event_name: str = "kube_api_event"
emit_api_events_as_log: bool = False

def __init__(
self,
name: str,
Expand All @@ -92,6 +114,8 @@ def __init__(
timeout: int = None,
container: str = None,
add_container_name_to_log: bool = None,
enable_kube_api_events: bool = None,
api_event_match_regexp: str = None,
):
"""Returns the pod logs for a pod. Can follow the pod logs
in real time.
Expand All @@ -104,9 +128,24 @@ def __init__(
follow (bool, optional): If true, keep streaming pod logs. Defaults to False.
timeout (int, optional): The read timeout, if specified will error if logs were not
returned in time. Defaults to None.
container (str, optional): Read from this specific containter.
add_container_name_to_log (bool, optional): Add containter names to the log line. None = true
if containter is defined.
enable_kube_api_events (bool, optional): Enabled kube api events messaging.
Defaults to GetPodLogs.enable_kube_api_events
api_event_match_regexp (str, optional): Kube api event match regexp.
Must have exactly two match groups (event name, value)
Defaults to GetPodLogs.api_event_match_regexp
Event binding:
To send a kube api event, place the following string in the pod output log,
"""
assert not_empty_string(name), ValueError("name must be a non empty string")
assert not_empty_string(namespace), ValueError("namespace must be a non empty string")
assert api_event_match_regexp is None or not_empty_string(api_event_match_regexp), ValueError(
"Event match regexp must me none or a non empty string"
)
assert container is None or not_empty_string(container), ValueError("container must be a non empty string")

kind: KubeResourceKind = KubeResourceKind.get_kind("Pod")
Expand Down Expand Up @@ -137,6 +176,10 @@ def __init__(
self.add_container_name_to_log = (
add_container_name_to_log if add_container_name_to_log is not None else container is not None
)
self.enable_kube_api_events = (
enable_kube_api_events if enable_kube_api_events is not None else GetPodLogs.enable_kube_api_events
)
self.api_event_match_regexp = api_event_match_regexp or GetPodLogs.api_event_match_regexp

def pre_request(self, client: "KubeApiRestClient"):
super().pre_request(client)
Expand Down Expand Up @@ -174,23 +217,52 @@ def on_reconnect(self, client: KubeApiRestClient):
self.auto_reconnect = False
raise ex

def parse_and_emit_events(self, line: LogLine) -> bool:
    """Scan a single log line for embedded kube-api event markers and emit them.

    Runs ``api_event_match_regexp`` (multiline) over the line's message; every
    two-group match ``(name, value)`` is emitted as a KubeLogApiEvent under
    ``kube_api_event_name``.

    Args:
        line (LogLine): The parsed log line to inspect.

    Returns:
        bool: True when at least one event was found and emitted.
    """
    matches = re.findall(self.api_event_match_regexp, line.message or "", re.M)
    emitted_any = False
    for match in matches:
        # Guard against regexps that do not yield exactly (name, value) pairs.
        if not (isinstance(match, tuple) and len(match) == 2):
            continue
        emitted_any = True
        name, value = match
        self.emit(
            self.kube_api_event_name,
            KubeLogApiEvent(name=name, value=value, line=line, sender=self),
        )
    return emitted_any

def parse_data(self, message_line: str):
    """Parse one raw kube log payload into LogLine objects.

    The payload starts with an ISO timestamp followed by a space and the
    message body (possibly multi-line). When kube api events are enabled,
    lines matching the event regexp are emitted as events and — unless
    ``emit_api_events_as_log`` is set — excluded from the returned lines.

    Args:
        message_line (str): Raw "<iso-timestamp> <message>" payload.

    Returns:
        list: The LogLine objects to forward as log data.
    """
    self._last_timestamp = datetime.now()

    # Split "<iso-timestamp> <message>" on the first space.
    split_at = message_line.index(" ")
    timestamp = dateutil.parser.isoparse(message_line[:split_at])
    message = message_line[split_at + 1 :].replace("\r", "")  # noqa: E203

    parsed_lines = []
    for raw_line in message.split("\n"):
        log_line = LogLine(
            pod_name=self.name,
            namespace=self.namespace,
            message=raw_line,
            timestamp=timestamp,
            container_name=self.container if self.add_container_name_to_log else None,
        )

        if self.enable_kube_api_events:
            # Emitted events are dropped from the log output unless configured
            # to also appear as regular log lines.
            if self.parse_and_emit_events(log_line) and not self.emit_api_events_as_log:
                continue

        parsed_lines.append(log_line)

    return parsed_lines

def emit_data(self, data):
Expand Down
3 changes: 2 additions & 1 deletion airflow_kubernetes_job_operator/kube_api/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from logging import Logger
from typing import List
import logging
Expand Down Expand Up @@ -29,7 +30,7 @@ def unqiue_with_order(lst) -> list:


def not_empty_string(val: str) -> bool:
    """Return True when ``val`` is a string holding at least one character.

    Non-string values (including None) return False.
    """
    if not isinstance(val, str):
        return False
    return len(val) != 0


Expand Down
12 changes: 10 additions & 2 deletions airflow_kubernetes_job_operator/kube_api/watchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from airflow_kubernetes_job_operator.kube_api.queries import (
GetNamespaceResources,
GetPodLogs,
KubeLogApiEvent,
LogLine,
)

Expand Down Expand Up @@ -135,6 +136,7 @@ class NamespaceWatchQuery(KubeApiRestQuery):
deleted_event_name = "deleted"
watch_started_event_name = "watch_started"
pod_logs_reader_started_event_name = "pod_logs_reader_started"
pod_log_api_event_name = GetPodLogs.kube_api_event_name

def __init__(
self,
Expand Down Expand Up @@ -196,6 +198,9 @@ def watched_objects(self) -> List[NamespaceWatchQueryResourceState]:
def emit_log(self, data):
    """Forward a pod log line to listeners bound to pod_log_event_name."""
    self.emit(self.pod_log_event_name, data)

def emit_api_event(self, api_event: KubeLogApiEvent):
    """Forward a KubeLogApiEvent to listeners bound to pod_log_api_event_name."""
    self.emit(self.pod_log_api_event_name, api_event)

@thread_synchronized
def _create_pod_log_reader(
self,
Expand All @@ -205,7 +210,7 @@ def _create_pod_log_reader(
container: str = None,
follow=True,
is_single=False,
):
) -> GetPodLogs:
read_logs = GetPodLogs(
name=name,
namespace=namespace,
Expand Down Expand Up @@ -261,7 +266,7 @@ def process_data_state(self, data: dict, client: KubeApiRestClient):
continue

osw = self._object_states.get(uid)
read_logs = self._create_pod_log_reader(
read_logs: GetPodLogs = self._create_pod_log_reader(
logger_id=logger_id,
name=name,
namespace=namesoace,
Expand All @@ -283,6 +288,9 @@ def handle_error(sender, *args):

# binding only relevant events.
read_logs.on(read_logs.data_event_name, lambda line: self.emit_log(line))
read_logs.on(
read_logs.kube_api_event_name, lambda api_event: self.emit_api_event(api_event=api_event)
)
read_logs.on(read_logs.error_event_name, handle_error)
client.query_async(read_logs)

Expand Down
Loading

0 comments on commit 136ae61

Please sign in to comment.