Skip to content

Commit

Permalink
expose engine shutdown to adapter manager and push/pushpull adapters (#…
Browse files Browse the repository at this point in the history
…312)

Expose engine shutdown to adapter manager and push/pushpull adapters

---------

Signed-off-by: Gajowniczek, Artur <artur.gajowniczek@point72.com>
Co-authored-by: Adam Glustein <Adam.Glustein@Point72.com>
  • Loading branch information
argaj and AdamGlustein authored Jul 25, 2024
1 parent 4681d7b commit dce7312
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 6 deletions.
12 changes: 11 additions & 1 deletion cpp/csp/python/Exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@ class PythonPassthrough : public csp::Exception
csp::Exception( exType, r, file, func, line )
{
//Fetch the current error to clear out the error indicator while the stack gets unwound
//We own the references to all the members assigned in PyErr_Fetch
//We need to hold the reference since PyErr_Restore takes back a reference to each of its arguments
PyErr_Fetch( &m_type, &m_value, &m_traceback );
}

PythonPassthrough( PyObject * pyException ) :
csp::Exception( "", "" )
{
// Note: all of these methods return strong references, so we own them like in the other constructor
m_type = PyObject_Type( pyException );
m_value = PyObject_Str( pyException );
m_traceback = PyException_GetTraceback( pyException );
}

void restore()
{
if( !description().empty() )
Expand All @@ -39,7 +50,6 @@ class PythonPassthrough : public csp::Exception
PyObject * m_type;
PyObject * m_value;
PyObject * m_traceback;

};

CSP_DECLARE_EXCEPTION( AttributeError, ::csp::Exception );
Expand Down
14 changes: 12 additions & 2 deletions cpp/csp/python/PyAdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ class PyAdapterManager : public AdapterManager
static PyObject * PyAdapterManager_PyObject_starttime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> starttime() ); }
static PyObject * PyAdapterManager_PyObject_endtime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> endtime() ); }

static PyObject * PyAdapterManager_PyObject_shutdown_engine( PyAdapterManager_PyObject * self, PyObject * pyException )
{
CSP_BEGIN_METHOD;

self -> manager -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) );

CSP_RETURN_NONE;
}

static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *args, PyObject *kwds )
{
CSP_BEGIN_METHOD;
Expand All @@ -83,8 +92,9 @@ static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *arg
}

static PyMethodDef PyAdapterManager_methods[] = {
{ "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" },
{ "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" },
{ "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" },
{ "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" },
{ "shutdown_engine", (PyCFunction) PyAdapterManager_PyObject_shutdown_engine, METH_O, "shutdown_engine" },
{NULL}
};

Expand Down
16 changes: 16 additions & 0 deletions cpp/csp/python/PyEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define _IN_CSP_PYTHON_PYENGINE_H

#include <csp/core/Time.h>
#include <csp/python/Exception.h>
#include <csp/engine/RootEngine.h>
#include <csp/python/PyObjectPtr.h>
#include <Python.h>
Expand Down Expand Up @@ -54,6 +55,21 @@ class CSPIMPL_EXPORT PyEngine final: public PyObject
Engine * m_engine;
};

inline std::exception_ptr PyEngine_shutdown_make_exception( PyObject * pyException )
{
if( !PyExceptionInstance_Check( pyException ) )
{
PyObjectPtr pyExceptionStr = PyObjectPtr::own( PyObject_Str( pyException ) );
if( !pyExceptionStr.ptr() )
CSP_THROW( PythonPassthrough, "" );
std::string pyExceptionString = PyUnicode_AsUTF8( pyExceptionStr.ptr() );
std::string desc = "Expected Exception object as argument for shutdown_engine: got " + pyExceptionString + " of type " + Py_TYPE( pyException ) -> tp_name;
return std::make_exception_ptr( csp::Exception( "TypeError", desc ) );
}
else
return std::make_exception_ptr( PythonPassthrough( pyException ) );
}

};

#endif
12 changes: 11 additions & 1 deletion cpp/csp/python/PyPushInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,21 @@ struct PyPushInputAdapter_PyObject
CSP_RETURN_NONE;
}

static PyObject * shutdown_engine( PyPushInputAdapter_PyObject * self, PyObject * pyException )
{
CSP_BEGIN_METHOD;

self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) );

CSP_RETURN_NONE;
}

static PyTypeObject PyType;
};

static PyMethodDef PyPushInputAdapter_PyObject_methods[] = {
{ "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "shutdown_engine", (PyCFunction) PyPushInputAdapter_PyObject::shutdown_engine, METH_O, "shutdown_engine" },
{NULL}
};

Expand Down
14 changes: 12 additions & 2 deletions cpp/csp/python/PyPushPullInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,22 @@ struct PyPushPullInputAdapter_PyObject
CSP_RETURN_NONE;
}

static PyObject * shutdown_engine( PyPushPullInputAdapter_PyObject * self, PyObject * pyException )
{
CSP_BEGIN_METHOD;

self -> adapter -> rootEngine() -> shutdown( PyEngine_shutdown_make_exception( pyException ) );

CSP_RETURN_NONE;
}

static PyTypeObject PyType;
};

static PyMethodDef PyPushPullInputAdapter_PyObject_methods[] = {
{ "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" },
{ "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" },
{ "shutdown_engine", (PyCFunction) PyPushPullInputAdapter_PyObject::shutdown_engine, METH_O, "shutdown engine" },
{NULL}
};

Expand Down
45 changes: 45 additions & 0 deletions csp/tests/impl/test_pushadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,51 @@ def graph():
result = list(x[1] for x in result)
self.assertEqual(result, expected)

def test_adapter_engine_shutdown(self):
class MyPushAdapterImpl(PushInputAdapter):
def __init__(self):
self._thread = None
self._running = False

def start(self, starttime, endtime):
self._running = True
self._thread = threading.Thread(target=self._run)
self._thread.start()

def stop(self):
if self._running:
self._running = False
self._thread.join()

def _run(self):
pushed = False
while self._running:
if pushed:
time.sleep(0.1)
self.shutdown_engine(TypeError("Dummy exception message"))
else:
self.push_tick(0)
pushed = True

MyPushAdapter = py_push_adapter_def("MyPushAdapter", MyPushAdapterImpl, ts[int])

status = {"count": 0}

@csp.node
def node(x: ts[object]):
if csp.ticked(x):
status["count"] += 1

@csp.graph
def graph():
adapter = MyPushAdapter()
node(adapter)
csp.print("adapter", adapter)

with self.assertRaisesRegex(TypeError, "Dummy exception message"):
csp.run(graph, starttime=datetime.utcnow(), realtime=True)
self.assertEqual(status["count"], 1)

def test_help(self):
# for `help` to work on adapters, signature must be defined
sig = inspect.signature(test_adapter)
Expand Down
58 changes: 58 additions & 0 deletions csp/tests/impl/test_pushpulladapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,64 @@ def graph():
result = [out[1] for out in graph_out[0]]
self.assertEqual(result, [1, 2, 3])

def test_adapter_engine_shutdown(self):
class MyPushPullAdapterImpl(PushPullInputAdapter):
def __init__(self, typ, data, shutdown_before_live):
self._data = data
self._thread = None
self._running = False
self._shutdown_before_live = shutdown_before_live

def start(self, starttime, endtime):
self._running = True
self._thread = threading.Thread(target=self._run)
self._thread.start()

def stop(self):
if self._running:
self._running = False
self._thread.join()

def _run(self):
idx = 0
while self._running and idx < len(self._data):
if idx and self._shutdown_before_live:
time.sleep(0.1)
self.shutdown_engine(ValueError("Dummy exception message"))
t, v = self._data[idx]
self.push_tick(False, t, v)
idx += 1
self.flag_replay_complete()

idx = 0
while self._running:
self.push_tick(True, datetime.utcnow(), len(self._data) + 1)
if idx and not self._shutdown_before_live:
time.sleep(0.1)
self.shutdown_engine(TypeError("Dummy exception message"))
idx += 1

MyPushPullAdapter = py_pushpull_adapter_def(
"MyPushPullAdapter", MyPushPullAdapterImpl, ts["T"], typ="T", data=list, shutdown_before_live=bool
)

@csp.graph
def graph(shutdown_before_live: bool):
data = [(datetime(2020, 1, 1, 2), 1), (datetime(2020, 1, 1, 3), 2)]
adapter = MyPushPullAdapter(int, data, shutdown_before_live)
csp.print("adapter", adapter)

with self.assertRaisesRegex(ValueError, "Dummy exception message"):
csp.run(graph, True, starttime=datetime(2020, 1, 1, 1))
with self.assertRaisesRegex(TypeError, "Dummy exception message"):
csp.run(
graph,
False,
starttime=datetime(2020, 1, 1, 1),
endtime=datetime.utcnow() + timedelta(seconds=2),
realtime=True,
)


if __name__ == "__main__":
unittest.main()
49 changes: 49 additions & 0 deletions csp/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,55 @@ def graph():
b[t].append(v)
self.assertEqual(results["b"], list(b.items()))

def test_adapter_manager_engine_shutdown(self):
from csp.impl.adaptermanager import AdapterManagerImpl, ManagedSimInputAdapter
from csp.impl.wiring import py_managed_adapter_def

class TestAdapterManager:
def __init__(self):
self._impl = None

def subscribe(self):
return TestAdapter(self)

def _create(self, engine, memo):
self._impl = TestAdapterManagerImpl(engine)
return self._impl

class TestAdapterManagerImpl(AdapterManagerImpl):
def __init__(self, engine):
super().__init__(engine)

def start(self, starttime, endtime):
pass

def stop(self):
pass

def process_next_sim_timeslice(self, now):
try:
[].pop()
except IndexError as e:
self.shutdown_engine(e)

class TestAdapterImpl(ManagedSimInputAdapter):
def __init__(self, manager_impl):
pass

TestAdapter = py_managed_adapter_def("TestAdapter", TestAdapterImpl, ts[int], TestAdapterManager)

def graph():
adapter = TestAdapterManager()
nc = adapter.subscribe()
csp.add_graph_output("nc", nc)

try:
csp.run(graph, starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1))
except IndexError:
tb = traceback.format_exc()

self.assertTrue("[].pop()" in tb and "process_next_sim_timeslice" in tb)

def test_feedback(self):
# Dummy example
class Request(csp.Struct):
Expand Down
15 changes: 15 additions & 0 deletions docs/wiki/how-tos/Write-Realtime-Input-Adapters.md
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,18 @@ csp.run(my_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), re
```

Do note that realtime adapters will only run in realtime engines (note the `realtime=True` argument to `csp.run`).

## Engine shutdown

In case a pushing thread hits a terminal error, an exception can be passed to the main engine thread to shut down gracefully through a `shutdown_engine(exc: Exception)` method exposed by `PushInputAdapter`, `PushPullInputAdapter` and `AdapterManagerImpl`.

For example:

```python
def _run(self):
while self._running:
try:
requests.get(endpoint) # API call over a network, may fail
except Exception as exc:
self.shutdown_engine(exc)
```

0 comments on commit dce7312

Please sign in to comment.