From 247f83124a396e7068e1688d012ce407074325ad Mon Sep 17 00:00:00 2001 From: Adam Glustein Date: Mon, 8 Jul 2024 16:07:32 -0400 Subject: [PATCH] Expose engine shutdown method for Python push-type adapters Co-authored-by: Artur Gajowniczek Signed-off-by: Adam Glustein --- cpp/csp/python/Exception.h | 11 +++++ cpp/csp/python/PyAdapterManager.cpp | 18 +++++++- cpp/csp/python/PyEngine.h | 16 +++++++ cpp/csp/python/PyPushInputAdapter.cpp | 3 +- cpp/csp/python/PyPushPullInputAdapter.cpp | 5 +- csp/impl/adaptermanager.py | 4 ++ csp/impl/error_handling.py | 12 +++++ csp/impl/pushadapter.py | 4 ++ csp/impl/pushpulladapter.py | 4 ++ csp/tests/impl/test_pushadapter.py | 44 ++++++++++++++++++ csp/tests/impl/test_pushpulladapter.py | 56 +++++++++++++++++++++++ csp/tests/test_engine.py | 42 +++++++++++++++++ 12 files changed, 214 insertions(+), 5 deletions(-) diff --git a/cpp/csp/python/Exception.h b/cpp/csp/python/Exception.h index 104a3509..1355c6f6 100644 --- a/cpp/csp/python/Exception.h +++ b/cpp/csp/python/Exception.h @@ -8,6 +8,17 @@ namespace csp::python { +class TracebackStringException : public std::exception +{ +public: + TracebackStringException( const char * message ) : m_message( message ) {} + + const char * what() const noexcept override { return m_message; } + +private: + const char * m_message; +}; + class PythonPassthrough : public csp::Exception { public: diff --git a/cpp/csp/python/PyAdapterManager.cpp b/cpp/csp/python/PyAdapterManager.cpp index 3a9ec721..c412a0ee 100644 --- a/cpp/csp/python/PyAdapterManager.cpp +++ b/cpp/csp/python/PyAdapterManager.cpp @@ -68,6 +68,19 @@ class PyAdapterManager : public AdapterManager static PyObject * PyAdapterManager_PyObject_starttime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> starttime() ); } static PyObject * PyAdapterManager_PyObject_endtime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> endtime() ); } +static PyObject * PyAdapterManager_PyObject__engine_shutdown( PyAdapterManager_PyObject * self, PyObject * args, PyObject * kwargs) +{ + CSP_BEGIN_METHOD; + + const char * msg; + if( !PyArg_ParseTuple( args, "s", &msg ) ) + return NULL; + + self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( TracebackStringException( msg ) ) ); + + CSP_RETURN_NONE; +} + static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *args, PyObject *kwds ) { CSP_BEGIN_METHOD; @@ -83,8 +96,9 @@ static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *arg } static PyMethodDef PyAdapterManager_methods[] = { - { "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" }, - { "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" }, + { "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" }, + { "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" }, + { "_engine_shutdown", (PyCFunction) PyAdapterManager_PyObject__engine_shutdown, METH_VARARGS, "_engine_shutdown" }, {NULL} }; diff --git a/cpp/csp/python/PyEngine.h b/cpp/csp/python/PyEngine.h index 4a694c0c..e059e81c 100644 --- a/cpp/csp/python/PyEngine.h +++ b/cpp/csp/python/PyEngine.h @@ -2,6 +2,7 @@ #define _IN_CSP_PYTHON_PYENGINE_H #include +#include #include #include #include @@ -54,6 +55,21 @@ class CSPIMPL_EXPORT PyEngine final: public PyObject Engine * m_engine; }; +// Generic engine shutdown function for push-type adapters +template +PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs ) +{ + CSP_BEGIN_METHOD; + + const char * msg; + if( !PyArg_ParseTuple( args, "s", &msg ) ) + return NULL; + + self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( TracebackStringException( msg ) ) ); + + CSP_RETURN_NONE; +} + }; #endif diff --git a/cpp/csp/python/PyPushInputAdapter.cpp b/cpp/csp/python/PyPushInputAdapter.cpp index 3b5f91b0..013e78db 100644 --- a/cpp/csp/python/PyPushInputAdapter.cpp +++ b/cpp/csp/python/PyPushInputAdapter.cpp @@ -202,7 +202,8 @@ struct PyPushInputAdapter_PyObject }; static PyMethodDef PyPushInputAdapter_PyObject_methods[] = { - { "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "_engine_shutdown", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "engine shutdown" }, {NULL} }; diff --git a/cpp/csp/python/PyPushPullInputAdapter.cpp b/cpp/csp/python/PyPushPullInputAdapter.cpp index d53ed972..97199907 100644 --- a/cpp/csp/python/PyPushPullInputAdapter.cpp +++ b/cpp/csp/python/PyPushPullInputAdapter.cpp @@ -123,8 +123,9 @@ struct PyPushPullInputAdapter_PyObject }; static PyMethodDef PyPushPullInputAdapter_PyObject_methods[] = { - { "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, - { "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" }, + { "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" }, + { "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" }, + { "_engine_shutdown", (PyCFunction) PyEngine_shutdown, METH_VARARGS, "engine shutdown" }, {NULL} }; diff --git a/csp/impl/adaptermanager.py b/csp/impl/adaptermanager.py index 9ec42d0e..b53df833 100644 --- a/csp/impl/adaptermanager.py +++ b/csp/impl/adaptermanager.py @@ -2,6 +2,7 @@ import csp from csp.impl.__cspimpl import _cspimpl +from csp.impl.error_handling import format_engine_shutdown_stack class AdapterManagerImpl(_cspimpl.PyAdapterManager): @@ -24,6 +25,9 @@ def process_next_sim_timeslice(self, now): """ return None + def engine_shutdown(self, msg): + self._engine_shutdown(format_engine_shutdown_stack(msg)) + class ManagedSimInputAdapter(_cspimpl.PyManagedSimInputAdapter): def __init__(self, typ, field_map): diff --git a/csp/impl/error_handling.py b/csp/impl/error_handling.py index e662b7eb..8cf2eadb 100644 --- a/csp/impl/error_handling.py +++ b/csp/impl/error_handling.py @@ -1,5 +1,7 @@ import ast import os +import traceback +from typing import List import csp.impl from csp.impl.__cspimpl import _cspimpl @@ -48,3 +50,13 @@ def set_print_full_exception_stack(new_value: bool): res = ExceptionContext.PRINT_EXCEPTION_FULL_STACK ExceptionContext.PRINT_EXCEPTION_FULL_STACK = new_value return res + + +def format_engine_shutdown_stack(msg: str): + tb = traceback.format_stack() + tb = tb[:-2] # remove traceback call and internal engine shutdown method + if len(tb) > 1: + tb = tb[-2:] # only keep the current function and the caller (adapter manager) + tb.append(msg) # add the user's error message + tb = "".join(tb) # format into a single string + return tb diff --git a/csp/impl/pushadapter.py b/csp/impl/pushadapter.py index b6435673..e9fe48fc 100644 --- a/csp/impl/pushadapter.py +++ b/csp/impl/pushadapter.py @@ -1,4 +1,5 @@ from csp.impl.__cspimpl import _cspimpl +from csp.impl.error_handling import format_engine_shutdown_stack PushGroup = _cspimpl.PushGroup PushBatch = _cspimpl.PushBatch @@ -11,5 +12,8 @@ def start(self, starttime, endtime): def stop(self): pass + def engine_shutdown(self, msg): + self._engine_shutdown(format_engine_shutdown_stack(msg)) + # base class # def push_tick( self, value ) diff --git a/csp/impl/pushpulladapter.py b/csp/impl/pushpulladapter.py index f2beccd3..c92f8541 100644 --- a/csp/impl/pushpulladapter.py +++ b/csp/impl/pushpulladapter.py @@ -1,4 +1,5 @@ from csp.impl.__cspimpl import _cspimpl +from csp.impl.error_handling import format_engine_shutdown_stack PushGroup = _cspimpl.PushGroup PushBatch = _cspimpl.PushBatch @@ -11,5 +12,8 @@ def start(self, starttime, endtime): def stop(self): pass + def engine_shutdown(self, msg): + self._engine_shutdown(format_engine_shutdown_stack(msg)) + # base class # def push_tick( self, time, value ) diff --git a/csp/tests/impl/test_pushadapter.py b/csp/tests/impl/test_pushadapter.py index 1aa7c056..ed8a2c38 100644 --- a/csp/tests/impl/test_pushadapter.py +++ b/csp/tests/impl/test_pushadapter.py @@ -240,6 +240,50 @@ def graph(): result = list(x[1] for x in result) self.assertEqual(result, expected) + def test_adapter_engine_shutdown(self): + class MyPushAdapterImpl(PushInputAdapter): + def __init__(self): + self._thread = None + self._running = False + + def start(self, starttime, endtime): + self._running = True + self._thread = threading.Thread(target=self._run) + self._thread.start() + + def stop(self): + if self._running: + self._running = False + self._thread.join() + + def _run(self): + pushed = False + while self._running: + if pushed: + self.engine_shutdown("Dummy exception message") + else: + self.push_tick(0) + pushed = True + + MyPushAdapter = py_push_adapter_def("MyPushAdapter", MyPushAdapterImpl, ts[int]) + + status = {"count": 0} + + @csp.node + def node(x: ts[object]): + if csp.ticked(x): + status["count"] += 1 + + @csp.graph + def graph(): + adapter = MyPushAdapter() + node(adapter) + csp.print("adapter", adapter) + + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run(graph, starttime=datetime.utcnow(), realtime=True) + self.assertEqual(status["count"], 1) + if __name__ == "__main__": unittest.main() diff --git a/csp/tests/impl/test_pushpulladapter.py b/csp/tests/impl/test_pushpulladapter.py index 74faff89..132e84da 100644 --- a/csp/tests/impl/test_pushpulladapter.py +++ b/csp/tests/impl/test_pushpulladapter.py @@ -143,6 +143,62 @@ def graph(): result = [out[1] for out in graph_out[0]] self.assertEqual(result, [1, 2, 3]) + def test_adapter_engine_shutdown(self): + class MyPushPullAdapterImpl(PushPullInputAdapter): + def __init__(self, typ, data, shutdown_before_live): + self._data = data + self._thread = None + self._running = False + self._shutdown_before_live = shutdown_before_live + + def start(self, starttime, endtime): + self._running = True + self._thread = threading.Thread(target=self._run) + self._thread.start() + + def stop(self): + if self._running: + self._running = False + self._thread.join() + + def _run(self): + idx = 0 + while self._running and idx < len(self._data): + if idx and self._shutdown_before_live: + self.engine_shutdown("Dummy exception message") + t, v = self._data[idx] + self.push_tick(False, t, v) + idx += 1 + self.flag_replay_complete() + + idx = 0 + while self._running: + self.push_tick(True, datetime.utcnow(), len(self._data) + 1) + if idx and not self._shutdown_before_live: + self.engine_shutdown("Dummy exception message") + idx += 1 + + MyPushPullAdapter = py_pushpull_adapter_def( + "MyPushPullAdapter", MyPushPullAdapterImpl, ts["T"], typ="T", data=list, shutdown_before_live=bool + ) + + @csp.graph + def graph(shutdown_before_live: bool): + data = [(datetime(2020, 1, 1, 2), 1), (datetime(2020, 1, 1, 3), 2)] + adapter = MyPushPullAdapter(int, data, shutdown_before_live) + csp.print("adapter", adapter) + + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run(graph, True, starttime=datetime(2020, 1, 1, 1)) + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run( + graph, + False, + starttime=datetime(2020, 1, 1, 1), + endtime=datetime.utcnow() + timedelta(seconds=2), + realtime=True, + ) + if __name__ == "__main__": unittest.main() diff --git a/csp/tests/test_engine.py b/csp/tests/test_engine.py index adb67d37..9b069833 100644 --- a/csp/tests/test_engine.py +++ b/csp/tests/test_engine.py @@ -799,6 +799,48 @@ def graph(): b[t].append(v) self.assertEqual(results["b"], list(b.items())) + def test_adapter_manager_engine_shutdown(self): + from csp.impl.adaptermanager import AdapterManagerImpl, ManagedSimInputAdapter + from csp.impl.wiring import py_managed_adapter_def + + class TestAdapterManager: + def __init__(self): + self._impl = None + + def subscribe(self): + return TestAdapter(self) + + def _create(self, engine, memo): + self._impl = TestAdapterManagerImpl(engine) + return self._impl + + class TestAdapterManagerImpl(AdapterManagerImpl): + def __init__(self, engine): + super().__init__(engine) + + def start(self, starttime, endtime): + pass + + def stop(self): + pass + + def process_next_sim_timeslice(self, now): + self.engine_shutdown("Dummy exception message") + + class TestAdapterImpl(ManagedSimInputAdapter): + def __init__(self, manager_impl): + pass + + TestAdapter = py_managed_adapter_def("TestAdapter", TestAdapterImpl, ts[int], TestAdapterManager) + + def graph(): + adapter = TestAdapterManager() + nc = adapter.subscribe() + csp.add_graph_output("nc", nc) + + with self.assertRaisesRegex(Exception, "Dummy exception message"): + csp.run(graph, starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1)) + def test_feedback(self): # Dummy example class Request(csp.Struct):