Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix several issues #12

Merged
merged 12 commits into from
May 1, 2024
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

* Added the option to pass arguments into the long running task of a background worker.
* Added the option to manually control when the background worker task is set to **Done**.
* Added dispose function to control resource deallocation in a background worker.

### Changed

* Set background threads in the background worker as daemon threads to prevent blocking the main thread.
* Changed base class of `Message` from `UserDict` to `object` because in IronPython 2.7 `UserDict` is an old-style class. The behavior of dictionary-like is still preserved.

### Removed


Expand Down
Binary file modified docs/_images/background-task.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
48 changes: 39 additions & 9 deletions src/compas_eve/core.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# Python 2/3 compatibility import list
try:
from collections import UserDict
except ImportError:
from UserDict import UserDict
from compas.data import json_dumps
from compas.data import json_loads

DEFAULT_TRANSPORT = None

Expand Down Expand Up @@ -61,21 +58,34 @@ def unadvertise(self, topic):
pass


class Message(UserDict):
class Message(object):
"""Message objects used for publishing and subscribing to/from topics.

A message is fundamentally a dictionary and behaves as one."""

def __init__(self, *args, **kwargs):
super(Message, self).__init__()
self.data = {}
self.data.update(*args, **kwargs)

def ToString(self):
return str(self)

def __str__(self):
return str(self.data)

def __getattr__(self, name):
return self.__dict__["data"][name]
return self.data[name]

def __getitem__(self, key):
return self.data[key]

def __setitem__(self, key, value):
self.data[key] = value

@classmethod
def parse(cls, value):
instance = cls()
instance.update(value)
instance = cls(**value)
return instance


Expand Down Expand Up @@ -103,6 +113,26 @@ def __init__(self, name, message_type, **options):
self.message_type = message_type
self.options = options

def _message_to_json(self, message):
"""Convert a message to a JSON string.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this (and _message_from_json) maybe made public? I see they are only used externally.
more generally, aren't these two doing something very similar to compas.data?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they are doing something similar to compas.data but not exactly, a message can be a simple dictionary, an instance of Message, or a subclass of compas.data.Data and needs to support compas 1.x and compas 2.x

Normally, this method expects sub-classes of ``Message`` as input.
However, it can deal with regular dictionaries as well as classes
implementing the COMPAS data framework.
"""
try:
data = message.data
except (KeyError, AttributeError):
try:
data = message.__data__
except (KeyError, AttributeError):
data = dict(message)
return json_dumps(data)

def _message_from_json(self, json_message):
"""Converts a JSON string back into a message instance."""
return self.message_type.parse(json_loads(json_message))


class Publisher(object):
"""Publisher interface."""
Expand Down
59 changes: 54 additions & 5 deletions src/compas_eve/ghpython/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,27 @@ def do_something_long_and_complicated(worker):
Grasshopper environment object
long_running_function : function, optional
This function will be the main entry point for the long-running task.
dispose_function : function, optional
If defined, this function will be called when the worker is disposed. It can be used for clean-up tasks
and resource deallocation.
auto_set_done : bool, optional
If true, the worker state will be automatically set to ``Done`` after the function returns.
Defaults to ``True``.
args : tuple, optional
List or tuple of arguments for the invocation of the ``long_running_function``. Defaults to ``()``.
"""

def __init__(self, ghenv, long_running_function=None):
def __init__(self, ghenv, long_running_function=None, dispose_function=None, auto_set_done=True, args=()):
gonzalocasas marked this conversation as resolved.
Show resolved Hide resolved
super(BackgroundWorker, self).__init__()
self.ghenv = ghenv
self._is_working = False
self._is_done = False
self._is_cancelled = False
self._has_requested_cancellation = False
self.long_running_function = long_running_function
self.dispose_function = dispose_function
self.auto_set_done = auto_set_done
self.args = args

def is_working(self):
"""Indicate whether the worker is currently working or not."""
Expand All @@ -86,17 +97,37 @@ def start_work(self):
def _long_running_task_wrapper(worker):
try:
worker.set_internal_state_to_working()
result = self.long_running_function(self)
worker.set_internal_state_to_done(result)
result = self.long_running_function(self, *self.args)

# There are (at least) two types of long running functions:
# 1. Those that block the thread while working
# (e.g. they have a busy-wait or some kind of `while` loop)
# 2. Those that hookup event handlers and return immediately
# so then they don't need to block the thread.
# The first case means that we can set the state to "DONE"
# right after calling the function because if it returned, it really
# means it's done.
# The second case will return immediately, and setting the state to "DONE"
# would be wrong because the handlers are still going to trigger.
# In that case we can set the flag `auto_set_done` to `False` so that
# we don't automatically set the state to "DONE".
if self.auto_set_done:
worker.set_internal_state_to_done(result)
except Exception as e:
worker.display_message(str(e))
worker.set_internal_state_to_cancelled()

target = _long_running_task_wrapper
args = (self,)
self.thread = threading.Thread(target=target, args=args)
self.thread.daemon = True
self.thread.start()

def dispose(self):
"""Invoked when the worker is being disposed."""
if callable(self.dispose_function):
self.dispose_function(self)

def set_internal_state_to_working(self):
"""Set the internal state to ``working``."""
self._is_working = True
Expand Down Expand Up @@ -159,7 +190,9 @@ def ui_callback():
Rhino.RhinoApp.InvokeOnUiThread(System.Action(ui_callback))

@classmethod
def instance_by_component(cls, ghenv, long_running_function=None, force_new=False):
def instance_by_component(
cls, ghenv, long_running_function=None, dispose_function=None, auto_set_done=True, force_new=False, args=()
):
"""Get the worker instance assigned to the component.

This will get a persistant instance of a background worker
Expand All @@ -172,8 +205,16 @@ def instance_by_component(cls, ghenv, long_running_function=None, force_new=Fals
Grasshopper environment object
long_running_function : function, optional
This function will be the main entry point for the long-running task.
dispose_function : function, optional
If defined, this function will be called when the worker is disposed.
It can be used for clean-up tasks and resource deallocation.
auto_set_done : bool, optional
If true, the worker state will be automatically set to ``Done`` after the function returns.
Defaults to ``True``.
force_new : bool, optional
Force the creation of a new background worker, by default False.
args : tuple, optional
List or tuple of arguments for the invocation of the ``long_running_function``. Defaults to ``()``.

Returns
-------
Expand All @@ -186,11 +227,18 @@ def instance_by_component(cls, ghenv, long_running_function=None, force_new=Fals

if worker and force_new:
worker.request_cancellation()
worker.dispose()
worker = None
del scriptcontext.sticky[key]

if not worker:
worker = cls(ghenv, long_running_function=long_running_function)
worker = cls(
ghenv,
long_running_function=long_running_function,
dispose_function=dispose_function,
auto_set_done=auto_set_done,
args=args,
)
scriptcontext.sticky[key] = worker

return worker
Expand All @@ -212,5 +260,6 @@ def stop_instance_by_component(cls, ghenv):

if worker:
worker.request_cancellation()
worker.dispose()
worker = None
del scriptcontext.sticky[key]
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


class BackgroundTaskComponent(component):
def RunScript(self, reset, task, on):
def RunScript(self, task, reset, on):
if not on:
BackgroundWorker.stop_instance_by_component(ghenv) # noqa: F821
return None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
"isAdvancedMode": true,
"iconDisplay": 2,
"inputParameters": [
{
"name": "reset",
"description": "Resets the background worker."
},
{
"name": "task",
"description": "A Python function that will be executed by the background worker. The function does not need to return quickly, it can even have an infinite loop and keep running, it will not block the UI."
},
{
"name": "reset",
"description": "Resets the background worker."
},
{
"name": "on",
"description": "Turn ON or OFF the background worker.",
Expand Down
10 changes: 3 additions & 7 deletions src/compas_eve/mqtt/mqtt_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@
from ..core import Transport
from ..event_emitter import EventEmitterMixin

from compas.data import json_dumps
from compas.data import json_loads

import clr
import json
import os
import sys

Expand Down Expand Up @@ -75,11 +71,11 @@ def on_ready(self, callback):
self.once("ready", callback)

def publish(self, topic, message):
# TODO: can we avoid the additional cast to dict?
json_message = topic._message_to_json(message)
application_message = (
MqttApplicationMessageBuilder()
.WithTopic(topic.name)
.WithPayload(json_dumps(dict(message)))
.WithPayload(json_message)
.Build()
)

Expand All @@ -94,7 +90,7 @@ def subscribe(self, topic, callback):

def _local_callback(application_message):
payload = Encoding.UTF8.GetString(application_message.Payload)
msg = topic.message_type.parse(json_loads(payload))
msg = topic._message_from_json(payload)
callback(msg)

def _subscribe_callback(**kwargs):
Expand Down
8 changes: 2 additions & 6 deletions src/compas_eve/mqtt/mqtt_paho.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from compas.data import json_dumps
from compas.data import json_loads

from ..core import Transport
from ..event_emitter import EventEmitterMixin

Expand Down Expand Up @@ -63,8 +60,7 @@ def publish(self, topic, message):
"""

def _callback(**kwargs):
# TODO: can we avoid the additional cast to dict?
json_message = json_dumps(dict(message))
json_message = topic._message_to_json(message)
self.client.publish(topic.name, json_message)

self.on_ready(_callback)
Expand All @@ -91,7 +87,7 @@ def subscribe(self, topic, callback):
subscribe_id = "{}:{}".format(event_key, id(callback))

def _local_callback(msg):
msg = topic.message_type.parse(json_loads(msg.payload.decode()))
msg = topic._message_from_json(msg.payload.decode())
callback(msg)

def _subscribe_callback(**kwargs):
Expand Down
Loading
Loading