Commit 4f9d507

Merge pull request #8 from LamaAni/move_config_to_runner

BUG - Error using configuration file - #7

LamaAni authored Jun 19, 2020
2 parents 0c1bc70 + 436c67a commit 4f9d507
Showing 5 changed files with 62 additions and 60 deletions.
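In short, kubernetes configuration loading moves from static, module-level calls into the JobRunner instance, so namespace resolution honors a custom config file instead of silently reverting to the default kube config (the bug in #7). A rough before/after sketch based on the diffs below (paths are illustrative):

# before: namespace resolution ignores any custom config file
kubernetes.config.load_kube_config(config_file="~/.kube/config_special")
namespace = JobRunner.get_current_namespace()  # reads the default ~/.kube/config

# after: the runner remembers which file it loaded and reuses it
runner = JobRunner()
runner.load_kuberntes_configuration(config_file="~/.kube/config_special")
namespace = runner.get_current_namespace()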
1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@ Pipefile.lock
# ignore the test airflow db.
tests/airflow.db
tests/logs/
+tests/airflow-webserver.pid

# Byte-compiled / optimized / DLL files
__pycache__/
94 changes: 37 additions & 57 deletions airflow_kubernetes_job_operator/job_runner.py
@@ -7,12 +7,13 @@
ThreadedKubernetesResourcesWatcher,
)
from airflow_kubernetes_job_operator.event_handler import EventHandler
-from airflow_kubernetes_job_operator.utils import randomString, get_yaml_path_value
+from airflow_kubernetes_job_operator.utils import (
+    randomString,
+    get_yaml_path_value,
+)

JOB_RUNNER_INSTANCE_ID_LABEL = "job-runner-instance-id"
-KUBERNETES_IN_CLUSTER_SERVICE_ACCOUNT_PATH = (
-    "/var/run/secrets/kubernetes.io/serviceaccount"
-)
+KUBERNETES_IN_CLUSTER_SERVICE_ACCOUNT_PATH = "/var/run/secrets/kubernetes.io/serviceaccount"


class JobRunner(EventHandler):
@@ -31,10 +31,11 @@ def __init__(self):
print(info.status)
print(info.yaml)
"""
+self._loaded_config_file = None
super().__init__()

def load_kuberntes_configuration(
-self, in_cluster: bool = None, config_file: str = None, context: str = None
+self, in_cluster: bool = None, config_file: str = None, context: str = None,
):
"""Loads the appropriate kubernetes configuration into the global
context.
@@ -48,9 +50,7 @@ def load_kuberntes_configuration(
context {str} -- The context to load. If None loads the current
context (default: {None})
"""
-in_cluster = (
-    in_cluster or os.environ.get("KUBERNETES_SERVICE_HOST", None) is not None
-)
+in_cluster = in_cluster or os.environ.get("KUBERNETES_SERVICE_HOST", None) is not None

# loading the current config to use.
if in_cluster:
@@ -61,39 +61,42 @@
config_file
), f"Cannot find kubernetes configuration file @ {config_file}"

+# NOTE: When loading from a config file there is no way
+# (as of the current kubernetes package) to retrieve the loaded
+# configuration. As such, kubernetes.config.list_kube_config_contexts
+# will not provide the last loaded configuration and will revert
+# to the default config. Therefore if a config file is loaded
+# into the runner it will be used to retrieve the default namespace
+# (or other locations)
+self._loaded_config_file = config_file

kubernetes.config.load_kube_config(config_file=config_file, context=context)
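The workaround the NOTE describes can be seen in isolation: load_kube_config does not record which file it read, so a bare list_kube_config_contexts() call falls back to the default kube config. A minimal sketch (the path is illustrative):

runner = JobRunner()
runner.load_kuberntes_configuration(config_file="/home/me/.kube/config_special")

# reads the default ~/.kube/config, not config_special -- the original bug:
contexts, active_context = kubernetes.config.list_kube_config_contexts()

# reads the file the runner actually loaded:
contexts, active_context = kubernetes.config.list_kube_config_contexts(
    config_file=runner._loaded_config_file
)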

-@staticmethod
-def get_current_namespace():
+def get_current_namespace(self):
"""Returns the current namespace.
Returns:
str
"""
namespace = ""
try:
-in_cluster_namespace_fpath = os.path.join(
-    KUBERNETES_IN_CLUSTER_SERVICE_ACCOUNT_PATH, "namespace"
-)
+in_cluster_namespace_fpath = os.path.join(KUBERNETES_IN_CLUSTER_SERVICE_ACCOUNT_PATH, "namespace")
if os.path.exists(in_cluster_namespace_fpath):
with open(in_cluster_namespace_fpath, "r", encoding="utf-8") as nsfile:
namespace = nsfile.read()
else:
-contexts, active_context = kubernetes.config.list_kube_config_contexts()
+(contexts, active_context,) = kubernetes.config.list_kube_config_contexts(
+    config_file=self._loaded_config_file
+)
namespace = (
-active_context["context"]["namespace"]
-if "namespace" in active_context["context"]
-else "default"
+active_context["context"]["namespace"] if "namespace" in active_context["context"] else "default"
)
except Exception as e:
raise Exception(
"Could not resolve current namespace, you must provide a namespace or a context file",
e,
"Could not resolve current namespace, you must provide a namespace or a context file", e,
)
return namespace
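Outside the class, the same fallback chain can be written as a standalone sketch (loaded_config_file stands in for self._loaded_config_file; "default" mirrors the code above):

ns_file = os.path.join(KUBERNETES_IN_CLUSTER_SERVICE_ACCOUNT_PATH, "namespace")
if os.path.exists(ns_file):
    with open(ns_file, "r", encoding="utf-8") as nsfile:
        namespace = nsfile.read()
else:
    _, active = kubernetes.config.list_kube_config_contexts(config_file=loaded_config_file)
    namespace = active["context"].get("namespace", "default")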

-def prepare_job_yaml(
-    self, job_yaml, random_name_postfix_length: int = 0, force_job_name: str = None
-) -> dict:
+def prepare_job_yaml(self, job_yaml, random_name_postfix_length: int = 0, force_job_name: str = None,) -> dict:
"""Pre-prepare the job yaml dictionary for execution,
can also accept a string input.
@@ -133,11 +136,7 @@ def prepare_job_yaml(
return

# make sure the yaml is a dict.
-job_yaml = (
-    copy.deepcopy(job_yaml)
-    if isinstance(job_yaml, dict)
-    else yaml.safe_load(job_yaml)
-)
+job_yaml = copy.deepcopy(job_yaml) if isinstance(job_yaml, dict) else yaml.safe_load(job_yaml)

def get(path_names, default=None):
try:
@@ -149,13 +148,10 @@ def get(path_names, default=None):

def assert_defined(path_names: list, def_name=None):
path_string = ".".join(map(lambda v: str(v), path_names))
-assert (
-    get(path_names) is not None
-), f"job {def_name or path_names[-1]} must be defined @ {path_string}"
+assert get(path_names) is not None, f"job {def_name or path_names[-1]} must be defined @ {path_string}"

-assert get(["kind"]) == "Job", (
-    "job_yaml resource must be of 'kind' 'Job', received "
-    + get(["kind"], "[unknown]")
+assert get(["kind"]) == "Job", "job_yaml resource must be of 'kind' 'Job', received " + get(
+    ["kind"], "[unknown]"
)

assert_defined(["metadata", "name"])
@@ -166,18 +162,15 @@ def assert_defined(path_names: list, def_name=None):
job_yaml["metadata"]["name"] = force_job_name

if random_name_postfix_length > 0:
-job_yaml["metadata"]["name"] += "-" + randomString(
-    random_name_postfix_length
-)
+job_yaml["metadata"]["name"] += "-" + randomString(random_name_postfix_length)

# assign current namespace if one is not defined.
if "namespace" not in job_yaml["metadata"]:
try:
job_yaml["metadata"]["namespace"] = self.get_current_namespace()
except Exception as ex:
raise Exception(
"Namespace was not provided in yaml and auto namespace resolution failed.",
ex,
"Namespace was not provided in yaml and auto namespace resolution failed.", ex,
)

# FIXME: Should be a better way to add missing values.
@@ -204,9 +197,7 @@

instance_id = randomString(15)
job_yaml["metadata"]["labels"][JOB_RUNNER_INSTANCE_ID_LABEL] = instance_id
job_yaml["spec"]["template"]["metadata"]["labels"][
JOB_RUNNER_INSTANCE_ID_LABEL
] = instance_id
job_yaml["spec"]["template"]["metadata"]["labels"][JOB_RUNNER_INSTANCE_ID_LABEL] = instance_id

return job_yaml
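A quick sketch of what a prepared manifest gains (job_yaml_text is a hypothetical kind: Job manifest; the label key comes from JOB_RUNNER_INSTANCE_ID_LABEL above):

runner = JobRunner()
prepared = runner.prepare_job_yaml(job_yaml_text, random_name_postfix_length=8)
# the job and its pod template carry the same random instance id,
# which is the label the namespace watcher selects on:
job_id = prepared["metadata"]["labels"]["job-runner-instance-id"]
pod_id = prepared["spec"]["template"]["metadata"]["labels"]["job-runner-instance-id"]
assert job_id == pod_id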

@@ -239,10 +230,7 @@ def execute_job(
"metadata" in job_yaml
and "labels" in job_yaml["metadata"]
and JOB_RUNNER_INSTANCE_ID_LABEL in job_yaml["metadata"]["labels"]
-), (
-    "job_yaml is not configured correctly, "
-    + "did you forget to call JobRunner.prepare_job_yaml?"
-)
+), ("job_yaml is not configured correctly, " + "did you forget to call JobRunner.prepare_job_yaml?")

metadata = job_yaml["metadata"]
name = metadata["name"]
@@ -263,34 +251,26 @@
pass

if status is not None:
-raise Exception(
-    f"Job {name} already exists in namespace {namespace}, cannot exec."
-)
+raise Exception(f"Job {name} already exists in namespace {namespace}, cannot exec.")

# starting the watcher.
watcher = ThreadedKubernetesNamespaceResourcesWatcher(coreClient)
watcher.auto_watch_pod_logs = read_logs
watcher.remove_deleted_kube_resources_from_memory = False
watcher.pipe(self)
watcher.watch_namespace(
-namespace,
-label_selector=f"{JOB_RUNNER_INSTANCE_ID_LABEL}={instance_id}",
-watch_for_kinds=["Job", "Pod"],
+namespace, label_selector=f"{JOB_RUNNER_INSTANCE_ID_LABEL}={instance_id}", watch_for_kinds=["Job", "Pod"],
)

# starting the job
batchClient.create_namespaced_job(namespace, job_yaml)

# wait for job to start
-job_watcher = watcher.waitfor_status(
-    "Job", name, namespace, status="Running", timeout=start_timeout
-)
+job_watcher = watcher.waitfor_status("Job", name, namespace, status="Running", timeout=start_timeout)
self.emit("job_started", job_watcher, self)

# waiting for the job to complete.
-job_watcher = watcher.waitfor_status(
-    "Job", name, namespace, status_list=["Failed", "Succeeded", "Deleted"]
-)
+job_watcher = watcher.waitfor_status("Job", name, namespace, status_list=["Failed", "Succeeded", "Deleted"],)

# no need to read status and logs anymore.
watcher.stop()
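Taken together, a rough end-to-end sketch of the new flow (handler signatures and execute_job arguments beyond what this diff shows are assumptions):

runner = JobRunner()
runner.load_kuberntes_configuration()  # default kube config and context
prepared = runner.prepare_job_yaml(job_yaml_text, random_name_postfix_length=8)
runner.on("log", lambda msg, sender: print(msg))
# blocks until the Job reaches Failed, Succeeded or Deleted,
# streaming pod logs through the watcher while it runs:
runner.execute_job(prepared)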
7 changes: 5 additions & 2 deletions experimental/core_tester/test_job_runner.py
@@ -27,11 +27,14 @@ def resource_status_changed(status, sender):


# load kubernetes configuration.
-kubernetes.config.load_kube_config()
-current_namespace = JobRunner.get_current_namespace()
+config_file = "/home/zav/.kube/config_special"  # change to test a specific config file
+current_context = "docker-desktop"  # Change to test a specific context.

# prepare the runner.
runner = JobRunner()
+runner.load_kuberntes_configuration(config_file=config_file, context=current_context)
+current_namespace = runner.get_current_namespace()
print("Current namespace: " + current_namespace)
runner.on("log", read_pod_log)
runner.on("status", resource_status_changed)

1 change: 0 additions & 1 deletion tests/airflow-webserver.pid

This file was deleted.

19 changes: 19 additions & 0 deletions tests/dags/test_job_operator_config_file.py
@@ -0,0 +1,19 @@
+import os
+from airflow import DAG
+from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
+from airflow.utils.dates import days_ago
+
+default_args = {"owner": "tester", "start_date": days_ago(2), "retries": 0}
+dag = DAG(
+    "job-operator-config-file", default_args=default_args, description="Test base job operator", schedule_interval=None,
+)
+
+job_task = KubernetesJobOperator(
+    task_id="test-job",
+    dag=dag,
+    image="ubuntu",
+    in_cluster=False,
+    cluster_context="docker-desktop",
+    config_file=os.path.expanduser("~/.kube/config_special"),
+    command=["bash", "-c", 'echo "all ok"'],
+)
