Skip to content

Commit

Permalink
Merge pull request #43 from geosolutions-it/ISSUE_42
Browse files Browse the repository at this point in the history
[Fixes #42] Async Request Management Proposal
  • Loading branch information
Alessio Fabiani authored Nov 11, 2019
2 parents 8c9807e + 55407ac commit 89dc0ec
Show file tree
Hide file tree
Showing 20 changed files with 582 additions and 200 deletions.
10 changes: 10 additions & 0 deletions src/wpsremote/busIndependentMessages.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,13 @@ def __init__(self, originator, outputs):

def outputs(self):
return self._outputs


class CannotExecuteMessage(BusIndependentMessage):

def __init__(self, originator, outputs):
self.originator = originator
self._outputs = outputs

def outputs(self):
return self._outputs
File renamed without changes.
55 changes: 28 additions & 27 deletions src/wpsremote/processbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import busIndependentMessages
import computation_job_inputs
import computational_job_input_actions
import configInstance
import config_instance
import output_parameters
import resource_cleaner
from time import sleep
Expand Down Expand Up @@ -47,7 +47,7 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess
self._input_values = execute_message.variables()

# read remote config
remote_config = configInstance.create(remote_config_filepath)
remote_config = config_instance.create(remote_config_filepath)
bus_class_name = remote_config.get("DEFAULT", "bus_class_name")
uploader_class_name = None
try:
Expand All @@ -66,32 +66,33 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess

# the config file is read with raw=False because the unique_exe_id value
# will be used (interpolated) in the config
serviceConfig = configInstance.create(service_config_filepath,
case_sensitive=True,
variables={
'unique_exe_id': self._uniqueExeId,
'wps_execution_shared_dir': self._wps_execution_shared_dir
},
raw=False)

self.service = serviceConfig.get("DEFAULT", "service") # todo: what is?
self.namespace = serviceConfig.get("DEFAULT", "namespace")
self.description = serviceConfig.get("DEFAULT", "description")
self._active = serviceConfig.get("DEFAULT", "active").lower() == "true" # True

self._executable_path = serviceConfig.get("DEFAULT", "executable_path")
self._executable_cmd = serviceConfig.get("DEFAULT", "executable_cmd")
service_config = config_instance.create(service_config_filepath,
case_sensitive=True,
variables={
'unique_exe_id': self._uniqueExeId,
'wps_execution_shared_dir': self._wps_execution_shared_dir
},
raw=False)

self.service = service_config.get("DEFAULT", "service") # todo: what is?
self.namespace = service_config.get("DEFAULT", "namespace")
self.description = service_config.get("DEFAULT", "description")
self._active = service_config.get("DEFAULT", "active").lower() == "true" # True

self._executable_path = service_config.get("DEFAULT", "executable_path")
self._executable_cmd = service_config.get("DEFAULT", "executable_cmd")
if not os.path.isabs(self._executable_path):
full_executable_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), self._executable_path)
self._executable_cmd = self._executable_cmd.replace(self._executable_path, full_executable_path)
self._executable_path = full_executable_path

self._stdout_parser = serviceConfig.get_list("Logging", "stdout_parser")
self._stdout_action = serviceConfig.get_list("Logging", "stdout_action")
self._output_dir = serviceConfig.get_path("DEFAULT", "output_dir")
self._stdout_parser = service_config.get_list("Logging", "stdout_parser")
self._stdout_action = service_config.get_list("Logging", "stdout_action")
self._output_dir = service_config.get_path("DEFAULT", "output_dir")
if not os.path.isabs(self._output_dir):
self._output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), self._output_dir)
self._max_running_time = datetime.timedelta(seconds=serviceConfig.getint("DEFAULT", "max_running_time_seconds"))
self._max_running_time = datetime.timedelta(
seconds=service_config.getint("DEFAULT", "max_running_time_seconds"))

# create the concrete uploader object
if uploader_class_name:
Expand All @@ -116,19 +117,19 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess
self._uploader = None

input_sections = OrderedDict()
for input_section in [s for s in serviceConfig.sections() if 'input' in s.lower() or 'const' in s.lower()]:
input_sections[input_section] = serviceConfig.items_without_defaults(input_section, raw=False)
for input_section in [s for s in service_config.sections() if 'input' in s.lower() or 'const' in s.lower()]:
input_sections[input_section] = service_config.items_without_defaults(input_section, raw=False)
self._input_parameters_defs = computation_job_inputs.ComputationJobInputs.create_from_config(input_sections)

output_sections = OrderedDict()
for output_section in [s for s in serviceConfig.sections() if 'output' in s.lower()]:
output_sections[output_section] = serviceConfig.items_without_defaults(output_section, raw=False)
for output_section in [s for s in service_config.sections() if 'output' in s.lower()]:
output_sections[output_section] = service_config.items_without_defaults(output_section, raw=False)
self._output_parameters_defs = output_parameters.OutputParameters.create_from_config(
output_sections, self._wps_execution_shared_dir, self._uploader)

action_sections = OrderedDict()
for action_section in [s for s in serviceConfig.sections() if 'action' in s.lower()]:
action_sections[action_section] = serviceConfig.items_without_defaults(action_section, raw=False)
for action_section in [s for s in service_config.sections() if 'action' in s.lower()]:
action_sections[action_section] = service_config.items_without_defaults(action_section, raw=False)
self._input_params_actions = computational_job_input_actions.ComputationalJobInputActions.create_from_config(
action_sections)

Expand Down
8 changes: 4 additions & 4 deletions src/wpsremote/resource_cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# This code is licensed under the GPL 2.0 license, available at the root
# application directory.

import configInstance
import config_instance
import os
import path
import unittest
Expand Down Expand Up @@ -160,7 +160,7 @@ def read(self):
self.read_from_file(self.filepath())

def read_from_file(self, filepath):
config = configInstance.create(filepath, raw=True) # todo: use file lock
config = config_instance.create(filepath, raw=True) # todo: use file lock
self._start_time = config.get("DEFAULT", "start_time")
try:
self._start_time = datetime.datetime.strptime(self._start_time, "%Y-%m-%dT%H:%M:%S")
Expand Down Expand Up @@ -322,7 +322,7 @@ def test_workflow(self):

import subprocess
import random
sendbox_root = path.path("./")
sandbox_root = path.path("./")

Resource.sleep_time_seconds = datetime.timedelta(seconds=5)
Resource.process_time_threshold = datetime.timedelta(seconds=1)
Expand All @@ -338,7 +338,7 @@ def test_workflow(self):
stderr=subprocess.STDOUT)
prcbot_pid = invoked_process.pid
unique_id = str(random.randint(1, 1000)).zfill(5)
sendbox_path = sendbox_root / str(unique_id)
sendbox_path = sandbox_root / str(unique_id)

r = Resource()
r.set_from_servicebot(unique_id, sendbox_path)
Expand Down
81 changes: 69 additions & 12 deletions src/wpsremote/resource_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,52 @@
logger = logging.getLogger("servicebot.resource_monitor")


class ProcessWeight(object):

weight = 0
coefficient = 1.0

def __init__(self, process_weight):
if process_weight:
self.weight = float(process_weight['weight'])
self.coefficient = float(process_weight['coefficient'])

# ability to customize process load on per request basis
def request_weight(self, exec_request):
# this one is the default implementation
return (self.coefficient * self.weight)


class ResourceMonitor(threading.Thread):

# Total Capacity of this machine
capacity = 100

# Current load
load = 0
proc_load = 0
resource_load = 0
running_procs_load = {}

load_threshold = 95
load_average_scan_minutes = 15
cores = psutil.cpu_count()

try:
cores = psutil.cpu_count()
except BaseException:
cores = 1
cpu_perc = []
vmem_perc = []

lock = threading.Lock()

def __init__(self, load_average_scan_minutes):
def __init__(self, capacity, load_threshold, load_average_scan_minutes):
threading.Thread.__init__(self)

ResourceMonitor.capacity = capacity
ResourceMonitor.load_threshold = load_threshold
ResourceMonitor.load_average_scan_minutes = load_average_scan_minutes

ResourceMonitor.lock.acquire()

ResourceMonitor.vmem_perc.append(psutil.virtual_memory().percent)
Expand Down Expand Up @@ -64,25 +99,47 @@ def proc_is_running(self, proc_defs):
('name' in _p and _p['name'] in name) and \
('cwd' in _p and _p['cwd'] in path) and \
('cmdline' in _p and _p['cmdline'] in cmdline):
ResourceMonitor.proc_load = 100
return True
except BaseException:
import traceback
tb = traceback.format_exc()
logger.debug(tb)
# print(tb)

ResourceMonitor.proc_load = 0
return False

def update_stats(self):
ResourceMonitor.lock.acquire()

ResourceMonitor.vmem_perc[1] = (ResourceMonitor.vmem_perc[0] + ResourceMonitor.vmem_perc[1]) / 2.0
ResourceMonitor.vmem_perc[0] = (ResourceMonitor.vmem_perc[1] + psutil.virtual_memory().percent) / 2.0

ResourceMonitor.cpu_perc[1] = ResourceMonitor.cpu_perc[0]
ResourceMonitor.cpu_perc[0] = psutil.cpu_percent(
interval=(ResourceMonitor.load_average_scan_minutes*60), percpu=False)

ResourceMonitor.lock.release()
try:
# Acquiring thread lock
ResourceMonitor.lock.acquire()

# Used memory perc
ResourceMonitor.vmem_perc[1] = (ResourceMonitor.vmem_perc[0] + ResourceMonitor.vmem_perc[1]) / 2.0
ResourceMonitor.vmem_perc[0] = (ResourceMonitor.vmem_perc[1] + psutil.virtual_memory().percent) / 2.0

# Used cpu perc
ResourceMonitor.cpu_perc[1] = ResourceMonitor.cpu_perc[0]
ResourceMonitor.cpu_perc[0] = psutil.cpu_percent(
interval=(ResourceMonitor.load_average_scan_minutes*60), percpu=False)

vmem = psutil.virtual_memory().percent
if ResourceMonitor.vmem_perc[0] > 0:
vmem = (vmem + ResourceMonitor.vmem_perc[0]) / 2.0

loadavg = psutil.cpu_percent(interval=0, percpu=False)
if ResourceMonitor.cpu_perc[0] > 0:
loadavg = (loadavg + ResourceMonitor.cpu_perc[0]) / 2.0

if vmem > ResourceMonitor.load_threshold or loadavg > ResourceMonitor.load_threshold:
ResourceMonitor.resource_load = 100
else:
ResourceMonitor.resource_load = 0

finally:
# Releaseing thread lock
ResourceMonitor.lock.release()

def run(self):
while True:
Expand Down
Loading

0 comments on commit 89dc0ec

Please sign in to comment.