From 3f2d7b1d0f2c37c107e839c01ce14f331d98638f Mon Sep 17 00:00:00 2001 From: Adam Glustein Date: Mon, 8 Jul 2024 16:07:32 -0400 Subject: [PATCH 01/10] Expose engine shutdown method for Python push-type adapters Co-authored-by: Artur Gajowniczek Signed-off-by: Adam Glustein --- cpp/csp/python/Exception.h | 11 +++++ cpp/csp/python/PyAdapterManager.cpp | 18 +++++++- cpp/csp/python/PyEngine.h | 16 +++++++ cpp/csp/python/PyPushInputAdapter.cpp | 3 +- cpp/csp/python/PyPushPullInputAdapter.cpp | 5 +- csp/impl/adaptermanager.py | 4 ++ csp/impl/error_handling.py | 11 +++++ csp/impl/pushadapter.py | 4 ++ csp/impl/pushpulladapter.py | 4 ++ csp/tests/impl/test_pushadapter.py | 44 ++++++++++++++++++ csp/tests/impl/test_pushpulladapter.py | 56 +++++++++++++++++++++++ csp/tests/test_engine.py | 42 +++++++++++++++++ 12 files changed, 213 insertions(+), 5 deletions(-) diff --git a/cpp/csp/python/Exception.h b/cpp/csp/python/Exception.h index 104a3509a..1355c6f67 100644 --- a/cpp/csp/python/Exception.h +++ b/cpp/csp/python/Exception.h @@ -8,6 +8,17 @@ namespace csp::python { +class TracebackStringException : public std::exception +{ +public: + TracebackStringException( const char * message ) : m_message( message ) {} + + const char * what() const noexcept override { return m_message; } + +private: + const char * m_message; +}; + class PythonPassthrough : public csp::Exception { public: diff --git a/cpp/csp/python/PyAdapterManager.cpp b/cpp/csp/python/PyAdapterManager.cpp index 3a9ec7211..c412a0ee0 100644 --- a/cpp/csp/python/PyAdapterManager.cpp +++ b/cpp/csp/python/PyAdapterManager.cpp @@ -68,6 +68,19 @@ class PyAdapterManager : public AdapterManager static PyObject * PyAdapterManager_PyObject_starttime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> starttime() ); } static PyObject * PyAdapterManager_PyObject_endtime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> endtime() ); } +static PyObject * PyAdapterManager_PyObject__engine_shutdown( PyAdapterManager_PyObject * self, PyObject * args, PyObject * kwargs) +{ + CSP_BEGIN_METHOD; + + const char * msg; + if( !PyArg_ParseTuple( args, "s", &msg ) ) + return NULL; + + self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( TracebackStringException( msg ) ) ); + + CSP_RETURN_NONE; +} + static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *args, PyObject *kwds ) { CSP_BEGIN_METHOD; @@ -83,8 +96,9 @@ static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *arg } static PyMethodDef PyAdapterManager_methods[] = { - { "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" }, - { "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" }, + { "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" }, + { "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" }, + { "_engine_shutdown", (PyCFunction) PyAdapterManager_PyObject__engine_shutdown, METH_VARARGS, "_engine_shutdown" }, {NULL} }; diff --git a/cpp/csp/python/PyEngine.h b/cpp/csp/python/PyEngine.h index 4a694c0c6..e059e81c7 100644 --- a/cpp/csp/python/PyEngine.h +++ b/cpp/csp/python/PyEngine.h @@ -2,6 +2,7 @@ #define _IN_CSP_PYTHON_PYENGINE_H #include +#include #include #include #include @@ -54,6 +55,21 @@ class CSPIMPL_EXPORT PyEngine final: public PyObject Engine * m_engine; }; +// Generic engine shutdown function for push-type adapters +template +PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs ) +{ + CSP_BEGIN_METHOD; + + const char * msg; + if( !PyArg_ParseTuple( args, "s", &msg ) ) + return NULL; + + self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( TracebackStringException( msg ) ) ); + + CSP_RETURN_NONE; +} + }; #endif diff --git a/cpp/csp/python/PyPushInputAdapter.cpp b/cpp/csp/python/PyPushInputAdapter.cpp index 3b5f91b07..013e78db9 100644 --- a/cpp/csp/python/PyPushInputAdapter.cpp +++ b/cpp/csp/python/PyPushInputAdapter.cpp @@ -202,7 +202,8 @@ struct PyPushInputAdapter_PyObject }; static PyMethodDef PyPushInputAdapter_PyObject_methods[] = { - { "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "_engine_shutdown", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "engine shutdown" }, {NULL} }; diff --git a/cpp/csp/python/PyPushPullInputAdapter.cpp b/cpp/csp/python/PyPushPullInputAdapter.cpp index d53ed972d..971999078 100644 --- a/cpp/csp/python/PyPushPullInputAdapter.cpp +++ b/cpp/csp/python/PyPushPullInputAdapter.cpp @@ -123,8 +123,9 @@ struct PyPushPullInputAdapter_PyObject }; static PyMethodDef PyPushPullInputAdapter_PyObject_methods[] = { - { "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, - { "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" }, + { "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" }, + { "_engine_shutdown", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "engine shutdown" }, {NULL} }; diff --git a/csp/impl/adaptermanager.py b/csp/impl/adaptermanager.py index 9ec42d0e0..b53df8338 100644 --- a/csp/impl/adaptermanager.py +++ b/csp/impl/adaptermanager.py @@ -2,6 +2,7 @@ import csp from csp.impl.__cspimpl import _cspimpl +from csp.impl.error_handling import format_engine_shutdown_stack class AdapterManagerImpl(_cspimpl.PyAdapterManager): @@ -24,6 +25,9 @@ def process_next_sim_timeslice(self, now): """ return None + def engine_shutdown(self, msg): + self._engine_shutdown(format_engine_shutdown_stack(msg)) + class ManagedSimInputAdapter(_cspimpl.PyManagedSimInputAdapter): def __init__(self, typ, field_map): diff --git a/csp/impl/error_handling.py b/csp/impl/error_handling.py index e662b7eb8..b4a0303b2 100644 --- a/csp/impl/error_handling.py +++ b/csp/impl/error_handling.py @@ -1,5 +1,6 @@ import ast import os +import traceback import csp.impl from csp.impl.__cspimpl import _cspimpl @@ -48,3 +49,13 @@ def set_print_full_exception_stack(new_value: bool): res = ExceptionContext.PRINT_EXCEPTION_FULL_STACK ExceptionContext.PRINT_EXCEPTION_FULL_STACK = new_value return res + + +def format_engine_shutdown_stack(msg: str): + tb = traceback.format_stack() + tb = tb[:-2] # remove traceback call and internal engine shutdown method + if len(tb) > 1: + tb = tb[-2:] # only keep the current function and the caller (adapter manager) + tb.append(msg) # add the user's error message + tb = "".join(tb) # format into a single string + return tb diff --git a/csp/impl/pushadapter.py b/csp/impl/pushadapter.py index b64356738..e9fe48fcd 100644 --- a/csp/impl/pushadapter.py +++ b/csp/impl/pushadapter.py @@ -1,4 +1,5 @@ from csp.impl.__cspimpl import _cspimpl +from csp.impl.error_handling import format_engine_shutdown_stack PushGroup = _cspimpl.PushGroup PushBatch = _cspimpl.PushBatch @@ -11,5 +12,8 @@ def start(self, starttime, endtime): def stop(self): pass + def engine_shutdown(self, msg): + self._engine_shutdown(format_engine_shutdown_stack(msg)) + # base class # def push_tick( self, value ) diff --git a/csp/impl/pushpulladapter.py b/csp/impl/pushpulladapter.py index f2beccd3a..c92f85411 100644 --- a/csp/impl/pushpulladapter.py +++ b/csp/impl/pushpulladapter.py @@ -1,4 +1,5 @@ from csp.impl.__cspimpl import _cspimpl +from csp.impl.error_handling import format_engine_shutdown_stack PushGroup = _cspimpl.PushGroup PushBatch = _cspimpl.PushBatch @@ -11,5 +12,8 @@ def start(self, starttime, endtime): def stop(self): pass + def engine_shutdown(self, msg): + self._engine_shutdown(format_engine_shutdown_stack(msg)) + # base class # def push_tick( self, time, value ) diff --git a/csp/tests/impl/test_pushadapter.py b/csp/tests/impl/test_pushadapter.py index 1aa7c0566..ed8a2c38f 100644 --- a/csp/tests/impl/test_pushadapter.py +++ b/csp/tests/impl/test_pushadapter.py @@ -240,6 +240,50 @@ def graph(): result = list(x[1] for x in result) self.assertEqual(result, expected) + def test_adapter_engine_shutdown(self): + class MyPushAdapterImpl(PushInputAdapter): + def __init__(self): + self._thread = None + self._running = False + + def start(self, starttime, endtime): + self._running = True + self._thread = threading.Thread(target=self._run) + self._thread.start() + + def stop(self): + if self._running: + self._running = False + self._thread.join() + + def _run(self): + pushed = False + while self._running: + if pushed: + self.engine_shutdown("Dummy exception message") + else: + self.push_tick(0) + pushed = True + + MyPushAdapter = py_push_adapter_def("MyPushAdapter", MyPushAdapterImpl, ts[int]) + + status = {"count": 0} + + @csp.node + def node(x: ts[object]): + if csp.ticked(x): + status["count"] += 1 + + @csp.graph + def graph(): + adapter = MyPushAdapter() + node(adapter) + csp.print("adapter", adapter) + + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run(graph, starttime=datetime.utcnow(), realtime=True) + self.assertEqual(status["count"], 1) + if __name__ == "__main__": unittest.main() diff --git a/csp/tests/impl/test_pushpulladapter.py b/csp/tests/impl/test_pushpulladapter.py index 74faff89f..132e84da1 100644 --- a/csp/tests/impl/test_pushpulladapter.py +++ b/csp/tests/impl/test_pushpulladapter.py @@ -143,6 +143,62 @@ def graph(): result = [out[1] for out in graph_out[0]] self.assertEqual(result, [1, 2, 3]) + def test_adapter_engine_shutdown(self): + class MyPushPullAdapterImpl(PushPullInputAdapter): + def __init__(self, typ, data, shutdown_before_live): + self._data = data + self._thread = None + self._running = False + self._shutdown_before_live = shutdown_before_live + + def start(self, starttime, endtime): + self._running = True + self._thread = threading.Thread(target=self._run) + self._thread.start() + + def stop(self): + if self._running: + self._running = False + self._thread.join() + + def _run(self): + idx = 0 + while self._running and idx < len(self._data): + if idx and self._shutdown_before_live: + self.engine_shutdown("Dummy exception message") + t, v = self._data[idx] + self.push_tick(False, t, v) + idx += 1 + self.flag_replay_complete() + + idx = 0 + while self._running: + self.push_tick(True, datetime.utcnow(), len(self._data) + 1) + if idx and not self._shutdown_before_live: + self.engine_shutdown("Dummy exception message") + idx += 1 + + MyPushPullAdapter = py_pushpull_adapter_def( + "MyPushPullAdapter", MyPushPullAdapterImpl, ts["T"], typ="T", data=list, shutdown_before_live=bool + ) + + @csp.graph + def graph(shutdown_before_live: bool): + data = [(datetime(2020, 1, 1, 2), 1), (datetime(2020, 1, 1, 3), 2)] + adapter = MyPushPullAdapter(int, data, shutdown_before_live) + csp.print("adapter", adapter) + + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run(graph, True, starttime=datetime(2020, 1, 1, 1)) + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run( + graph, + False, + starttime=datetime(2020, 1, 1, 1), + endtime=datetime.utcnow() + timedelta(seconds=2), + realtime=True, + ) + if __name__ == "__main__": unittest.main() diff --git a/csp/tests/test_engine.py b/csp/tests/test_engine.py index adb67d372..9b0698333 100644 --- a/csp/tests/test_engine.py +++ b/csp/tests/test_engine.py @@ -799,6 +799,48 @@ def graph(): b[t].append(v) self.assertEqual(results["b"], list(b.items())) + def test_adapter_manager_engine_shutdown(self): + from csp.impl.adaptermanager import AdapterManagerImpl, ManagedSimInputAdapter + from csp.impl.wiring import py_managed_adapter_def + + class TestAdapterManager: + def __init__(self): + self._impl = None + + def subscribe(self): + return TestAdapter(self) + + def _create(self, engine, memo): + self._impl = TestAdapterManagerImpl(engine) + return self._impl + + class TestAdapterManagerImpl(AdapterManagerImpl): + def __init__(self, engine): + super().__init__(engine) + + def start(self, starttime, endtime): + pass + + def stop(self): + pass + + def process_next_sim_timeslice(self, now): + self.engine_shutdown("Dummy exception message") + + class TestAdapterImpl(ManagedSimInputAdapter): + def __init__(self, manager_impl): + pass + + TestAdapter = py_managed_adapter_def("TestAdapter", TestAdapterImpl, ts[int], TestAdapterManager) + + def graph(): + adapter = TestAdapterManager() + nc = adapter.subscribe() + csp.add_graph_output("nc", nc) + + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run(graph, starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1)) + def test_feedback(self): # Dummy example class Request(csp.Struct): From 147133c1479454c5f51b3b80d508d89265733fdc Mon Sep 17 00:00:00 2001 From: Adam Glustein Date: Mon, 8 Jul 2024 16:07:32 -0400 Subject: [PATCH 02/10] Expose engine shutdown method for Python push-type adapters Co-authored-by: Artur Gajowniczek Signed-off-by: Adam Glustein Signed-off-by: Gajowniczek, Artur --- cpp/csp/python/Exception.h | 11 +++++ cpp/csp/python/PyAdapterManager.cpp | 18 ++++++- cpp/csp/python/PyEngine.h | 16 +++++++ cpp/csp/python/PyPushInputAdapter.cpp | 3 +- cpp/csp/python/PyPushPullInputAdapter.cpp | 5 +- csp/impl/adaptermanager.py | 4 ++ csp/impl/error_handling.py | 11 +++++ csp/impl/pushadapter.py | 4 ++ csp/impl/pushpulladapter.py | 4 ++ csp/tests/impl/test_pushadapter.py | 45 ++++++++++++++++++ csp/tests/impl/test_pushpulladapter.py | 58 +++++++++++++++++++++++ csp/tests/test_engine.py | 42 ++++++++++++++++ 12 files changed, 216 insertions(+), 5 deletions(-) diff --git a/cpp/csp/python/Exception.h b/cpp/csp/python/Exception.h index 104a3509a..1355c6f67 100644 --- a/cpp/csp/python/Exception.h +++ b/cpp/csp/python/Exception.h @@ -8,6 +8,17 @@ namespace csp::python { +class TracebackStringException : public std::exception +{ +public: + TracebackStringException( const char * message ) : m_message( message ) {} + + const char * what() const noexcept override { return m_message; } + +private: + const char * m_message; +}; + class PythonPassthrough : public csp::Exception { public: diff --git a/cpp/csp/python/PyAdapterManager.cpp b/cpp/csp/python/PyAdapterManager.cpp index 3a9ec7211..c412a0ee0 100644 --- a/cpp/csp/python/PyAdapterManager.cpp +++ b/cpp/csp/python/PyAdapterManager.cpp @@ -68,6 +68,19 @@ class PyAdapterManager : public AdapterManager static PyObject * PyAdapterManager_PyObject_starttime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> starttime() ); } static PyObject * PyAdapterManager_PyObject_endtime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> endtime() ); } +static PyObject * PyAdapterManager_PyObject__engine_shutdown( PyAdapterManager_PyObject * self, PyObject * args, PyObject * kwargs) +{ + CSP_BEGIN_METHOD; + + const char * msg; + if( !PyArg_ParseTuple( args, "s", &msg ) ) + return NULL; + + self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( TracebackStringException( msg ) ) ); + + CSP_RETURN_NONE; +} + static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *args, PyObject *kwds ) { CSP_BEGIN_METHOD; @@ -83,8 +96,9 @@ static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *arg } static PyMethodDef PyAdapterManager_methods[] = { - { "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" }, - { "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" }, + { "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" }, + { "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" }, + { "_engine_shutdown", (PyCFunction) PyAdapterManager_PyObject__engine_shutdown, METH_VARARGS, "_engine_shutdown" }, {NULL} }; diff --git a/cpp/csp/python/PyEngine.h b/cpp/csp/python/PyEngine.h index 4a694c0c6..e059e81c7 100644 --- a/cpp/csp/python/PyEngine.h +++ b/cpp/csp/python/PyEngine.h @@ -2,6 +2,7 @@ #define _IN_CSP_PYTHON_PYENGINE_H #include +#include #include #include #include @@ -54,6 +55,21 @@ class CSPIMPL_EXPORT PyEngine final: public PyObject Engine * m_engine; }; +// Generic engine shutdown function for push-type adapters +template +PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs ) +{ + CSP_BEGIN_METHOD; + + const char * msg; + if( !PyArg_ParseTuple( args, "s", &msg ) ) + return NULL; + + self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( TracebackStringException( msg ) ) ); + + CSP_RETURN_NONE; +} + }; #endif diff --git a/cpp/csp/python/PyPushInputAdapter.cpp b/cpp/csp/python/PyPushInputAdapter.cpp index 3b5f91b07..013e78db9 100644 --- a/cpp/csp/python/PyPushInputAdapter.cpp +++ b/cpp/csp/python/PyPushInputAdapter.cpp @@ -202,7 +202,8 @@ struct PyPushInputAdapter_PyObject }; static PyMethodDef PyPushInputAdapter_PyObject_methods[] = { - { "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "_engine_shutdown", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "engine shutdown" }, {NULL} }; diff --git a/cpp/csp/python/PyPushPullInputAdapter.cpp b/cpp/csp/python/PyPushPullInputAdapter.cpp index d53ed972d..971999078 100644 --- a/cpp/csp/python/PyPushPullInputAdapter.cpp +++ b/cpp/csp/python/PyPushPullInputAdapter.cpp @@ -123,8 +123,9 @@ struct PyPushPullInputAdapter_PyObject }; static PyMethodDef PyPushPullInputAdapter_PyObject_methods[] = { - { "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, - { "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" }, + { "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" }, + { "_engine_shutdown", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "engine shutdown" }, {NULL} }; diff --git a/csp/impl/adaptermanager.py b/csp/impl/adaptermanager.py index 9ec42d0e0..b53df8338 100644 --- a/csp/impl/adaptermanager.py +++ b/csp/impl/adaptermanager.py @@ -2,6 +2,7 @@ import csp from csp.impl.__cspimpl import _cspimpl +from csp.impl.error_handling import format_engine_shutdown_stack class AdapterManagerImpl(_cspimpl.PyAdapterManager): @@ -24,6 +25,9 @@ def process_next_sim_timeslice(self, now): """ return None + def engine_shutdown(self, msg): + self._engine_shutdown(format_engine_shutdown_stack(msg)) + class ManagedSimInputAdapter(_cspimpl.PyManagedSimInputAdapter): def __init__(self, typ, field_map): diff --git a/csp/impl/error_handling.py b/csp/impl/error_handling.py index e662b7eb8..b4a0303b2 100644 --- a/csp/impl/error_handling.py +++ b/csp/impl/error_handling.py @@ -1,5 +1,6 @@ import ast import os +import traceback import csp.impl from csp.impl.__cspimpl import _cspimpl @@ -48,3 +49,13 @@ def set_print_full_exception_stack(new_value: bool): res = ExceptionContext.PRINT_EXCEPTION_FULL_STACK ExceptionContext.PRINT_EXCEPTION_FULL_STACK = new_value return res + + +def format_engine_shutdown_stack(msg: str): + tb = traceback.format_stack() + tb = tb[:-2] # remove traceback call and internal engine shutdown method + if len(tb) > 1: + tb = tb[-2:] # only keep the current function and the caller (adapter manager) + tb.append(msg) # add the user's error message + tb = "".join(tb) # format into a single string + return tb diff --git a/csp/impl/pushadapter.py b/csp/impl/pushadapter.py index b64356738..e9fe48fcd 100644 --- a/csp/impl/pushadapter.py +++ b/csp/impl/pushadapter.py @@ -1,4 +1,5 @@ from csp.impl.__cspimpl import _cspimpl +from csp.impl.error_handling import format_engine_shutdown_stack PushGroup = _cspimpl.PushGroup PushBatch = _cspimpl.PushBatch @@ -11,5 +12,8 @@ def start(self, starttime, endtime): def stop(self): pass + def engine_shutdown(self, msg): + self._engine_shutdown(format_engine_shutdown_stack(msg)) + # base class # def push_tick( self, value ) diff --git a/csp/impl/pushpulladapter.py b/csp/impl/pushpulladapter.py index f2beccd3a..c92f85411 100644 --- a/csp/impl/pushpulladapter.py +++ b/csp/impl/pushpulladapter.py @@ -1,4 +1,5 @@ from csp.impl.__cspimpl import _cspimpl +from csp.impl.error_handling import format_engine_shutdown_stack PushGroup = _cspimpl.PushGroup PushBatch = _cspimpl.PushBatch @@ -11,5 +12,8 @@ def start(self, starttime, endtime): def stop(self): pass + def engine_shutdown(self, msg): + self._engine_shutdown(format_engine_shutdown_stack(msg)) + # base class # def push_tick( self, time, value ) diff --git a/csp/tests/impl/test_pushadapter.py b/csp/tests/impl/test_pushadapter.py index 1aa7c0566..b77f38f65 100644 --- a/csp/tests/impl/test_pushadapter.py +++ b/csp/tests/impl/test_pushadapter.py @@ -240,6 +240,51 @@ def graph(): result = list(x[1] for x in result) self.assertEqual(result, expected) + def test_adapter_engine_shutdown(self): + class MyPushAdapterImpl(PushInputAdapter): + def __init__(self): + self._thread = None + self._running = False + + def start(self, starttime, endtime): + self._running = True + self._thread = threading.Thread(target=self._run) + self._thread.start() + + def stop(self): + if self._running: + self._running = False + self._thread.join() + + def _run(self): + pushed = False + while self._running: + if pushed: + time.sleep(0.1) + self.engine_shutdown("Dummy exception message") + else: + self.push_tick(0) + pushed = True + + MyPushAdapter = py_push_adapter_def("MyPushAdapter", MyPushAdapterImpl, ts[int]) + + status = {"count": 0} + + @csp.node + def node(x: ts[object]): + if csp.ticked(x): + status["count"] += 1 + + @csp.graph + def graph(): + adapter = MyPushAdapter() + node(adapter) + csp.print("adapter", adapter) + + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run(graph, starttime=datetime.utcnow(), realtime=True) + self.assertEqual(status["count"], 1) + if __name__ == "__main__": unittest.main() diff --git a/csp/tests/impl/test_pushpulladapter.py b/csp/tests/impl/test_pushpulladapter.py index 74faff89f..92557a2a8 100644 --- a/csp/tests/impl/test_pushpulladapter.py +++ b/csp/tests/impl/test_pushpulladapter.py @@ -143,6 +143,64 @@ def graph(): result = [out[1] for out in graph_out[0]] self.assertEqual(result, [1, 2, 3]) + def test_adapter_engine_shutdown(self): + class MyPushPullAdapterImpl(PushPullInputAdapter): + def __init__(self, typ, data, shutdown_before_live): + self._data = data + self._thread = None + self._running = False + self._shutdown_before_live = shutdown_before_live + + def start(self, starttime, endtime): + self._running = True + self._thread = threading.Thread(target=self._run) + self._thread.start() + + def stop(self): + if self._running: + self._running = False + self._thread.join() + + def _run(self): + idx = 0 + while self._running and idx < len(self._data): + if idx and self._shutdown_before_live: + time.sleep(0.1) + self.engine_shutdown("Dummy exception message") + t, v = self._data[idx] + self.push_tick(False, t, v) + idx += 1 + self.flag_replay_complete() + + idx = 0 + while self._running: + self.push_tick(True, datetime.utcnow(), len(self._data) + 1) + if idx and not self._shutdown_before_live: + time.sleep(0.1) + self.engine_shutdown("Dummy exception message") + idx += 1 + + MyPushPullAdapter = py_pushpull_adapter_def( + "MyPushPullAdapter", MyPushPullAdapterImpl, ts["T"], typ="T", data=list, shutdown_before_live=bool + ) + + @csp.graph + def graph(shutdown_before_live: bool): + data = [(datetime(2020, 1, 1, 2), 1), (datetime(2020, 1, 1, 3), 2)] + adapter = MyPushPullAdapter(int, data, shutdown_before_live) + csp.print("adapter", adapter) + + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run(graph, True, starttime=datetime(2020, 1, 1, 1)) + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run( + graph, + False, + starttime=datetime(2020, 1, 1, 1), + endtime=datetime.utcnow() + timedelta(seconds=2), + realtime=True, + ) + if __name__ == "__main__": unittest.main() diff --git a/csp/tests/test_engine.py b/csp/tests/test_engine.py index adb67d372..9b0698333 100644 --- a/csp/tests/test_engine.py +++ b/csp/tests/test_engine.py @@ -799,6 +799,48 @@ def graph(): b[t].append(v) self.assertEqual(results["b"], list(b.items())) + def test_adapter_manager_engine_shutdown(self): + from csp.impl.adaptermanager import AdapterManagerImpl, ManagedSimInputAdapter + from csp.impl.wiring import py_managed_adapter_def + + class TestAdapterManager: + def __init__(self): + self._impl = None + + def subscribe(self): + return TestAdapter(self) + + def _create(self, engine, memo): + self._impl = TestAdapterManagerImpl(engine) + return self._impl + + class TestAdapterManagerImpl(AdapterManagerImpl): + def __init__(self, engine): + super().__init__(engine) + + def start(self, starttime, endtime): + pass + + def stop(self): + pass + + def process_next_sim_timeslice(self, now): + self.engine_shutdown("Dummy exception message") + + class TestAdapterImpl(ManagedSimInputAdapter): + def __init__(self, manager_impl): + pass + + TestAdapter = py_managed_adapter_def("TestAdapter", TestAdapterImpl, ts[int], TestAdapterManager) + + def graph(): + adapter = TestAdapterManager() + nc = adapter.subscribe() + csp.add_graph_output("nc", nc) + + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run(graph, starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1)) + def test_feedback(self): # Dummy example class Request(csp.Struct): From 84cc40dd5f440abe68290d423be4e575270333ab Mon Sep 17 00:00:00 2001 From: "Gajowniczek, Artur" Date: Thu, 11 Jul 2024 09:12:12 -0400 Subject: [PATCH 03/10] passing exception to engine shutdown wip Signed-off-by: Gajowniczek, Artur --- cpp/csp/python/Exception.h | 13 +++++++++++++ cpp/csp/python/PyAdapterManager.cpp | 12 +++++++++--- csp/impl/adaptermanager.py | 2 +- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/cpp/csp/python/Exception.h b/cpp/csp/python/Exception.h index 1355c6f67..6249d2ccc 100644 --- a/cpp/csp/python/Exception.h +++ b/cpp/csp/python/Exception.h @@ -30,6 +30,18 @@ class PythonPassthrough : public csp::Exception PyErr_Fetch( &m_type, &m_value, &m_traceback ); } + PythonPassthrough( PyObject * pyException ) : + m_pyException( pyException ), + m_type( PyObject_Type( pyException ) ), + m_value( PyObject_Str( pyException ) ), + m_traceback( PyException_GetTraceback( pyException ) ), + csp::Exception( PyUnicode_AsUTF8( m_type ), std::string( PyUnicode_AsUTF8( m_value ) ) ) + { + Py_INCREF( pyException ); + } + + ~PythonPassthrough() { Py_DECREF( m_pyException ); } + void restore() { if( !description().empty() ) @@ -50,6 +62,7 @@ class PythonPassthrough : public csp::Exception PyObject * m_type; PyObject * m_value; PyObject * m_traceback; + PyObject * m_pyException; }; diff --git a/cpp/csp/python/PyAdapterManager.cpp b/cpp/csp/python/PyAdapterManager.cpp index c412a0ee0..1ecd04b37 100644 --- a/cpp/csp/python/PyAdapterManager.cpp +++ b/cpp/csp/python/PyAdapterManager.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace csp::python { @@ -72,11 +73,16 @@ static PyObject * PyAdapterManager_PyObject__engine_shutdown( PyAdapterManager_P { CSP_BEGIN_METHOD; - const char * msg; - if( !PyArg_ParseTuple( args, "s", &msg ) ) + PyObject * pyException = nullptr; + + if( !PyArg_ParseTuple( args, "O", &pyException ) ) return NULL; - self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( TracebackStringException( msg ) ) ); + PyObject * type = PyObject_Type( pyException ); + PyObject * val = PyObject_Str( pyException ); + PyObject * traceback = PyException_GetTraceback( pyException ); + + self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( PythonPassthrough( pyException ) ) ); CSP_RETURN_NONE; } diff --git a/csp/impl/adaptermanager.py b/csp/impl/adaptermanager.py index b53df8338..10c57bc2e 100644 --- a/csp/impl/adaptermanager.py +++ b/csp/impl/adaptermanager.py @@ -26,7 +26,7 @@ def process_next_sim_timeslice(self, now): return None def engine_shutdown(self, msg): - self._engine_shutdown(format_engine_shutdown_stack(msg)) + self._engine_shutdown(Exception('test')) # temporary for testing purposes class ManagedSimInputAdapter(_cspimpl.PyManagedSimInputAdapter): From 4ddc99f0e28a9dc04ac3070298cf0765fc8df83e Mon Sep 17 00:00:00 2001 From: Adam Glustein Date: Thu, 11 Jul 2024 14:25:10 -0400 Subject: [PATCH 04/10] Create PythonPassthrough exception of proper type in the shutdown_engine method Signed-off-by: Adam Glustein --- cpp/csp/python/Exception.h | 28 ++++++----------------- cpp/csp/python/PyAdapterManager.cpp | 15 +++++------- cpp/csp/python/PyEngine.h | 9 +++++--- cpp/csp/python/PyPushInputAdapter.cpp | 2 +- cpp/csp/python/PyPushPullInputAdapter.cpp | 2 +- csp/impl/adaptermanager.py | 4 ---- csp/impl/error_handling.py | 11 --------- csp/impl/pushadapter.py | 4 ---- csp/impl/pushpulladapter.py | 4 ---- csp/tests/impl/test_pushadapter.py | 4 ++-- csp/tests/impl/test_pushpulladapter.py | 8 +++---- csp/tests/test_engine.py | 4 ++-- 12 files changed, 29 insertions(+), 66 deletions(-) diff --git a/cpp/csp/python/Exception.h b/cpp/csp/python/Exception.h index 6249d2ccc..5c7c2e80a 100644 --- a/cpp/csp/python/Exception.h +++ b/cpp/csp/python/Exception.h @@ -8,17 +8,6 @@ namespace csp::python { -class TracebackStringException : public std::exception -{ -public: - TracebackStringException( const char * message ) : m_message( message ) {} - - const char * what() const noexcept override { return m_message; } - -private: - const char * m_message; -}; - class PythonPassthrough : public csp::Exception { public: @@ -27,21 +16,20 @@ class PythonPassthrough : public csp::Exception csp::Exception( exType, r, file, func, line ) { //Fetch the current error to clear out the error indicator while the stack gets unwound + //We own the references to all the members assigned in PyErr_Fetch + //We need to hold the reference since PyErr_Restore takes back a reference to each of its arguments PyErr_Fetch( &m_type, &m_value, &m_traceback ); } PythonPassthrough( PyObject * pyException ) : - m_pyException( pyException ), - m_type( PyObject_Type( pyException ) ), - m_value( PyObject_Str( pyException ) ), - m_traceback( PyException_GetTraceback( pyException ) ), - csp::Exception( PyUnicode_AsUTF8( m_type ), std::string( PyUnicode_AsUTF8( m_value ) ) ) + csp::Exception( "", "" ) { - Py_INCREF( pyException ); + // Note: all of these methods return strong references, so we own them like in the other constructor + m_type = PyObject_Type( pyException ); + m_value = PyObject_Str( pyException ); + m_traceback = PyException_GetTraceback( pyException ); } - ~PythonPassthrough() { Py_DECREF( m_pyException ); } - void restore() { if( !description().empty() ) @@ -62,8 +50,6 @@ class PythonPassthrough : public csp::Exception PyObject * m_type; PyObject * m_value; PyObject * m_traceback; - PyObject * m_pyException; - }; CSP_DECLARE_EXCEPTION( AttributeError, ::csp::Exception ); diff --git a/cpp/csp/python/PyAdapterManager.cpp b/cpp/csp/python/PyAdapterManager.cpp index 1ecd04b37..842483aa2 100644 --- a/cpp/csp/python/PyAdapterManager.cpp +++ b/cpp/csp/python/PyAdapterManager.cpp @@ -6,7 +6,6 @@ #include #include #include -#include namespace csp::python { @@ -69,19 +68,17 @@ class PyAdapterManager : public AdapterManager static PyObject * PyAdapterManager_PyObject_starttime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> starttime() ); } static PyObject * PyAdapterManager_PyObject_endtime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> endtime() ); } -static PyObject * PyAdapterManager_PyObject__engine_shutdown( PyAdapterManager_PyObject * self, PyObject * args, PyObject * kwargs) +static PyObject * PyAdapterManager_PyObject_shutdown_engine( PyAdapterManager_PyObject * self, PyObject * args, PyObject * kwargs ) { CSP_BEGIN_METHOD; - PyObject * pyException = nullptr; - + PyObject * pyException; if( !PyArg_ParseTuple( args, "O", &pyException ) ) return NULL; - - PyObject * type = PyObject_Type( pyException ); - PyObject * val = PyObject_Str( pyException ); - PyObject * traceback = PyException_GetTraceback( pyException ); + if( !PyExceptionInstance_Check( pyException ) ) + CSP_THROW( TypeError, "Expected Exception object as argument for shutdown_engine: got " << Py_TYPE( pyException ) -> tp_name ); + self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( PythonPassthrough( pyException ) ) ); CSP_RETURN_NONE; @@ -104,7 +101,7 @@ static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *arg static PyMethodDef PyAdapterManager_methods[] = { { "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" }, { "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" }, - { "_engine_shutdown", (PyCFunction) PyAdapterManager_PyObject__engine_shutdown, METH_VARARGS, "_engine_shutdown" }, + { "shutdown_engine", (PyCFunction) PyAdapterManager_PyObject_shutdown_engine, METH_VARARGS, "shutdown_engine" }, {NULL} }; diff --git a/cpp/csp/python/PyEngine.h b/cpp/csp/python/PyEngine.h index e059e81c7..931792293 100644 --- a/cpp/csp/python/PyEngine.h +++ b/cpp/csp/python/PyEngine.h @@ -61,11 +61,14 @@ PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs ) { CSP_BEGIN_METHOD; - const char * msg; - if( !PyArg_ParseTuple( args, "s", &msg ) ) + PyObject * pyException; + if( !PyArg_ParseTuple( args, "O", &pyException ) ) return NULL; - self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( TracebackStringException( msg ) ) ); + if( !PyExceptionInstance_Check( pyException ) ) + CSP_THROW( TypeError, "Expected Exception object as argument for shutdown_engine: got " << Py_TYPE( pyException ) -> tp_name ); + + self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( PythonPassthrough( pyException ) ) ); CSP_RETURN_NONE; } diff --git a/cpp/csp/python/PyPushInputAdapter.cpp b/cpp/csp/python/PyPushInputAdapter.cpp index 013e78db9..a9b01b446 100644 --- a/cpp/csp/python/PyPushInputAdapter.cpp +++ b/cpp/csp/python/PyPushInputAdapter.cpp @@ -203,7 +203,7 @@ struct PyPushInputAdapter_PyObject static PyMethodDef PyPushInputAdapter_PyObject_methods[] = { { "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, - { "_engine_shutdown", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "engine shutdown" }, + { "shutdown_engine", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "shutdown_engine" }, {NULL} }; diff --git a/cpp/csp/python/PyPushPullInputAdapter.cpp b/cpp/csp/python/PyPushPullInputAdapter.cpp index 971999078..0f4ee5de9 100644 --- a/cpp/csp/python/PyPushPullInputAdapter.cpp +++ b/cpp/csp/python/PyPushPullInputAdapter.cpp @@ -125,7 +125,7 @@ struct PyPushPullInputAdapter_PyObject static PyMethodDef PyPushPullInputAdapter_PyObject_methods[] = { { "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, { "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" }, - { "_engine_shutdown", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "engine shutdown" }, + { "shutdown_engine", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "shutdown engine" }, {NULL} }; diff --git a/csp/impl/adaptermanager.py b/csp/impl/adaptermanager.py index 10c57bc2e..9ec42d0e0 100644 --- a/csp/impl/adaptermanager.py +++ b/csp/impl/adaptermanager.py @@ -2,7 +2,6 @@ import csp from csp.impl.__cspimpl import _cspimpl -from csp.impl.error_handling import format_engine_shutdown_stack class AdapterManagerImpl(_cspimpl.PyAdapterManager): @@ -25,9 +24,6 @@ def process_next_sim_timeslice(self, now): """ return None - def engine_shutdown(self, msg): - self._engine_shutdown(Exception('test')) # temporary for testing purposes - class ManagedSimInputAdapter(_cspimpl.PyManagedSimInputAdapter): def __init__(self, typ, field_map): diff --git a/csp/impl/error_handling.py b/csp/impl/error_handling.py index b4a0303b2..e662b7eb8 100644 --- a/csp/impl/error_handling.py +++ b/csp/impl/error_handling.py @@ -1,6 +1,5 @@ import ast import os -import traceback import csp.impl from csp.impl.__cspimpl import _cspimpl @@ -49,13 +48,3 @@ def set_print_full_exception_stack(new_value: bool): res = ExceptionContext.PRINT_EXCEPTION_FULL_STACK ExceptionContext.PRINT_EXCEPTION_FULL_STACK = new_value return res - - -def format_engine_shutdown_stack(msg: str): - tb = traceback.format_stack() - tb = tb[:-2] # remove traceback call and internal engine shutdown method - if len(tb) > 1: - tb = tb[-2:] # only keep the current function and the caller (adapter manager) - tb.append(msg) # add the user's error message - tb = "".join(tb) # format into a single string - return tb diff --git a/csp/impl/pushadapter.py b/csp/impl/pushadapter.py index e9fe48fcd..b64356738 100644 --- a/csp/impl/pushadapter.py +++ b/csp/impl/pushadapter.py @@ -1,5 +1,4 @@ from csp.impl.__cspimpl import _cspimpl -from csp.impl.error_handling import format_engine_shutdown_stack PushGroup = _cspimpl.PushGroup PushBatch = _cspimpl.PushBatch @@ -12,8 +11,5 @@ def start(self, starttime, endtime): def stop(self): pass - def engine_shutdown(self, msg): - self._engine_shutdown(format_engine_shutdown_stack(msg)) - # base class # def push_tick( self, value ) diff --git a/csp/impl/pushpulladapter.py b/csp/impl/pushpulladapter.py index c92f85411..f2beccd3a 100644 --- a/csp/impl/pushpulladapter.py +++ b/csp/impl/pushpulladapter.py @@ -1,5 +1,4 @@ from csp.impl.__cspimpl import _cspimpl -from csp.impl.error_handling import format_engine_shutdown_stack PushGroup = _cspimpl.PushGroup PushBatch = _cspimpl.PushBatch @@ -12,8 +11,5 @@ def start(self, starttime, endtime): def stop(self): pass - def engine_shutdown(self, msg): - self._engine_shutdown(format_engine_shutdown_stack(msg)) - # base class # def push_tick( self, time, value ) diff --git a/csp/tests/impl/test_pushadapter.py b/csp/tests/impl/test_pushadapter.py index b77f38f65..0ac9bf98b 100644 --- a/csp/tests/impl/test_pushadapter.py +++ b/csp/tests/impl/test_pushadapter.py @@ -261,7 +261,7 @@ def _run(self): while self._running: if pushed: time.sleep(0.1) - self.engine_shutdown("Dummy exception message") + self.shutdown_engine(TypeError("Dummy exception message")) else: self.push_tick(0) pushed = True @@ -281,7 +281,7 @@ def graph(): node(adapter) csp.print("adapter", adapter) - with self.assertRaisesRegex(Exception, "Dummy exception message"): + with self.assertRaisesRegex(TypeError, "Dummy exception message"): csp.run(graph, starttime=datetime.utcnow(), realtime=True) self.assertEqual(status["count"], 1) diff --git a/csp/tests/impl/test_pushpulladapter.py b/csp/tests/impl/test_pushpulladapter.py index 92557a2a8..b52ce0755 100644 --- a/csp/tests/impl/test_pushpulladapter.py +++ b/csp/tests/impl/test_pushpulladapter.py @@ -166,7 +166,7 @@ def _run(self): while self._running and idx < len(self._data): if idx and self._shutdown_before_live: time.sleep(0.1) - self.engine_shutdown("Dummy exception message") + self.shutdown_engine(ValueError("Dummy exception message")) t, v = self._data[idx] self.push_tick(False, t, v) idx += 1 @@ -177,7 +177,7 @@ def _run(self): self.push_tick(True, datetime.utcnow(), len(self._data) + 1) if idx and not self._shutdown_before_live: time.sleep(0.1) - self.engine_shutdown("Dummy exception message") + self.shutdown_engine(TypeError("Dummy exception message")) idx += 1 MyPushPullAdapter = py_pushpull_adapter_def( @@ -190,9 +190,9 @@ def graph(shutdown_before_live: bool): adapter = MyPushPullAdapter(int, data, shutdown_before_live) csp.print("adapter", adapter) - with self.assertRaisesRegex(Exception, "Dummy exception message"): + with self.assertRaisesRegex(ValueError, "Dummy exception message"): csp.run(graph, True, starttime=datetime(2020, 1, 1, 1)) - with self.assertRaisesRegex(Exception, "Dummy exception message"): + with self.assertRaisesRegex(TypeError, "Dummy exception message"): csp.run( graph, False, diff --git a/csp/tests/test_engine.py b/csp/tests/test_engine.py index 9b0698333..88b3db9ba 100644 --- a/csp/tests/test_engine.py +++ b/csp/tests/test_engine.py @@ -825,7 +825,7 @@ def stop(self): pass def process_next_sim_timeslice(self, now): - self.engine_shutdown("Dummy exception message") + self.shutdown_engine(ValueError("Dummy exception message")) class TestAdapterImpl(ManagedSimInputAdapter): def __init__(self, manager_impl): @@ -838,7 +838,7 @@ def graph(): nc = adapter.subscribe() csp.add_graph_output("nc", nc) - with self.assertRaisesRegex(Exception, "Dummy exception message"): + with self.assertRaisesRegex(ValueError, "Dummy exception message"): csp.run(graph, starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1)) def test_feedback(self): From 34a1db1e6acb4bbfdef491258f507cb89cea3025 Mon Sep 17 00:00:00 2001 From: "Gajowniczek, Artur" Date: Fri, 12 Jul 2024 11:18:56 -0400 Subject: [PATCH 05/10] Add a test for traceback and update wiki Signed-off-by: Gajowniczek, Artur --- csp/tests/test_engine.py | 11 +++++++++-- .../wiki/how-tos/Write-Realtime-Input-Adapters.md | 15 +++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/csp/tests/test_engine.py b/csp/tests/test_engine.py index 88b3db9ba..d619ac5f8 100644 --- a/csp/tests/test_engine.py +++ b/csp/tests/test_engine.py @@ -825,7 +825,10 @@ def stop(self): pass def process_next_sim_timeslice(self, now): - self.shutdown_engine(ValueError("Dummy exception message")) + try: + [].pop() + except IndexError as e: + self.shutdown_engine(e) class TestAdapterImpl(ManagedSimInputAdapter): def __init__(self, manager_impl): @@ -838,8 +841,12 @@ def graph(): nc = adapter.subscribe() csp.add_graph_output("nc", nc) - with self.assertRaisesRegex(ValueError, "Dummy exception message"): + try: csp.run(graph, starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1)) + except IndexError: + tb = traceback.format_exc() + + self.assertTrue("[].pop()" in tb) def test_feedback(self): # Dummy example diff --git a/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md b/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md index 2f0195d5a..9a0e12e60 100644 --- a/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md +++ b/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md @@ -405,3 +405,18 @@ csp.run(my_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), re ``` Do note that realtime adapters will only run in realtime engines (note the `realtime=True` argument to `csp.run`). + +## Engine shutdown + +In case a pushing thread hits a terminal error, an exception can be passed to the main engine thread to shut down gracefully through a `shutdown_engine(exc: Exception)` method exposed by `PushInputAdapter`, `PushPullInputAdapter` and `AdapterManagerImpl`. + +For example: + +```python +def _run(self): + while self._running: + try: + requests.get(endpoint) # API call over a network, may fail + catch Exception as exc: + self.shutdown_engine(exc) +``` From 6a7b202f331a34223282d28d4fd686e9d9c39192 Mon Sep 17 00:00:00 2001 From: "Gajowniczek, Artur" Date: Tue, 16 Jul 2024 08:02:59 -0400 Subject: [PATCH 06/10] assert function name in traceback, shutdown engine if not an exception is passed Signed-off-by: Gajowniczek, Artur --- cpp/csp/python/PyAdapterManager.cpp | 9 ++++++--- csp/tests/test_engine.py | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cpp/csp/python/PyAdapterManager.cpp b/cpp/csp/python/PyAdapterManager.cpp index 842483aa2..2a22276a6 100644 --- a/cpp/csp/python/PyAdapterManager.cpp +++ b/cpp/csp/python/PyAdapterManager.cpp @@ -77,9 +77,12 @@ static PyObject * PyAdapterManager_PyObject_shutdown_engine( PyAdapterManager_Py return NULL; if( !PyExceptionInstance_Check( pyException ) ) - CSP_THROW( TypeError, "Expected Exception object as argument for shutdown_engine: got " << Py_TYPE( pyException ) -> tp_name ); - - self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( PythonPassthrough( pyException ) ) ); + { + std::string desc = "Expected Exception object as argument for shutdown_engine: got " + std::string( Py_TYPE( pyException ) -> tp_name ); + self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( csp::Exception( "TypeError", desc ) ) ); + } + else + self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( PythonPassthrough( pyException ) ) ); CSP_RETURN_NONE; } diff --git a/csp/tests/test_engine.py b/csp/tests/test_engine.py index d619ac5f8..a8ec0a965 100644 --- a/csp/tests/test_engine.py +++ b/csp/tests/test_engine.py @@ -846,7 +846,7 @@ def graph(): except IndexError: tb = traceback.format_exc() - self.assertTrue("[].pop()" in tb) + self.assertTrue("[].pop()" in tb and "process_next_sim_timeslice" in tb) def test_feedback(self): # Dummy example From b0be48801077423a61d2840db955b73b7aa02a4b Mon Sep 17 00:00:00 2001 From: "Gajowniczek, Artur" Date: Wed, 17 Jul 2024 15:47:14 -0400 Subject: [PATCH 07/10] Added shutdown on wrong type to PyEngine_shutdown Signed-off-by: Gajowniczek, Artur --- cpp/csp/python/PyEngine.h | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cpp/csp/python/PyEngine.h b/cpp/csp/python/PyEngine.h index 931792293..9118ff265 100644 --- a/cpp/csp/python/PyEngine.h +++ b/cpp/csp/python/PyEngine.h @@ -66,9 +66,12 @@ PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs ) return NULL; if( !PyExceptionInstance_Check( pyException ) ) - CSP_THROW( TypeError, "Expected Exception object as argument for shutdown_engine: got " << Py_TYPE( pyException ) -> tp_name ); - - self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( PythonPassthrough( pyException ) ) ); + { + std::string desc = "Expected Exception object as argument for shutdown_engine: got " + std::string( Py_TYPE( pyException ) -> tp_name ); + self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( csp::Exception( "TypeError", desc ) ) ); + } + else + self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( PythonPassthrough( pyException ) ) ); CSP_RETURN_NONE; } From ad58a75a1f0148131c49a98d4ae0bb6ff5070475 Mon Sep 17 00:00:00 2001 From: "Gajowniczek, Artur" Date: Tue, 23 Jul 2024 15:07:41 -0400 Subject: [PATCH 08/10] clearer error msg, move creating exception logic to separate function, doc fix Signed-off-by: Gajowniczek, Artur --- cpp/csp/python/PyAdapterManager.cpp | 8 +------ cpp/csp/python/PyEngine.h | 22 +++++++++++++------ .../how-tos/Write-Realtime-Input-Adapters.md | 2 +- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/cpp/csp/python/PyAdapterManager.cpp b/cpp/csp/python/PyAdapterManager.cpp index 2a22276a6..5544436f6 100644 --- a/cpp/csp/python/PyAdapterManager.cpp +++ b/cpp/csp/python/PyAdapterManager.cpp @@ -76,13 +76,7 @@ static PyObject * PyAdapterManager_PyObject_shutdown_engine( PyAdapterManager_Py if( !PyArg_ParseTuple( args, "O", &pyException ) ) return NULL; - if( !PyExceptionInstance_Check( pyException ) ) - { - std::string desc = "Expected Exception object as argument for shutdown_engine: got " + std::string( Py_TYPE( pyException ) -> tp_name ); - self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( csp::Exception( "TypeError", desc ) ) ); - } - else - self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( PythonPassthrough( pyException ) ) ); + self -> manager -> rootEngine() -> shutdown( PyEngine_shutdownMakeException( pyException ) ); CSP_RETURN_NONE; } diff --git a/cpp/csp/python/PyEngine.h b/cpp/csp/python/PyEngine.h index 9118ff265..196af0022 100644 --- a/cpp/csp/python/PyEngine.h +++ b/cpp/csp/python/PyEngine.h @@ -55,6 +55,20 @@ class CSPIMPL_EXPORT PyEngine final: public PyObject Engine * m_engine; }; +inline std::exception_ptr PyEngine_shutdownMakeException( PyObject * pyException ) +{ + if( !PyExceptionInstance_Check( pyException ) ) + { + PyObject* pyExceptionStr = PyObject_Str( pyException ); + std::string pyExceptionString = PyUnicode_AsUTF8( pyExceptionStr ); + std::string desc = "Expected Exception object as argument for shutdown_engine: got " + pyExceptionString + "of type " + Py_TYPE( pyException ) -> tp_name; + Py_DECREF(pyExceptionStr); + return std::make_exception_ptr( csp::Exception( "TypeError", desc ) ); + } + else + return std::make_exception_ptr( PythonPassthrough( pyException ) ); +} + // Generic engine shutdown function for push-type adapters template PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs ) @@ -65,13 +79,7 @@ PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs ) if( !PyArg_ParseTuple( args, "O", &pyException ) ) return NULL; - if( !PyExceptionInstance_Check( pyException ) ) - { - std::string desc = "Expected Exception object as argument for shutdown_engine: got " + std::string( Py_TYPE( pyException ) -> tp_name ); - self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( csp::Exception( "TypeError", desc ) ) ); - } - else - self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( PythonPassthrough( pyException ) ) ); + self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdownMakeException( pyException ) ); CSP_RETURN_NONE; } diff --git a/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md b/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md index 9a0e12e60..3576c2509 100644 --- a/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md +++ b/docs/wiki/how-tos/Write-Realtime-Input-Adapters.md @@ -417,6 +417,6 @@ def _run(self): while self._running: try: requests.get(endpoint) # API call over a network, may fail - catch Exception as exc: + except Exception as exc: self.shutdown_engine(exc) ``` From 9c89a39e0a44edded652af794824cf4f846973fd Mon Sep 17 00:00:00 2001 From: "Gajowniczek, Artur" Date: Wed, 24 Jul 2024 09:01:17 -0400 Subject: [PATCH 09/10] use PyObjectPtr, add space to error msg Signed-off-by: Gajowniczek, Artur --- cpp/csp/python/PyAdapterManager.cpp | 2 +- cpp/csp/python/PyEngine.h | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/csp/python/PyAdapterManager.cpp b/cpp/csp/python/PyAdapterManager.cpp index 5544436f6..367fc4bf4 100644 --- a/cpp/csp/python/PyAdapterManager.cpp +++ b/cpp/csp/python/PyAdapterManager.cpp @@ -76,7 +76,7 @@ static PyObject * PyAdapterManager_PyObject_shutdown_engine( PyAdapterManager_Py if( !PyArg_ParseTuple( args, "O", &pyException ) ) return NULL; - self -> manager -> rootEngine() -> shutdown( PyEngine_shutdownMakeException( pyException ) ); + self -> manager -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) ); CSP_RETURN_NONE; } diff --git a/cpp/csp/python/PyEngine.h b/cpp/csp/python/PyEngine.h index 196af0022..6c8dee3a7 100644 --- a/cpp/csp/python/PyEngine.h +++ b/cpp/csp/python/PyEngine.h @@ -55,14 +55,15 @@ class CSPIMPL_EXPORT PyEngine final: public PyObject Engine * m_engine; }; -inline std::exception_ptr PyEngine_shutdownMakeException( PyObject * pyException ) +inline std::exception_ptr PyEngine_shutdown_make_exception( PyObject * pyException ) { if( !PyExceptionInstance_Check( pyException ) ) { - PyObject* pyExceptionStr = PyObject_Str( pyException ); - std::string pyExceptionString = PyUnicode_AsUTF8( pyExceptionStr ); - std::string desc = "Expected Exception object as argument for shutdown_engine: got " + pyExceptionString + "of type " + Py_TYPE( pyException ) -> tp_name; - Py_DECREF(pyExceptionStr); + PyObjectPtr pyExceptionStr = PyObjectPtr::own( PyObject_Str( pyException ) ); + if( !pyExceptionStr.ptr() ) + CSP_THROW( PythonPassthrough, "" ); + std::string pyExceptionString = PyUnicode_AsUTF8( pyExceptionStr.ptr() ); + std::string desc = "Expected Exception object as argument for shutdown_engine: got " + pyExceptionString + " of type " + Py_TYPE( pyException ) -> tp_name; return std::make_exception_ptr( csp::Exception( "TypeError", desc ) ); } else @@ -79,7 +80,7 @@ PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs ) if( !PyArg_ParseTuple( args, "O", &pyException ) ) return NULL; - self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdownMakeException( pyException ) ); + self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) ); CSP_RETURN_NONE; } From 52ee0e10fcba03c82e6a16fd49660261924715a2 Mon Sep 17 00:00:00 2001 From: "Gajowniczek, Artur" Date: Thu, 25 Jul 2024 08:42:34 -0400 Subject: [PATCH 10/10] use METH_O in shutdown_engine Signed-off-by: Gajowniczek, Artur --- cpp/csp/python/PyAdapterManager.cpp | 8 ++------ cpp/csp/python/PyEngine.h | 15 --------------- cpp/csp/python/PyPushInputAdapter.cpp | 11 ++++++++++- cpp/csp/python/PyPushPullInputAdapter.cpp | 11 ++++++++++- 4 files changed, 22 insertions(+), 23 deletions(-) diff --git a/cpp/csp/python/PyAdapterManager.cpp b/cpp/csp/python/PyAdapterManager.cpp index 367fc4bf4..d6547e135 100644 --- a/cpp/csp/python/PyAdapterManager.cpp +++ b/cpp/csp/python/PyAdapterManager.cpp @@ -68,13 +68,9 @@ class PyAdapterManager : public AdapterManager static PyObject * PyAdapterManager_PyObject_starttime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> starttime() ); } static PyObject * PyAdapterManager_PyObject_endtime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> endtime() ); } -static PyObject * PyAdapterManager_PyObject_shutdown_engine( PyAdapterManager_PyObject * self, PyObject * args, PyObject * kwargs ) +static PyObject * PyAdapterManager_PyObject_shutdown_engine( PyAdapterManager_PyObject * self, PyObject * pyException ) { CSP_BEGIN_METHOD; - - PyObject * pyException; - if( !PyArg_ParseTuple( args, "O", &pyException ) ) - return NULL; self -> manager -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) ); @@ -98,7 +94,7 @@ static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *arg static PyMethodDef PyAdapterManager_methods[] = { { "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" }, { "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" }, - { "shutdown_engine", (PyCFunction) PyAdapterManager_PyObject_shutdown_engine, METH_VARARGS, "shutdown_engine" }, + { "shutdown_engine", (PyCFunction) PyAdapterManager_PyObject_shutdown_engine, METH_O, "shutdown_engine" }, {NULL} }; diff --git a/cpp/csp/python/PyEngine.h b/cpp/csp/python/PyEngine.h index 6c8dee3a7..d58289501 100644 --- a/cpp/csp/python/PyEngine.h +++ b/cpp/csp/python/PyEngine.h @@ -70,21 +70,6 @@ inline std::exception_ptr PyEngine_shutdown_make_exception( PyObject * pyExcepti return std::make_exception_ptr( PythonPassthrough( pyException ) ); } -// Generic engine shutdown function for push-type adapters -template -PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs ) -{ - CSP_BEGIN_METHOD; - - PyObject * pyException; - if( !PyArg_ParseTuple( args, "O", &pyException ) ) - return NULL; - - self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) ); - - CSP_RETURN_NONE; -} - }; #endif diff --git a/cpp/csp/python/PyPushInputAdapter.cpp b/cpp/csp/python/PyPushInputAdapter.cpp index a9b01b446..cd034844a 100644 --- a/cpp/csp/python/PyPushInputAdapter.cpp +++ b/cpp/csp/python/PyPushInputAdapter.cpp @@ -198,12 +198,21 @@ struct PyPushInputAdapter_PyObject CSP_RETURN_NONE; } + static PyObject * shutdown_engine( PyPushInputAdapter_PyObject * self, PyObject * pyException ) + { + CSP_BEGIN_METHOD; + + self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) ); + + CSP_RETURN_NONE; + } + static PyTypeObject PyType; }; static PyMethodDef PyPushInputAdapter_PyObject_methods[] = { { "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, - { "shutdown_engine", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "shutdown_engine" }, + { "shutdown_engine", (PyCFunction) PyPushInputAdapter_PyObject::shutdown_engine, METH_O, "shutdown_engine" }, {NULL} }; diff --git a/cpp/csp/python/PyPushPullInputAdapter.cpp b/cpp/csp/python/PyPushPullInputAdapter.cpp index 0f4ee5de9..71190660c 100644 --- a/cpp/csp/python/PyPushPullInputAdapter.cpp +++ b/cpp/csp/python/PyPushPullInputAdapter.cpp @@ -119,13 +119,22 @@ struct PyPushPullInputAdapter_PyObject CSP_RETURN_NONE; } + static PyObject * shutdown_engine( PyPushPullInputAdapter_PyObject * self, PyObject * pyException ) + { + CSP_BEGIN_METHOD; + + self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) ); + + CSP_RETURN_NONE; + } + static PyTypeObject PyType; }; static PyMethodDef PyPushPullInputAdapter_PyObject_methods[] = { { "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, { "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" }, - { "shutdown_engine", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "shutdown engine" }, + { "shutdown_engine", (PyCFunction) PyPushPullInputAdapter_PyObject::shutdown_engine, METH_O, "shutdown engine" }, {NULL} };