Skip to content

Commit

Permalink
adds burst to websocket adapter; move build to c++20
Browse files Browse the repository at this point in the history
Signed-off-by: Will Rieger <will.r.rieger@gmail.com>
  • Loading branch information
wrieg123 committed Aug 9, 2024
1 parent cf7e633 commit d4ad630
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 10 deletions.
4 changes: 2 additions & 2 deletions cpp/csp/adapters/websocket/ClientInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ ClientInputAdapter::ClientInputAdapter(
void ClientInputAdapter::processMessage( void* c, size_t t, PushBatch* batch )
{

if( type() -> type() == CspType::Type::STRUCT )
if( dataType() -> type() == CspType::Type::STRUCT )
{
auto tick = m_converter -> asStruct( c, t );
pushTick( std::move(tick), batch );
} else if ( type() -> type() == CspType::Type::STRING )
} else if ( dataType() -> type() == CspType::Type::STRING )
{
pushTick( std::string((char const*)c, t), batch );
}
Expand Down
8 changes: 3 additions & 5 deletions cpp/csp/python/adapters/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ if(CSP_BUILD_PARQUET_ADAPTER)
endif()

if(CSP_BUILD_WS_CLIENT_ADAPTER)
set(CMAKE_CXX_STANDARD 17)
add_library(websocketadapterimpl SHARED websocketadapterimpl.cpp)
target_link_libraries(websocketadapterimpl csp_core csp_engine cspimpl csp_websocket_client_adapter)
install(TARGETS websocketadapterimpl RUNTIME DESTINATION ${CSP_RUNTIME_INSTALL_SUBDIR})
set(CMAKE_CXX_STANDARD 20)
add_library(websocketadapterimpl SHARED websocketadapterimpl.cpp)
target_link_libraries(websocketadapterimpl csp_core csp_engine cspimpl csp_websocket_client_adapter)
install(TARGETS websocketadapterimpl RUNTIME DESTINATION ${CSP_RUNTIME_INSTALL_SUBDIR})
endif()
4 changes: 2 additions & 2 deletions csp/adapters/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ def subscribe(
properties["field_map"] = field_map
properties["meta_field_map"] = meta_field_map

return _websocket_input_adapter_def(self, ts_type, properties, push_mode)
return _websocket_input_adapter_def(self, ts_type, properties, push_mode=push_mode)

def send(self, x: ts["T"]):
return _websocket_output_adapter_def(self, x)
Expand All @@ -453,7 +453,7 @@ def update_headers(self, x: ts[List[WebsocketHeaderUpdate]]):

def status(self, push_mode=csp.PushMode.NON_COLLAPSING):
ts_type = Status
return status_adapter_def(self, ts_type, push_mode)
return status_adapter_def(self, ts_type, push_mode=push_mode)

def _create(self, engine, memo):
"""method needs to return the wrapped c++ adapter manager"""
Expand Down
3 changes: 2 additions & 1 deletion csp/impl/wiring/adapters.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import timedelta
from typing import List

from csp.impl.__cspimpl import _cspimpl
from csp.impl.mem_cache import csp_memoized_graph_object
Expand Down Expand Up @@ -30,7 +31,7 @@ def _instantiate_impl(cls, __forced_tvars, name, args, kwargs):
# Note that we augment the returned Edge to be list of expected type, but not the output def
# output def remains the original type
if kwargs.get("push_mode", None) == PushMode.BURST:
output.tstype = tstype.ts[[output.tstype.typ]]
output.tstype = tstype.ts[List[output.tstype.typ]]

return output

Expand Down
33 changes: 33 additions & 0 deletions csp/tests/adapters/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import unittest
from datetime import datetime, timedelta
from typing import List

import csp
from csp import ts
Expand Down Expand Up @@ -126,3 +127,35 @@ def g():
csp.stop_engine(ws.status())

csp.run(g, starttime=datetime.now(pytz.UTC), realtime=True)

def test_send_recv_burst_json(self):
class MsgStruct(csp.Struct):
a: int
b: str

@csp.node
def send_msg_on_open(status: ts[Status]) -> ts[str]:
if csp.ticked(status):
return MsgStruct(a=1234, b="im a string").to_json()

@csp.node
def my_edge_that_handles_burst(objs: ts[List[MsgStruct]]) -> ts[bool]:
if csp.ticked(objs):
return True

@csp.graph
def g():
ws = WebsocketAdapterManager("ws://localhost:8000/")
status = ws.status()
ws.send(send_msg_on_open(status))
recv = ws.subscribe(MsgStruct, JSONTextMessageMapper(), push_mode=csp.PushMode.BURST)
_ = my_edge_that_handles_burst(recv)
csp.add_graph_output("recv", recv)
csp.stop_engine(recv)

msgs = csp.run(g, starttime=datetime.now(pytz.UTC), realtime=True)
obj = msgs["recv"][0][1]
assert isinstance(obj, list)
innerObj = obj[0]
assert innerObj.a == 1234
assert innerObj.b == "im a string"

0 comments on commit d4ad630

Please sign in to comment.