Skip to content

Commit

Permalink
Expose engine shutdown method for Python push-type adapters
Browse files Browse the repository at this point in the history
Co-authored-by: Artur Gajowniczek <artur.gajowniczek@point72.com>
Signed-off-by: Adam Glustein <Adam.Glustein@Point72.com>
  • Loading branch information
AdamGlustein and argaj committed Jul 8, 2024
1 parent ab390db commit 247f831
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 5 deletions.
11 changes: 11 additions & 0 deletions cpp/csp/python/Exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@
namespace csp::python
{

class TracebackStringException : public std::exception
{
public:
TracebackStringException( const char * message ) : m_message( message ) {}

const char * what() const noexcept override { return m_message; }

private:
const char * m_message;
};

class PythonPassthrough : public csp::Exception
{
public:
Expand Down
18 changes: 16 additions & 2 deletions cpp/csp/python/PyAdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ class PyAdapterManager : public AdapterManager
static PyObject * PyAdapterManager_PyObject_starttime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> starttime() ); }
static PyObject * PyAdapterManager_PyObject_endtime( PyAdapterManager_PyObject * self ) { return toPython( self -> manager -> endtime() ); }

static PyObject * PyAdapterManager_PyObject__engine_shutdown( PyAdapterManager_PyObject * self, PyObject * args, PyObject * kwargs)
{
CSP_BEGIN_METHOD;

const char * msg;
if( !PyArg_ParseTuple( args, "s", &msg ) )
return NULL;

self -> manager -> rootEngine() -> shutdown( std::make_exception_ptr( TracebackStringException( msg ) ) );

CSP_RETURN_NONE;
}

static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *args, PyObject *kwds )
{
CSP_BEGIN_METHOD;
Expand All @@ -83,8 +96,9 @@ static int PyAdapterManager_init( PyAdapterManager_PyObject *self, PyObject *arg
}

static PyMethodDef PyAdapterManager_methods[] = {
{ "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" },
{ "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" },
{ "starttime", (PyCFunction) PyAdapterManager_PyObject_starttime, METH_NOARGS, "starttime" },
{ "endtime", (PyCFunction) PyAdapterManager_PyObject_endtime, METH_NOARGS, "endtime" },
{ "_engine_shutdown", (PyCFunction) PyAdapterManager_PyObject__engine_shutdown, METH_VARARGS, "_engine_shutdown" },
{NULL}
};

Expand Down
16 changes: 16 additions & 0 deletions cpp/csp/python/PyEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define _IN_CSP_PYTHON_PYENGINE_H

#include <csp/core/Time.h>
#include <csp/python/Exception.h>
#include <csp/engine/RootEngine.h>
#include <csp/python/PyObjectPtr.h>
#include <Python.h>
Expand Down Expand Up @@ -54,6 +55,21 @@ class CSPIMPL_EXPORT PyEngine final: public PyObject
Engine * m_engine;
};

// Generic engine shutdown function for push-type adapters
template<typename T>
PyObject * PyEngine_shutdown( T * self, PyObject * args, PyObject * kwargs )
{
CSP_BEGIN_METHOD;

const char * msg;
if( !PyArg_ParseTuple( args, "s", &msg ) )
return NULL;

self -> adapter -> rootEngine() -> shutdown( std::make_exception_ptr( TracebackStringException( msg ) ) );

CSP_RETURN_NONE;
}

};

#endif
3 changes: 2 additions & 1 deletion cpp/csp/python/PyPushInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ struct PyPushInputAdapter_PyObject
};

static PyMethodDef PyPushInputAdapter_PyObject_methods[] = {
{ "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "push_tick", (PyCFunction) PyPushInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "_engine_shutdown", (PyCFunction) PyEngine_shutdown<PyPushInputAdapter_PyObject>, METH_VARARGS, "engine shutdown" },
{NULL}
};

Expand Down
5 changes: 3 additions & 2 deletions cpp/csp/python/PyPushPullInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ struct PyPushPullInputAdapter_PyObject
};

static PyMethodDef PyPushPullInputAdapter_PyObject_methods[] = {
{ "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" },
{ "push_tick", (PyCFunction) PyPushPullInputAdapter_PyObject::pushTick, METH_VARARGS, "push new tick" },
{ "flag_replay_complete", (PyCFunction) PyPushPullInputAdapter_PyObject::flagReplayComplete, METH_VARARGS, "finish replay ticks" },
{ "_engine_shutdown", (PyCFunction) PyEngine_shutdown<PyPushPullInputAdapter_PyObject>, METH_VARARGS, "engine shutdown" },
{NULL}
};

Expand Down
4 changes: 4 additions & 0 deletions csp/impl/adaptermanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import csp
from csp.impl.__cspimpl import _cspimpl
from csp.impl.error_handling import format_engine_shutdown_stack


class AdapterManagerImpl(_cspimpl.PyAdapterManager):
Expand All @@ -24,6 +25,9 @@ def process_next_sim_timeslice(self, now):
"""
return None

def engine_shutdown(self, msg):
self._engine_shutdown(format_engine_shutdown_stack(msg))


class ManagedSimInputAdapter(_cspimpl.PyManagedSimInputAdapter):
def __init__(self, typ, field_map):
Expand Down
12 changes: 12 additions & 0 deletions csp/impl/error_handling.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import ast
import os
import traceback
from typing import List

import csp.impl
from csp.impl.__cspimpl import _cspimpl
Expand Down Expand Up @@ -48,3 +50,13 @@ def set_print_full_exception_stack(new_value: bool):
res = ExceptionContext.PRINT_EXCEPTION_FULL_STACK
ExceptionContext.PRINT_EXCEPTION_FULL_STACK = new_value
return res


def format_engine_shutdown_stack(msg: str):
tb = traceback.format_stack()
tb = tb[:-2] # remove traceback call and internal engine shutdown method
if len(tb) > 1:
tb = tb[-2:] # only keep the current function and the caller (adapter manager)
tb.append(msg) # add the user's error message
tb = "".join(tb) # format into a single string
return tb
4 changes: 4 additions & 0 deletions csp/impl/pushadapter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from csp.impl.__cspimpl import _cspimpl
from csp.impl.error_handling import format_engine_shutdown_stack

PushGroup = _cspimpl.PushGroup
PushBatch = _cspimpl.PushBatch
Expand All @@ -11,5 +12,8 @@ def start(self, starttime, endtime):
def stop(self):
pass

def engine_shutdown(self, msg):
self._engine_shutdown(format_engine_shutdown_stack(msg))

# base class
# def push_tick( self, value )
4 changes: 4 additions & 0 deletions csp/impl/pushpulladapter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from csp.impl.__cspimpl import _cspimpl
from csp.impl.error_handling import format_engine_shutdown_stack

PushGroup = _cspimpl.PushGroup
PushBatch = _cspimpl.PushBatch
Expand All @@ -11,5 +12,8 @@ def start(self, starttime, endtime):
def stop(self):
pass

def engine_shutdown(self, msg):
self._engine_shutdown(format_engine_shutdown_stack(msg))

# base class
# def push_tick( self, time, value )
44 changes: 44 additions & 0 deletions csp/tests/impl/test_pushadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,50 @@ def graph():
result = list(x[1] for x in result)
self.assertEqual(result, expected)

def test_adapter_engine_shutdown(self):
class MyPushAdapterImpl(PushInputAdapter):
def __init__(self):
self._thread = None
self._running = False

def start(self, starttime, endtime):
self._running = True
self._thread = threading.Thread(target=self._run)
self._thread.start()

def stop(self):
if self._running:
self._running = False
self._thread.join()

def _run(self):
pushed = False
while self._running:
if pushed:
self.engine_shutdown("Dummy exception message")
else:
self.push_tick(0)
pushed = True

MyPushAdapter = py_push_adapter_def("MyPushAdapter", MyPushAdapterImpl, ts[int])

status = {"count": 0}

@csp.node
def node(x: ts[object]):
if csp.ticked(x):
status["count"] += 1

@csp.graph
def graph():
adapter = MyPushAdapter()
node(adapter)
csp.print("adapter", adapter)

with self.assertRaisesRegex(Exception, "Dummy exception message"):
csp.run(graph, starttime=datetime.utcnow(), realtime=True)
self.assertEqual(status["count"], 1)


if __name__ == "__main__":
unittest.main()
56 changes: 56 additions & 0 deletions csp/tests/impl/test_pushpulladapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,62 @@ def graph():
result = [out[1] for out in graph_out[0]]
self.assertEqual(result, [1, 2, 3])

def test_adapter_engine_shutdown(self):
class MyPushPullAdapterImpl(PushPullInputAdapter):
def __init__(self, typ, data, shutdown_before_live):
self._data = data
self._thread = None
self._running = False
self._shutdown_before_live = shutdown_before_live

def start(self, starttime, endtime):
self._running = True
self._thread = threading.Thread(target=self._run)
self._thread.start()

def stop(self):
if self._running:
self._running = False
self._thread.join()

def _run(self):
idx = 0
while self._running and idx < len(self._data):
if idx and self._shutdown_before_live:
self.engine_shutdown("Dummy exception message")
t, v = self._data[idx]
self.push_tick(False, t, v)
idx += 1
self.flag_replay_complete()

idx = 0
while self._running:
self.push_tick(True, datetime.utcnow(), len(self._data) + 1)
if idx and not self._shutdown_before_live:
self.engine_shutdown("Dummy exception message")
idx += 1

MyPushPullAdapter = py_pushpull_adapter_def(
"MyPushPullAdapter", MyPushPullAdapterImpl, ts["T"], typ="T", data=list, shutdown_before_live=bool
)

@csp.graph
def graph(shutdown_before_live: bool):
data = [(datetime(2020, 1, 1, 2), 1), (datetime(2020, 1, 1, 3), 2)]
adapter = MyPushPullAdapter(int, data, shutdown_before_live)
csp.print("adapter", adapter)

with self.assertRaisesRegex(Exception, "Dummy exception message"):
csp.run(graph, True, starttime=datetime(2020, 1, 1, 1))
with self.assertRaisesRegex(Exception, "Dummy exception message"):
csp.run(
graph,
False,
starttime=datetime(2020, 1, 1, 1),
endtime=datetime.utcnow() + timedelta(seconds=2),
realtime=True,
)


if __name__ == "__main__":
unittest.main()
42 changes: 42 additions & 0 deletions csp/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,48 @@ def graph():
b[t].append(v)
self.assertEqual(results["b"], list(b.items()))

def test_adapter_manager_engine_shutdown(self):
from csp.impl.adaptermanager import AdapterManagerImpl, ManagedSimInputAdapter
from csp.impl.wiring import py_managed_adapter_def

class TestAdapterManager:
def __init__(self):
self._impl = None

def subscribe(self):
return TestAdapter(self)

def _create(self, engine, memo):
self._impl = TestAdapterManagerImpl(engine)
return self._impl

class TestAdapterManagerImpl(AdapterManagerImpl):
def __init__(self, engine):
super().__init__(engine)

def start(self, starttime, endtime):
pass

def stop(self):
pass

def process_next_sim_timeslice(self, now):
self.engine_shutdown("Dummy exception message")

class TestAdapterImpl(ManagedSimInputAdapter):
def __init__(self, manager_impl):
pass

TestAdapter = py_managed_adapter_def("TestAdapter", TestAdapterImpl, ts[int], TestAdapterManager)

def graph():
adapter = TestAdapterManager()
nc = adapter.subscribe()
csp.add_graph_output("nc", nc)

with self.assertRaisesRegex(Exception, "Dummy exception message"):
csp.run(graph, starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1))

def test_feedback(self):
# Dummy example
class Request(csp.Struct):
Expand Down

0 comments on commit 247f831

Please sign in to comment.