diff --git a/.gitignore b/.gitignore
index 804f3e6..eb0b8f2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@ Pipefile.lock
 # ignore the test airflow db.
 tests/airflow.db
 tests/logs/
+tests/airflow-webserver.pid

 # Byte-compiled / optimized / DLL files
 __pycache__/
diff --git a/airflow_kubernetes_job_operator/job_runner.py b/airflow_kubernetes_job_operator/job_runner.py
index acb2ae6..3834d41 100644
--- a/airflow_kubernetes_job_operator/job_runner.py
+++ b/airflow_kubernetes_job_operator/job_runner.py
@@ -7,12 +7,13 @@
     ThreadedKubernetesResourcesWatcher,
 )
 from airflow_kubernetes_job_operator.event_handler import EventHandler
-from airflow_kubernetes_job_operator.utils import randomString, get_yaml_path_value
+from airflow_kubernetes_job_operator.utils import (
+    randomString,
+    get_yaml_path_value,
+)

 JOB_RUNNER_INSTANCE_ID_LABEL = "job-runner-instance-id"
-KUBERNETES_IN_CLUSTER_SERVICE_ACCOUNT_PATH = (
-    "/var/run/secrets/kubernetes.io/serviceaccount"
-)
+KUBERNETES_IN_CLUSTER_SERVICE_ACCOUNT_PATH = "/var/run/secrets/kubernetes.io/serviceaccount"


 class JobRunner(EventHandler):
@@ -31,10 +32,11 @@ def __init__(self):
             print(info.status)
             print(info.yaml)
         """
+        self._loaded_config_file = None
         super().__init__()

     def load_kuberntes_configuration(
-        self, in_cluster: bool = None, config_file: str = None, context: str = None
+        self, in_cluster: bool = None, config_file: str = None, context: str = None,
     ):
         """Loads the appropriate kubernetes configuration into the global context.
@@ -48,9 +50,7 @@ def load_kuberntes_configuration(
             context {str} -- The context to load. If None loads the current context
                 (default: {None})
         """
-        in_cluster = (
-            in_cluster or os.environ.get("KUBERNETES_SERVICE_HOST", None) is not None
-        )
+        in_cluster = in_cluster or os.environ.get("KUBERNETES_SERVICE_HOST", None) is not None

         # loading the current config to use.
         if in_cluster:
@@ -61,39 +61,42 @@ def load_kuberntes_configuration(
                 config_file
             ), f"Cannot find kubernetes configuration file @ {config_file}"

+            # NOTE: When loading from a config file there is no way
+            # (as of the current kubernetes package) to retrieve the loaded
+            # configuration. As such, kubernetes.config.list_kube_config_contexts
+            # will not provide the last loaded configuration and will revert
+            # to the default config. Therefore if a config file is loaded
+            # into the runner it will be used to retrieve the default namespace
+            # (or other locations)
+            self._loaded_config_file = config_file
+
             kubernetes.config.load_kube_config(config_file=config_file, context=context)

-    @staticmethod
-    def get_current_namespace():
+    def get_current_namespace(self):
         """Returns the current namespace.

         Returns:
             str
         """
         namespace = ""
         try:
-            in_cluster_namespace_fpath = os.path.join(
-                KUBERNETES_IN_CLUSTER_SERVICE_ACCOUNT_PATH, "namespace"
-            )
+            in_cluster_namespace_fpath = os.path.join(KUBERNETES_IN_CLUSTER_SERVICE_ACCOUNT_PATH, "namespace")
             if os.path.exists(in_cluster_namespace_fpath):
                 with open(in_cluster_namespace_fpath, "r", encoding="utf-8") as nsfile:
                     namespace = nsfile.read()
             else:
-                contexts, active_context = kubernetes.config.list_kube_config_contexts()
+                (contexts, active_context,) = kubernetes.config.list_kube_config_contexts(
+                    config_file=self._loaded_config_file
+                )
                 namespace = (
-                    active_context["context"]["namespace"]
-                    if "namespace" in active_context["context"]
-                    else "default"
+                    active_context["context"]["namespace"] if "namespace" in active_context["context"] else "default"
                 )
         except Exception as e:
             raise Exception(
-                "Could not resolve current namespace, you must provide a namespace or a context file",
-                e,
+                "Could not resolve current namespace, you must provide a namespace or a context file", e,
             )
         return namespace

-    def prepare_job_yaml(
-        self, job_yaml, random_name_postfix_length: int = 0, force_job_name: str = None
-    ) -> dict:
+    def prepare_job_yaml(self, job_yaml, random_name_postfix_length: int = 0, force_job_name: str = None,) -> dict:
         """Pre-prepare the job yaml dictionary for execution,
         can also accept a string input.
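The NOTE added above is the heart of this change: kubernetes.config.list_kube_config_contexts does not remember what load_kube_config last loaded, so the runner now stores the config-file path itself and passes it back in. A minimal sketch of the lookup get_current_namespace now performs, assuming an illustrative kubeconfig path:

    import os
    import kubernetes

    # Without config_file, list_kube_config_contexts falls back to the default
    # kubeconfig, ignoring whatever load_kube_config loaded earlier.
    contexts, active_context = kubernetes.config.list_kube_config_contexts(
        config_file=os.path.expanduser("~/.kube/config_special")  # illustrative path
    )
    namespace = active_context["context"].get("namespace", "default")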
@@ -133,11 +136,7 @@ def prepare_job_yaml(
             return

         # make sure the yaml is an dict.
-        job_yaml = (
-            copy.deepcopy(job_yaml)
-            if isinstance(job_yaml, dict)
-            else yaml.safe_load(job_yaml)
-        )
+        job_yaml = copy.deepcopy(job_yaml) if isinstance(job_yaml, dict) else yaml.safe_load(job_yaml)

         def get(path_names, default=None):
             try:
@@ -149,13 +148,10 @@ def get(path_names, default=None):

         def assert_defined(path_names: list, def_name=None):
             path_string = ".".join(map(lambda v: str(v), path_names))
-            assert (
-                get(path_names) is not None
-            ), f"job {def_name or path_names[-1]} must be defined @ {path_string}"
+            assert get(path_names) is not None, f"job {def_name or path_names[-1]} must be defined @ {path_string}"

-        assert get(["kind"]) == "Job", (
-            "job_yaml resource must be of 'kind' 'Job', recived "
-            + get(["kind"], "[unknown]")
+        assert get(["kind"]) == "Job", "job_yaml resource must be of 'kind' 'Job', recived " + get(
+            ["kind"], "[unknown]"
         )

         assert_defined(["metadata", "name"])
@@ -166,9 +162,7 @@ def assert_defined(path_names: list, def_name=None):
             job_yaml["metadata"]["name"] = force_job_name

         if random_name_postfix_length > 0:
-            job_yaml["metadata"]["name"] += "-" + randomString(
-                random_name_postfix_length
-            )
+            job_yaml["metadata"]["name"] += "-" + randomString(random_name_postfix_length)

         # assign current namespace if one is not defined.
         if "namespace" not in job_yaml["metadata"]:
@@ -176,8 +170,7 @@ def assert_defined(path_names: list, def_name=None):
                 job_yaml["metadata"]["namespace"] = self.get_current_namespace()
             except Exception as ex:
                 raise Exception(
-                    "Namespace was not provided in yaml and auto namespace resolution failed.",
-                    ex,
+                    "Namespace was not provided in yaml and auto namespace resolution failed.", ex,
                 )

         # FIXME: Should be a better way to add missing values.
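These hunks only reflow prepare_job_yaml; behaviorally it still validates that the resource kind is Job, optionally appends a random postfix to the name, and falls back to the current namespace. A hedged usage sketch (the yaml file name is invented):

    from airflow_kubernetes_job_operator.job_runner import JobRunner

    runner = JobRunner()
    runner.load_kuberntes_configuration()  # kubeconfig or in-cluster, auto-detected

    # The yaml must declare kind: Job and metadata.name, or the asserts above fire.
    prepared = runner.prepare_job_yaml(
        open("job.yaml").read(),       # hypothetical file
        random_name_postfix_length=8,  # metadata.name becomes "<name>-<8 random chars>"
    )
    print(prepared["metadata"]["namespace"])  # autofilled when the yaml omits it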
@@ -204,9 +197,7 @@ def assert_defined(path_names: list, def_name=None):

         instance_id = randomString(15)
         job_yaml["metadata"]["labels"][JOB_RUNNER_INSTANCE_ID_LABEL] = instance_id
-        job_yaml["spec"]["template"]["metadata"]["labels"][
-            JOB_RUNNER_INSTANCE_ID_LABEL
-        ] = instance_id
+        job_yaml["spec"]["template"]["metadata"]["labels"][JOB_RUNNER_INSTANCE_ID_LABEL] = instance_id

         return job_yaml

@@ -239,10 +230,7 @@ def execute_job(
             "metadata" in job_yaml
             and "labels" in job_yaml["metadata"]
             and JOB_RUNNER_INSTANCE_ID_LABEL in job_yaml["metadata"]["labels"]
-        ), (
-            "job_yaml is not configured correctly, "
-            + "did you forget to call JobRunner.prepare_job_yaml?"
-        )
+        ), ("job_yaml is not configured correctly, " + "did you forget to call JobRunner.prepare_job_yaml?")

         metadata = job_yaml["metadata"]
         name = metadata["name"]
@@ -263,9 +251,7 @@ def execute_job(
                 pass

         if status is not None:
-            raise Exception(
-                f"Job {name} already exists in namespace {namespace}, cannot exec."
-            )
+            raise Exception(f"Job {name} already exists in namespace {namespace}, cannot exec.")

         # starting the watcher.
         watcher = ThreadedKubernetesNamespaceResourcesWatcher(coreClient)
@@ -273,24 +259,18 @@ def execute_job(
         watcher.remove_deleted_kube_resources_from_memory = False
         watcher.pipe(self)
         watcher.watch_namespace(
-            namespace,
-            label_selector=f"{JOB_RUNNER_INSTANCE_ID_LABEL}={instance_id}",
-            watch_for_kinds=["Job", "Pod"],
+            namespace, label_selector=f"{JOB_RUNNER_INSTANCE_ID_LABEL}={instance_id}", watch_for_kinds=["Job", "Pod"],
         )

         # starting the job
         batchClient.create_namespaced_job(namespace, job_yaml)

         # wait for job to start
-        job_watcher = watcher.waitfor_status(
-            "Job", name, namespace, status="Running", timeout=start_timeout
-        )
+        job_watcher = watcher.waitfor_status("Job", name, namespace, status="Running", timeout=start_timeout)
         self.emit("job_started", job_watcher, self)

         # waiting for the job to completed.
-        job_watcher = watcher.waitfor_status(
-            "Job", name, namespace, status_list=["Failed", "Succeeded", "Deleted"]
-        )
+        job_watcher = watcher.waitfor_status("Job", name, namespace, status_list=["Failed", "Succeeded", "Deleted"],)

         # not need to read status and logs anymore.
         watcher.stop()
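The execute_job hunks are likewise formatting-only; the lifecycle stays: refuse a duplicate job, watch the namespace by instance-id label, create the job, wait for Running, then wait for a terminal status. A sketch of driving it, continuing the snippet above and assuming execute_job's defaults suffice:

    # 'prepared' is the dict returned by prepare_job_yaml in the sketch above.
    runner.on("job_started", lambda job_watcher, sender: print("job entered Running"))
    runner.execute_job(prepared)  # raises if a same-named job already exists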
diff --git a/experimental/core_tester/test_job_runner.py b/experimental/core_tester/test_job_runner.py
index 3db3aea..afbc933 100644
--- a/experimental/core_tester/test_job_runner.py
+++ b/experimental/core_tester/test_job_runner.py
@@ -27,11 +27,14 @@ def resource_status_changed(status, sender):


 # load kubernetes configuration.
-kubernetes.config.load_kube_config()
-current_namespace = JobRunner.get_current_namespace()
+config_file = "/home/zav/.kube/config_special"  # change to test a specific config file
+current_context = "docker-desktop"  # Change to test a specific context.

 # prepare the runner.
 runner = JobRunner()
+runner.load_kuberntes_configuration(config_file=config_file, context=current_context)
+current_namespace = runner.get_current_namespace()
+print("Current namespace: " + current_namespace)
 runner.on("log", read_pod_log)
 runner.on("status", resource_status_changed)
diff --git a/tests/airflow-webserver.pid b/tests/airflow-webserver.pid
deleted file mode 100644
index 8ec1add..0000000
--- a/tests/airflow-webserver.pid
+++ /dev/null
@@ -1 +0,0 @@
-683
diff --git a/tests/dags/test_job_operator_config_file.py b/tests/dags/test_job_operator_config_file.py
new file mode 100644
index 0000000..56f1be6
--- /dev/null
+++ b/tests/dags/test_job_operator_config_file.py
@@ -0,0 +1,19 @@
+import os
+from airflow import DAG
+from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
+from airflow.utils.dates import days_ago
+
+default_args = {"owner": "tester", "start_date": days_ago(2), "retries": 0}
+dag = DAG(
+    "job-operator-config-file", default_args=default_args, description="Test base job operator", schedule_interval=None,
+)
+
+job_task = KubernetesJobOperator(
+    task_id="test-job",
+    dag=dag,
+    image="ubuntu",
+    in_cluster=False,
+    cluster_context="docker-desktop",
+    config_file=os.path.expanduser("~/.kube/config_special"),
+    command=["bash", "-c", 'echo "all ok"'],
+)
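For contrast with the config-file DAG above, an in-cluster task would omit config_file entirely; JobRunner detects in_cluster from the KUBERNETES_SERVICE_HOST environment variable. That KubernetesJobOperator forwards this default untouched is an assumption of this sketch:

    # Hypothetical in-cluster counterpart of test-job; no kubeconfig needed because
    # load_kuberntes_configuration detects KUBERNETES_SERVICE_HOST on its own.
    in_cluster_task = KubernetesJobOperator(
        task_id="test-job-in-cluster",
        dag=dag,  # the DAG defined above
        image="ubuntu",
        command=["bash", "-c", 'echo "all ok"'],
    )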