From d4ad630b75f33397840487667da00ff7945a1218 Mon Sep 17 00:00:00 2001 From: Will Rieger Date: Sat, 20 Jul 2024 11:01:29 -0400 Subject: [PATCH] adds burst to websocket adapter; move build to c++20 Signed-off-by: Will Rieger --- .../adapters/websocket/ClientInputAdapter.cpp | 4 +-- cpp/csp/python/adapters/CMakeLists.txt | 8 ++--- csp/adapters/websocket.py | 4 +-- csp/impl/wiring/adapters.py | 3 +- csp/tests/adapters/test_websocket.py | 33 +++++++++++++++++++ 5 files changed, 42 insertions(+), 10 deletions(-) diff --git a/cpp/csp/adapters/websocket/ClientInputAdapter.cpp b/cpp/csp/adapters/websocket/ClientInputAdapter.cpp index 548b0bc04..e4b0b7ff7 100644 --- a/cpp/csp/adapters/websocket/ClientInputAdapter.cpp +++ b/cpp/csp/adapters/websocket/ClientInputAdapter.cpp @@ -28,11 +28,11 @@ ClientInputAdapter::ClientInputAdapter( void ClientInputAdapter::processMessage( void* c, size_t t, PushBatch* batch ) { - if( type() -> type() == CspType::Type::STRUCT ) + if( dataType() -> type() == CspType::Type::STRUCT ) { auto tick = m_converter -> asStruct( c, t ); pushTick( std::move(tick), batch ); - } else if ( type() -> type() == CspType::Type::STRING ) + } else if ( dataType() -> type() == CspType::Type::STRING ) { pushTick( std::string((char const*)c, t), batch ); } diff --git a/cpp/csp/python/adapters/CMakeLists.txt b/cpp/csp/python/adapters/CMakeLists.txt index a82fb3817..abcae763e 100644 --- a/cpp/csp/python/adapters/CMakeLists.txt +++ b/cpp/csp/python/adapters/CMakeLists.txt @@ -40,9 +40,7 @@ if(CSP_BUILD_PARQUET_ADAPTER) endif() if(CSP_BUILD_WS_CLIENT_ADAPTER) - set(CMAKE_CXX_STANDARD 17) - add_library(websocketadapterimpl SHARED websocketadapterimpl.cpp) - target_link_libraries(websocketadapterimpl csp_core csp_engine cspimpl csp_websocket_client_adapter) - install(TARGETS websocketadapterimpl RUNTIME DESTINATION ${CSP_RUNTIME_INSTALL_SUBDIR}) - set(CMAKE_CXX_STANDARD 20) + add_library(websocketadapterimpl SHARED websocketadapterimpl.cpp) + target_link_libraries(websocketadapterimpl csp_core csp_engine cspimpl csp_websocket_client_adapter) + install(TARGETS websocketadapterimpl RUNTIME DESTINATION ${CSP_RUNTIME_INSTALL_SUBDIR}) endif() diff --git a/csp/adapters/websocket.py b/csp/adapters/websocket.py index e65232a13..384e4a251 100644 --- a/csp/adapters/websocket.py +++ b/csp/adapters/websocket.py @@ -443,7 +443,7 @@ def subscribe( properties["field_map"] = field_map properties["meta_field_map"] = meta_field_map - return _websocket_input_adapter_def(self, ts_type, properties, push_mode) + return _websocket_input_adapter_def(self, ts_type, properties, push_mode=push_mode) def send(self, x: ts["T"]): return _websocket_output_adapter_def(self, x) @@ -453,7 +453,7 @@ def update_headers(self, x: ts[List[WebsocketHeaderUpdate]]): def status(self, push_mode=csp.PushMode.NON_COLLAPSING): ts_type = Status - return status_adapter_def(self, ts_type, push_mode) + return status_adapter_def(self, ts_type, push_mode=push_mode) def _create(self, engine, memo): """method needs to return the wrapped c++ adapter manager""" diff --git a/csp/impl/wiring/adapters.py b/csp/impl/wiring/adapters.py index 48d0e6ba1..9c51bf8eb 100644 --- a/csp/impl/wiring/adapters.py +++ b/csp/impl/wiring/adapters.py @@ -1,4 +1,5 @@ from datetime import timedelta +from typing import List from csp.impl.__cspimpl import _cspimpl from csp.impl.mem_cache import csp_memoized_graph_object @@ -30,7 +31,7 @@ def _instantiate_impl(cls, __forced_tvars, name, args, kwargs): # Note that we augment the returned Edge to be list of expected type, but not the output def # output def remains the original type if kwargs.get("push_mode", None) == PushMode.BURST: - output.tstype = tstype.ts[[output.tstype.typ]] + output.tstype = tstype.ts[List[output.tstype.typ]] return output diff --git a/csp/tests/adapters/test_websocket.py b/csp/tests/adapters/test_websocket.py index bd2c6b6ba..41aeb3311 100644 --- a/csp/tests/adapters/test_websocket.py +++ b/csp/tests/adapters/test_websocket.py @@ -3,6 +3,7 @@ import threading import unittest from datetime import datetime, timedelta +from typing import List import csp from csp import ts @@ -126,3 +127,35 @@ def g(): csp.stop_engine(ws.status()) csp.run(g, starttime=datetime.now(pytz.UTC), realtime=True) + + def test_send_recv_burst_json(self): + class MsgStruct(csp.Struct): + a: int + b: str + + @csp.node + def send_msg_on_open(status: ts[Status]) -> ts[str]: + if csp.ticked(status): + return MsgStruct(a=1234, b="im a string").to_json() + + @csp.node + def my_edge_that_handles_burst(objs: ts[List[MsgStruct]]) -> ts[bool]: + if csp.ticked(objs): + return True + + @csp.graph + def g(): + ws = WebsocketAdapterManager("ws://localhost:8000/") + status = ws.status() + ws.send(send_msg_on_open(status)) + recv = ws.subscribe(MsgStruct, JSONTextMessageMapper(), push_mode=csp.PushMode.BURST) + _ = my_edge_that_handles_burst(recv) + csp.add_graph_output("recv", recv) + csp.stop_engine(recv) + + msgs = csp.run(g, starttime=datetime.now(pytz.UTC), realtime=True) + obj = msgs["recv"][0][1] + assert isinstance(obj, list) + innerObj = obj[0] + assert innerObj.a == 1234 + assert innerObj.b == "im a string"