Skip to content

Commit

Permalink
Merge pull request #355 from wrieg123/fix-burst-in-ws
Browse files Browse the repository at this point in the history
Adds burst to websocket adapter; move build to c++20
  • Loading branch information
wrieg123 authored Aug 12, 2024
2 parents cc77b7d + e91797b commit ab64b4e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 9 deletions.
4 changes: 2 additions & 2 deletions cpp/csp/adapters/websocket/ClientInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ ClientInputAdapter::ClientInputAdapter(
void ClientInputAdapter::processMessage( void* c, size_t t, PushBatch* batch )
{

if( type() -> type() == CspType::Type::STRUCT )
if( dataType() -> type() == CspType::Type::STRUCT )
{
auto tick = m_converter -> asStruct( c, t );
pushTick( std::move(tick), batch );
} else if ( type() -> type() == CspType::Type::STRING )
} else if ( dataType() -> type() == CspType::Type::STRING )
{
pushTick( std::string((char const*)c, t), batch );
}
Expand Down
8 changes: 3 additions & 5 deletions cpp/csp/python/adapters/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ if(CSP_BUILD_PARQUET_ADAPTER)
endif()

if(CSP_BUILD_WS_CLIENT_ADAPTER)
set(CMAKE_CXX_STANDARD 17)
add_library(websocketadapterimpl SHARED websocketadapterimpl.cpp)
target_link_libraries(websocketadapterimpl csp_core csp_engine cspimpl csp_websocket_client_adapter)
install(TARGETS websocketadapterimpl RUNTIME DESTINATION ${CSP_RUNTIME_INSTALL_SUBDIR})
set(CMAKE_CXX_STANDARD 20)
add_library(websocketadapterimpl SHARED websocketadapterimpl.cpp)
target_link_libraries(websocketadapterimpl csp_core csp_engine cspimpl csp_websocket_client_adapter)
install(TARGETS websocketadapterimpl RUNTIME DESTINATION ${CSP_RUNTIME_INSTALL_SUBDIR})
endif()
4 changes: 2 additions & 2 deletions csp/adapters/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ def subscribe(
properties["field_map"] = field_map
properties["meta_field_map"] = meta_field_map

return _websocket_input_adapter_def(self, ts_type, properties, push_mode)
return _websocket_input_adapter_def(self, ts_type, properties, push_mode=push_mode)

def send(self, x: ts["T"]):
return _websocket_output_adapter_def(self, x)
Expand All @@ -452,7 +452,7 @@ def update_headers(self, x: ts[List[WebsocketHeaderUpdate]]):

def status(self, push_mode=csp.PushMode.NON_COLLAPSING):
ts_type = Status
return status_adapter_def(self, ts_type, push_mode)
return status_adapter_def(self, ts_type, push_mode=push_mode)

def _create(self, engine, memo):
"""method needs to return the wrapped c++ adapter manager"""
Expand Down
33 changes: 33 additions & 0 deletions csp/tests/adapters/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import unittest
from datetime import datetime, timedelta
from typing import List

import csp
from csp import ts
Expand Down Expand Up @@ -126,3 +127,35 @@ def g():
csp.stop_engine(ws.status())

csp.run(g, starttime=datetime.now(pytz.UTC), realtime=True)

def test_send_recv_burst_json(self):
class MsgStruct(csp.Struct):
a: int
b: str

@csp.node
def send_msg_on_open(status: ts[Status]) -> ts[str]:
if csp.ticked(status):
return MsgStruct(a=1234, b="im a string").to_json()

@csp.node
def my_edge_that_handles_burst(objs: ts[List[MsgStruct]]) -> ts[bool]:
if csp.ticked(objs):
return True

@csp.graph
def g():
ws = WebsocketAdapterManager("ws://localhost:8000/")
status = ws.status()
ws.send(send_msg_on_open(status))
recv = ws.subscribe(MsgStruct, JSONTextMessageMapper(), push_mode=csp.PushMode.BURST)
_ = my_edge_that_handles_burst(recv)
csp.add_graph_output("recv", recv)
csp.stop_engine(recv)

msgs = csp.run(g, starttime=datetime.now(pytz.UTC), realtime=True)
obj = msgs["recv"][0][1]
assert isinstance(obj, list)
innerObj = obj[0]
assert innerObj.a == 1234
assert innerObj.b == "im a string"

0 comments on commit ab64b4e

Please sign in to comment.