-
Notifications
You must be signed in to change notification settings - Fork 0
/
knime_rest_api.py
147 lines (135 loc) · 4.88 KB
/
knime_rest_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import logging
from typing import Callable
import requests
class KnimeRestApi:
    """
    Class to interact with the KNIME Server REST API.

    Every request is sent with HTTP basic authentication, the configured CA
    certificate file (as requests' ``verify`` argument) and the configured timeout.
    """

    def __init__(self, config: dict, logger: logging.Logger) -> None:
        """
        Constructor
        :param config: configuration settings; must contain the keys
            ``knime_rest_api_base_url``, ``knime_rest_api_user``, ``knime_rest_api_password``,
            ``ca_cert_file`` and ``knime_rest_api_timeout_seconds``
        :param logger: logger used to report API calls and their failures
        :raises KeyError: if one of the required configuration keys is missing
        """
        self.base_url = config["knime_rest_api_base_url"]
        # Default Accept header (Mason hypermedia JSON); individual calls may override it
        self.headers = {"accept": "application/vnd.mason+json"}
        self.auth = (config["knime_rest_api_user"], config["knime_rest_api_password"])  # Basic authentication
        # Forwarded as requests' ``verify`` argument to validate the server's TLS certificate
        self.verify = config["ca_cert_file"]
        self.timeout = config["knime_rest_api_timeout_seconds"]
        self.logger = logger

    def _perform_api_call(self, api_call: Callable[..., "requests.Response"], url: str,
                          **kwargs) -> "requests.Response":
        """
        Perform API call and log possible errors.
        :param api_call: requests.get / requests.put / requests.delete
        :param url: endpoint to perform the API call
        :param kwargs: arguments for the API call
        :return: response from the API call; only returned when its status is OK
        :raises requests.HTTPError: if the server answers with an error status (4xx/5xx)
        :raises Exception: anything raised by ``api_call`` itself (e.g. connection errors or
            timeouts) is logged and re-raised unchanged
        """
        self.logger.debug("Performing API call to %s", url)
        try:
            response = api_call(url, **kwargs)
        except Exception as e:
            self.logger.exception("Error exception reaching the API: %s", e)
            raise
        if not response.ok:
            self.logger.error("API call response not OK: [%s] %s", response.status_code, response.text)
            # raise_for_status() raises requests.HTTPError with the response attached,
            # so callers can inspect the status code and body.
            response.raise_for_status()
        return response

    def _call_endpoint(self, api_call: Callable[..., "requests.Response"], endpoint: str,
                       headers: "dict | None" = None) -> "requests.Response":
        """
        Send a request to ``{base_url}/{endpoint}`` using the shared auth, TLS and timeout settings.
        :param api_call: requests.get / requests.put / requests.delete
        :param endpoint: path (plus optional query string) relative to the base URL
        :param headers: request headers; defaults to ``self.headers``
        :return: response from the API call (see ``_perform_api_call`` for error handling)
        """
        return self._perform_api_call(
            api_call,
            url=f"{self.base_url}/{endpoint}",
            headers=self.headers if headers is None else headers,
            auth=self.auth,
            verify=self.verify,
            timeout=self.timeout,
        )

    def list_jobs(self) -> dict:
        """
        List all jobs managed by this KNIME Server
        :return: jobs information in dict
        """
        return self._call_endpoint(requests.get, "jobs/").json()

    def get_job_info(self, job_id: str) -> dict:
        """
        Get job information from a finished job_id
        :param job_id: job_id
        :return: job information in dict
        """
        return self._call_endpoint(requests.get, f"jobs/{job_id}").json()

    def get_workflow_summary(self, job_id: str) -> dict:
        """
        Get workflow summary (including execution info) from a finished job_id
        :param job_id: job_id
        :return: workflow summary in dict
        """
        # The summary is requested with format=JSON, so ask for plain JSON instead of the
        # default Mason media type.
        response = self._call_endpoint(
            requests.get,
            f"jobs/{job_id}/workflow-summary?format=JSON&includeExecutionInfo=true",
            headers={"accept": "application/json"},
        )
        return response.json()

    def download_workflow_data(self, path: str) -> bytes:
        """
        Download workflow with summary and execution statistics from given path
        :param path: workflow path
        :return: workflow .knwf file from response as bytes
        """
        return self._call_endpoint(requests.get, f"repository/{path}:data").content

    def trigger_swap(self, job_id: str) -> None:
        """
        Trigger swap & creation of workflow summary.
        This is done to force the swap to reduce the waiting time and ensure the workflow summary is contained
        :param job_id: job_id
        """
        self._call_endpoint(requests.put, f"jobs/{job_id}/swap")

    def copy_job_in_repo(self, job_id: str, path: str) -> None:
        """
        Copy the job as a workflow into the server repository at ``path``, so users can't delete the job
        :param job_id: job_id to be copied as workflow
        :param path: server path to store the workflow files
        """
        self._call_endpoint(requests.put, f"repository/{path}:data?from-job={job_id}")

    def delete_workflow_data(self, path: str) -> None:
        """
        Delete workflow data stored in path
        :param path: workflow path
        """
        self._call_endpoint(requests.delete, f"repository/{path}")