diff --git a/cpp/csp/python/Exception.h b/cpp/csp/python/Exception.h index 104a3509a..5c7c2e80a 100644 --- a/cpp/csp/python/Exception.h +++ b/cpp/csp/python/Exception.h @@ -16,9 +16,20 @@ class PythonPassthrough : public csp::Exception csp::Exception( exType, r, file, func, line ) { //Fetch the current error to clear out the error indicator while the stack gets unwound + //We own the references to all the members assigned in PyErr_Fetch + //We need to hold the reference since PyErr_Restore takes back a reference to each of its arguments PyErr_Fetch( &m_type, &m_value, &m_traceback ); } + PythonPassthrough( PyObject * pyException ) : + csp::Exception( "", "" ) + { + // Note: all of these methods return strong references, so we own them like in the other constructor + m_type = PyObject_Type( pyException ); + m_value = PyObject_Str( pyException ); + m_traceback = PyException_GetTraceback( pyException ); + } + void restore() { if( !description().empty() ) @@ -39,7 +50,6 @@ class PythonPassthrough : public csp::Exception PyObject * m_type; PyObject * m_value; PyObject * m_traceback; - }; CSP_DECLARE_EXCEPTION( AttributeError, ::csp::Exception ); diff --git a/cpp/csp/python/PyAdapterManager.cpp b/cpp/csp/python/PyAdapterManager.cpp index 3a9ec7211..d6547e135 100644 --- a/cpp/csp/python/PyAdapterManager.cpp +++ b/cpp/csp/python/PyAdapterManager.cpp @@ -68,6 +68,15 @@ class PyAdapterManager : public AdapterManager static PyObject * PyAdapterManager_PyObject_starttime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> starttime() ); } static PyObject * PyAdapterManager_PyObject_endtime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> endtime() ); } +static PyObject * PyAdapterManager_PyObject_shutdown_engine( PyAdapterManager_PyObject * self, PyObject * pyException ) +{ + CSP_BEGIN_METHOD; + + self -> manager -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) ); + + CSP_RETURN_NONE; +} + static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *args, PyObject *kwds ) { CSP_BEGIN_METHOD; @@ -83,8 +92,9 @@ static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *arg } static PyMethodDef PyAdapterManager_methods[] = { - { "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" }, - { "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" }, + { "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" }, + { "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" }, + { "shutdown_engine", (PyCFunction) PyAdapterManager_PyObject_shutdown_engine, METH_O, "shutdown_engine" }, {NULL} }; diff --git a/cpp/csp/python/PyEngine.h b/cpp/csp/python/PyEngine.h index 4a694c0c6..d58289501 100644 --- a/cpp/csp/python/PyEngine.h +++ b/cpp/csp/python/PyEngine.h @@ -2,6 +2,7 @@ #define _IN_CSP_PYTHON_PYENGINE_H #include +#include #include #include #include @@ -54,6 +55,21 @@ class CSPIMPL_EXPORT PyEngine final: public PyObject Engine * m_engine; }; +inline std::exception_ptr PyEngine_shutdown_make_exception( PyObject * pyException ) +{ + if( !PyExceptionInstance_Check( pyException ) ) + { + PyObjectPtr pyExceptionStr = PyObjectPtr::own( PyObject_Str( pyException ) ); + if( !pyExceptionStr.ptr() ) + CSP_THROW( PythonPassthrough, "" ); + std::string pyExceptionString = PyUnicode_AsUTF8( pyExceptionStr.ptr() ); + std::string desc = "Expected Exception object as argument for shutdown_engine: got " + pyExceptionString + " of type " + Py_TYPE( pyException ) -> tp_name; + return std::make_exception_ptr( csp::Exception( "TypeError", desc ) ); + } + else + return std::make_exception_ptr( PythonPassthrough( pyException ) ); +} + }; #endif diff --git a/cpp/csp/python/PyPushInputAdapter.cpp b/cpp/csp/python/PyPushInputAdapter.cpp index 3b5f91b07..cd034844a 100644 --- a/cpp/csp/python/PyPushInputAdapter.cpp +++ b/cpp/csp/python/PyPushInputAdapter.cpp @@ -198,11 +198,21 @@ struct PyPushInputAdapter_PyObject CSP_RETURN_NONE; } + static PyObject * shutdown_engine( PyPushInputAdapter_PyObject * self, PyObject * pyException ) + { + CSP_BEGIN_METHOD; + + self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) ); + + CSP_RETURN_NONE; + } + static PyTypeObject PyType; }; static PyMethodDef PyPushInputAdapter_PyObject_methods[] = { - { "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "shutdown_engine", (PyCFunction) PyPushInputAdapter_PyObject::shutdown_engine, METH_O, "shutdown_engine" }, {NULL} }; diff --git a/cpp/csp/python/PyPushPullInputAdapter.cpp b/cpp/csp/python/PyPushPullInputAdapter.cpp index d53ed972d..71190660c 100644 --- a/cpp/csp/python/PyPushPullInputAdapter.cpp +++ b/cpp/csp/python/PyPushPullInputAdapter.cpp @@ -119,12 +119,22 @@ struct PyPushPullInputAdapter_PyObject CSP_RETURN_NONE; } + static PyObject * shutdown_engine( PyPushPullInputAdapter_PyObject * self, PyObject * pyException ) + { + CSP_BEGIN_METHOD; + + self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) ); + + CSP_RETURN_NONE; + } + static PyTypeObject PyType; }; static PyMethodDef PyPushPullInputAdapter_PyObject_methods[] = { - { "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, - { "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" }, + { "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" }, + { "shutdown_engine", (PyCFunction) PyPushPullInputAdapter_PyObject::shutdown_engine, METH_O, "shutdown engine" }, {NULL} }; diff --git a/csp/tests/impl/test_pushadapter.py b/csp/tests/impl/test_pushadapter.py index 03ff7b7f4..8773d95a7 100644 --- a/csp/tests/impl/test_pushadapter.py +++ b/csp/tests/impl/test_pushadapter.py @@ -241,6 +241,51 @@ def graph(): result = list(x[1] for x in result) self.assertEqual(result, expected) + def test_adapter_engine_shutdown(self): + class MyPushAdapterImpl(PushInputAdapter): + def __init__(self): + self._thread = None + self._running = False + + def start(self, starttime, endtime): + self._running = True + self._thread = threading.Thread(target=self._run) + self._thread.start() + + def stop(self): + if self._running: + self._running = False + self._thread.join() + + def _run(self): + pushed = False + while self._running: + if pushed: + time.sleep(0.1) + self.shutdown_engine(TypeError("Dummy exception message")) + else: + self.push_tick(0) + pushed = True + + MyPushAdapter = py_push_adapter_def("MyPushAdapter", MyPushAdapterImpl, ts[int]) + + status = {"count": 0} + + @csp.node + def node(x: ts[object]): + if csp.ticked(x): + status["count"] += 1 + + @csp.graph + def graph(): + adapter = MyPushAdapter() + node(adapter) + csp.print("adapter", adapter) + + with self.assertRaisesRegex(TypeError, "Dummy exception message"): + csp.run(graph, starttime=datetime.utcnow(), realtime=True) + self.assertEqual(status["count"], 1) + def test_help(self): # for `help` to work on adapters, signature must be defined sig = inspect.signature(test_adapter) diff --git a/csp/tests/impl/test_pushpulladapter.py b/csp/tests/impl/test_pushpulladapter.py index 74faff89f..b52ce0755 100644 --- a/csp/tests/impl/test_pushpulladapter.py +++ b/csp/tests/impl/test_pushpulladapter.py @@ -143,6 +143,64 @@ def graph(): result = [out[1] for out in graph_out[0]] self.assertEqual(result, [1, 2, 3]) + def test_adapter_engine_shutdown(self): + class MyPushPullAdapterImpl(PushPullInputAdapter): + def __init__(self, typ, data, shutdown_before_live): + self._data = data + self._thread = None + self._running = False + self._shutdown_before_live = shutdown_before_live + + def start(self, starttime, endtime): + self._running = True + self._thread = threading.Thread(target=self._run) + self._thread.start() + + def stop(self): + if self._running: + self._running = False + self._thread.join() + + def _run(self): + idx = 0 + while self._running and idx < len(self._data): + if idx and self._shutdown_before_live: + time.sleep(0.1) + self.shutdown_engine(ValueError("Dummy exception message")) + t, v = self._data[idx] + self.push_tick(False, t, v) + idx += 1 + self.flag_replay_complete() + + idx = 0 + while self._running: + self.push_tick(True, datetime.utcnow(), len(self._data) + 1) + if idx and not self._shutdown_before_live: + time.sleep(0.1) + self.shutdown_engine(TypeError("Dummy exception message")) + idx += 1 + + MyPushPullAdapter = py_pushpull_adapter_def( + "MyPushPullAdapter", MyPushPullAdapterImpl, ts["T"], typ="T", data=list, shutdown_before_live=bool + ) + + @csp.graph + def graph(shutdown_before_live: bool): + data = [(datetime(2020, 1, 1, 2), 1), (datetime(2020, 1, 1, 3), 2)] + adapter = MyPushPullAdapter(int, data, shutdown_before_live) + csp.print("adapter", adapter) + + with self.assertRaisesRegex(ValueError, "Dummy exception message"): + csp.run(graph, True, starttime=datetime(2020, 1, 1, 1)) + with self.assertRaisesRegex(TypeError, "Dummy exception message"): + csp.run( + graph, + False, + starttime=datetime(2020, 1, 1, 1), + endtime=datetime.utcnow() + timedelta(seconds=2), + realtime=True, + ) + if __name__ == "__main__": unittest.main() diff --git a/csp/tests/test_engine.py b/csp/tests/test_engine.py index adb67d372..a8ec0a965 100644 --- a/csp/tests/test_engine.py +++ b/csp/tests/test_engine.py @@ -799,6 +799,55 @@ def graph(): b[t].append(v) self.assertEqual(results["b"], list(b.items())) + def test_adapter_manager_engine_shutdown(self): + from csp.impl.adaptermanager import AdapterManagerImpl, ManagedSimInputAdapter + from csp.impl.wiring import py_managed_adapter_def + + class TestAdapterManager: + def __init__(self): + self._impl = None + + def subscribe(self): + return TestAdapter(self) + + def _create(self, engine, memo): + self._impl = TestAdapterManagerImpl(engine) + return self._impl + + class TestAdapterManagerImpl(AdapterManagerImpl): + def __init__(self, engine): + super().__init__(engine) + + def start(self, starttime, endtime): + pass + + def stop(self): + pass + + def process_next_sim_timeslice(self, now): + try: + [].pop() + except IndexError as e: + self.shutdown_engine(e) + + class TestAdapterImpl(ManagedSimInputAdapter): + def __init__(self, manager_impl): + pass + + TestAdapter = py_managed_adapter_def("TestAdapter", TestAdapterImpl, ts[int], TestAdapterManager) + + def graph(): + adapter = TestAdapterManager() + nc = adapter.subscribe() + csp.add_graph_output("nc", nc) + + try: + csp.run(graph, starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1)) + except IndexError: + tb = traceback.format_exc() + + self.assertTrue("[].pop()" in tb and "process_next_sim_timeslice" in tb) + def test_feedback(self): # Dummy example class Request(csp.Struct): diff --git a/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md b/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md index 2f0195d5a..3576c2509 100644 --- a/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md +++ b/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md @@ -405,3 +405,18 @@ csp.run(my_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), re ``` Do note that realtime adapters will only run in realtime engines (note the `realtime=True` argument to `csp.run`). + +## Engine shutdown + +In case a pushing thread hits a terminal error, an exception can be passed to the main engine thread to shut down gracefully through a `shutdown_engine(exc: Exception)` method exposed by `PushInputAdapter`, `PushPullInputAdapter` and `AdapterManagerImpl`. + +For example: + +```python +def _run(self): + while self._running: + try: + requests.get(endpoint) # API call over a network, may fail + except Exception as exc: + self.shutdown_engine(exc) +```