Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose engine shutdown to adapter manager and push/pushpull adapters #312

Merged
merged 13 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cpp/csp/python/Exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@ class PythonPassthrough : public csp::Exception
csp::Exception( exType, r, file, func, line )
{
//Fetch the current error to clear out the error indicator while the stack gets unwound
//We own the references to all the members assigned in PyErr_Fetch
//We need to hold the reference since PyErr_Restore takes back a reference to each of its arguments
PyErr_Fetch( &m_type, &m_value, &m_traceback );
}

PythonPassthrough( PyObject * pyException ) :
csp::Exception( "", "" )
{
// Note: all of these methods return strong references, so we own them like in the other constructor
m_type = PyObject_Type( pyException );
m_value = PyObject_Str( pyException );
m_traceback = PyException_GetTraceback( pyException );
}

void restore()
{
if( !description().empty() )
Expand All @@ -39,7 +50,6 @@ class PythonPassthrough : public csp::Exception
PyObject * m_type;
PyObject * m_value;
PyObject * m_traceback;

};

CSP_DECLARE_EXCEPTION( AttributeError, ::csp::Exception );
Expand Down
18 changes: 16 additions & 2 deletions cpp/csp/python/PyAdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ class PyAdapterManager : public AdapterManager
static PyObject * PyAdapterManager_PyObject_starttime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> starttime() ); }
static PyObject * PyAdapterManager_PyObject_endtime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> endtime() ); }

static PyObject * PyAdapterManager_PyObject_shutdown_engine( PyAdapterManager_PyObject * self, PyObject * args, PyObject * kwargs )
{
CSP_BEGIN_METHOD;

PyObject * pyException;
if( !PyArg_ParseTuple( args, "O", &pyException ) )
argaj marked this conversation as resolved.
Show resolved Hide resolved
return NULL;

self -> manager -> rootEngine() -> shutdown( PyEngine_shutdownMakeException( pyException ) );

CSP_RETURN_NONE;
}

static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *args, PyObject *kwds )
{
CSP_BEGIN_METHOD;
Expand All @@ -83,8 +96,9 @@ static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *arg
}

static PyMethodDef PyAdapterManager_methods[] = {
{ "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" },
{ "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" },
{ "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" },
{ "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" },
{ "shutdown_engine", (PyCFunction) PyAdapterManager_PyObject_shutdown_engine, METH_VARARGS, "shutdown_engine" },
argaj marked this conversation as resolved.
Show resolved Hide resolved
{NULL}
};

Expand Down
30 changes: 30 additions & 0 deletions cpp/csp/python/PyEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define _IN_CSP_PYTHON_PYENGINE_H

#include <csp/core/Time.h>
#include <csp/python/Exception.h>
#include <csp/engine/RootEngine.h>
#include <csp/python/PyObjectPtr.h>
#include <Python.h>
Expand Down Expand Up @@ -54,6 +55,35 @@ class CSPIMPL_EXPORT PyEngine final: public PyObject
Engine * m_engine;
};

inline std::exception_ptr PyEngine_shutdownMakeException( PyObject * pyException )
argaj marked this conversation as resolved.
Show resolved Hide resolved
{
if( !PyExceptionInstance_Check( pyException ) )
{
PyObject* pyExceptionStr = PyObject_Str( pyException );
argaj marked this conversation as resolved.
Show resolved Hide resolved
std::string pyExceptionString = PyUnicode_AsUTF8( pyExceptionStr );
std::string desc = "Expected Exception object as argument for shutdown_engine: got " + pyExceptionString + "of type " + Py_TYPE( pyException ) -> tp_name;
argaj marked this conversation as resolved.
Show resolved Hide resolved
Py_DECREF(pyExceptionStr);
argaj marked this conversation as resolved.
Show resolved Hide resolved
return std::make_exception_ptr( csp::Exception( "TypeError", desc ) );
}
else
return std::make_exception_ptr( PythonPassthrough( pyException ) );
}

// Generic engine shutdown function for push-type adapters
template<typename T>
PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs )
argaj marked this conversation as resolved.
Show resolved Hide resolved
{
CSP_BEGIN_METHOD;

PyObject * pyException;
if( !PyArg_ParseTuple( args, "O", &pyException ) )
return NULL;

self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdownMakeException( pyException ) );

CSP_RETURN_NONE;
}

};

#endif
3 changes: 2 additions & 1 deletion cpp/csp/python/PyPushInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ struct PyPushInputAdapter_PyObject
};

static PyMethodDef PyPushInputAdapter_PyObject_methods[] = {
{ "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "shutdown_engine", (PyCFunction) PyEngine_shutdown<PyPushInputAdapter_PyObject>, METH_VARARGS, "shutdown_engine" },
{NULL}
};

Expand Down
5 changes: 3 additions & 2 deletions cpp/csp/python/PyPushPullInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ struct PyPushPullInputAdapter_PyObject
};

static PyMethodDef PyPushPullInputAdapter_PyObject_methods[] = {
{ "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" },
{ "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" },
{ "shutdown_engine", (PyCFunction) PyEngine_shutdown<PyPushPullInputAdapter_PyObject>, METH_VARARGS, "shutdown engine" },
{NULL}
};

Expand Down
45 changes: 45 additions & 0 deletions csp/tests/impl/test_pushadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,51 @@ def graph():
result = list(x[1] for x in result)
self.assertEqual(result, expected)

def test_adapter_engine_shutdown(self):
class MyPushAdapterImpl(PushInputAdapter):
def __init__(self):
self._thread = None
self._running = False

def start(self, starttime, endtime):
self._running = True
self._thread = threading.Thread(target=self._run)
self._thread.start()

def stop(self):
if self._running:
self._running = False
self._thread.join()

def _run(self):
pushed = False
while self._running:
if pushed:
time.sleep(0.1)
self.shutdown_engine(TypeError("Dummy exception message"))
else:
self.push_tick(0)
pushed = True

MyPushAdapter = py_push_adapter_def("MyPushAdapter", MyPushAdapterImpl, ts[int])

status = {"count": 0}

@csp.node
def node(x: ts[object]):
if csp.ticked(x):
status["count"] += 1

@csp.graph
def graph():
adapter = MyPushAdapter()
node(adapter)
csp.print("adapter", adapter)

with self.assertRaisesRegex(TypeError, "Dummy exception message"):
csp.run(graph, starttime=datetime.utcnow(), realtime=True)
self.assertEqual(status["count"], 1)


if __name__ == "__main__":
unittest.main()
58 changes: 58 additions & 0 deletions csp/tests/impl/test_pushpulladapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,64 @@ def graph():
result = [out[1] for out in graph_out[0]]
self.assertEqual(result, [1, 2, 3])

def test_adapter_engine_shutdown(self):
class MyPushPullAdapterImpl(PushPullInputAdapter):
def __init__(self, typ, data, shutdown_before_live):
self._data = data
self._thread = None
self._running = False
self._shutdown_before_live = shutdown_before_live

def start(self, starttime, endtime):
self._running = True
self._thread = threading.Thread(target=self._run)
self._thread.start()

def stop(self):
if self._running:
self._running = False
self._thread.join()

def _run(self):
idx = 0
while self._running and idx < len(self._data):
if idx and self._shutdown_before_live:
time.sleep(0.1)
self.shutdown_engine(ValueError("Dummy exception message"))
t, v = self._data[idx]
self.push_tick(False, t, v)
idx += 1
self.flag_replay_complete()

idx = 0
while self._running:
self.push_tick(True, datetime.utcnow(), len(self._data) + 1)
if idx and not self._shutdown_before_live:
time.sleep(0.1)
self.shutdown_engine(TypeError("Dummy exception message"))
idx += 1

MyPushPullAdapter = py_pushpull_adapter_def(
"MyPushPullAdapter", MyPushPullAdapterImpl, ts["T"], typ="T", data=list, shutdown_before_live=bool
)

@csp.graph
def graph(shutdown_before_live: bool):
data = [(datetime(2020, 1, 1, 2), 1), (datetime(2020, 1, 1, 3), 2)]
adapter = MyPushPullAdapter(int, data, shutdown_before_live)
csp.print("adapter", adapter)

with self.assertRaisesRegex(ValueError, "Dummy exception message"):
csp.run(graph, True, starttime=datetime(2020, 1, 1, 1))
with self.assertRaisesRegex(TypeError, "Dummy exception message"):
csp.run(
graph,
False,
starttime=datetime(2020, 1, 1, 1),
endtime=datetime.utcnow() + timedelta(seconds=2),
realtime=True,
)


if __name__ == "__main__":
unittest.main()
49 changes: 49 additions & 0 deletions csp/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,55 @@ def graph():
b[t].append(v)
self.assertEqual(results["b"], list(b.items()))

def test_adapter_manager_engine_shutdown(self):
from csp.impl.adaptermanager import AdapterManagerImpl, ManagedSimInputAdapter
from csp.impl.wiring import py_managed_adapter_def

class TestAdapterManager:
def __init__(self):
self._impl = None

def subscribe(self):
return TestAdapter(self)

def _create(self, engine, memo):
self._impl = TestAdapterManagerImpl(engine)
return self._impl

class TestAdapterManagerImpl(AdapterManagerImpl):
def __init__(self, engine):
super().__init__(engine)

def start(self, starttime, endtime):
pass

def stop(self):
pass

def process_next_sim_timeslice(self, now):
argaj marked this conversation as resolved.
Show resolved Hide resolved
try:
[].pop()
except IndexError as e:
self.shutdown_engine(e)

class TestAdapterImpl(ManagedSimInputAdapter):
def __init__(self, manager_impl):
pass

TestAdapter = py_managed_adapter_def("TestAdapter", TestAdapterImpl, ts[int], TestAdapterManager)

def graph():
adapter = TestAdapterManager()
nc = adapter.subscribe()
csp.add_graph_output("nc", nc)

try:
csp.run(graph, starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1))
except IndexError:
tb = traceback.format_exc()

self.assertTrue("[].pop()" in tb and "process_next_sim_timeslice" in tb)

def test_feedback(self):
# Dummy example
class Request(csp.Struct):
Expand Down
15 changes: 15 additions & 0 deletions docs/wiki/how-tos/Write-Realtime-Input-Adapters.md
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,18 @@ csp.run(my_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), re
```

Do note that realtime adapters will only run in realtime engines (note the `realtime=True` argument to `csp.run`).

## Engine shutdown

In case a pushing thread hits a terminal error, an exception can be passed to the main engine thread to shut down gracefully through a `shutdown_engine(exc: Exception)` method exposed by `PushInputAdapter`, `PushPullInputAdapter` and `AdapterManagerImpl`.
argaj marked this conversation as resolved.
Show resolved Hide resolved

For example:

```python
def _run(self):
while self._running:
try:
requests.get(endpoint) # API call over a network, may fail
except Exception as exc:
self.shutdown_engine(exc)
```